diff --git a/services/api-server/src/api_server/html5_ai_structure_controller.py b/services/api-server/src/api_server/html5_ai_structure_controller.py index 6c645ae..df3c0dd 100644 --- a/services/api-server/src/api_server/html5_ai_structure_controller.py +++ b/services/api-server/src/api_server/html5_ai_structure_controller.py @@ -2,6 +2,7 @@ from __future__ import annotations from collections.abc import Callable, Iterable from pathlib import Path +import shutil from typing import Any from uuid import uuid4 @@ -97,6 +98,7 @@ async def html5_ai_structure_run( save_run_state( job.job_id, { + **(getattr(job, "state", {}) or {}), "project_id": project_id, "effective_project_id": effective_project_id, "input_path": input_path, @@ -158,7 +160,7 @@ async def html5_ai_structure_check_path( return render_html5_ai_structure_path_check(result) -def html5_ai_structure_job( +async def html5_ai_structure_job( *, project_id: str, job_id: str, @@ -168,6 +170,7 @@ def html5_ai_structure_job( save_run_state: Callable[[str, dict[str, Any]], None], load_job: Callable[[str], object | None], current_project_source_root: Callable[[str], Path | None], + advance_binary_run: Callable[[str, dict[str, Any]], Any] | None = None, ) -> str: state = load_run_state(job_id) if state is None: @@ -175,35 +178,59 @@ def html5_ai_structure_job( if state.get("result") is not None: return render_html5_ai_structure_result(dict(state["result"])) - job = load_job(job_id) - if job is None or str(getattr(job, "project_id", "")) != project_id: - return render_html5_ai_structure_error(f"Задача агента не найдена: {job_id}") + if state.get("agent_sequence"): + if advance_binary_run is None: + return render_html5_ai_structure_error("Сервис последовательной выгрузки CF/CFE не подключен.") + step_result = await advance_binary_run(job_id, dict(state)) + if step_result.get("phase") == "error": + return render_html5_ai_structure_error(str(step_result.get("error") or "Windows Agent завершил задачу с ошибкой.")) + if step_result.get("phase") == "running": + updated_state = dict(step_result.get("state") or state) + save_run_state(job_id, updated_state) + return render_html5_ai_structure_job( + project_id=project_id, + job_id=job_id, + status=str(step_result.get("status") or "RUNNING"), + source=str(step_result.get("source") or ""), + message=str(step_result.get("message") or "Windows Agent выгружает структуру"), + logs=list(step_result.get("logs") or []), + ) + if step_result.get("phase") != "completed": + return render_html5_ai_structure_error("Не удалось определить состояние последовательной выгрузки CF/CFE.") + state = dict(step_result.get("state") or state) + save_run_state(job_id, state) + source_roots = [Path(path) for path in list(step_result.get("source_roots") or []) if str(path).strip()] + else: + job = load_job(job_id) + if job is None or str(getattr(job, "project_id", "")) != project_id: + return render_html5_ai_structure_error(f"Задача агента не найдена: {job_id}") - status = _enum_text(getattr(job, "status", "UNKNOWN")) - source = _enum_text(getattr(job, "source", "")) - logs = list(getattr(job, "logs", []) or []) - if status in {"QUEUED", "RUNNING"}: - return render_html5_ai_structure_job( - project_id=project_id, - job_id=job_id, - status=status, - source=source, - message="Windows Agent выгружает структуру и передает ее на сервер", - logs=logs, - ) - if status != "SUCCEEDED": - error = str(getattr(job, "error", "") or "Windows Agent завершил задачу с ошибкой.") - if logs: - error = f"{error} Последние сообщения: {' | '.join(str(item) for item in logs[-4:])}" - return render_html5_ai_structure_error(error) + status = _enum_text(getattr(job, "status", "UNKNOWN")) + source = _enum_text(getattr(job, "source", "")) + logs = list(getattr(job, "logs", []) or []) + if status in {"QUEUED", "RUNNING"}: + return render_html5_ai_structure_job( + project_id=project_id, + job_id=job_id, + status=status, + source=source, + message="Windows Agent выгружает структуру и передает ее на сервер", + logs=logs, + ) + if status != "SUCCEEDED": + error = str(getattr(job, "error", "") or "Windows Agent завершил задачу с ошибкой.") + if logs: + error = f"{error} Последние сообщения: {' | '.join(str(item) for item in logs[-4:])}" + return render_html5_ai_structure_error(error) - source_root = current_project_source_root(str(state.get("effective_project_id") or project_id)) - if source_root is None: - import_summary = getattr(job, "import_summary", None) or {} - source_path = str(import_summary.get("source_path") or "") - source_root = Path(source_path) if source_path else None - if source_root is None or not source_root.exists(): - return render_html5_ai_structure_error("После выгрузки агентом сервер не нашел папку с XML/BSL-структурой для подготовки пакета.") + source_root = current_project_source_root(str(state.get("effective_project_id") or project_id)) + if source_root is None: + import_summary = getattr(job, "import_summary", None) or {} + source_path = str(import_summary.get("source_path") or "") + source_root = Path(source_path) if source_path else None + if source_root is None or not source_root.exists(): + return render_html5_ai_structure_error("После выгрузки агентом сервер не нашел папку с XML/BSL-структурой для подготовки пакета.") + source_roots = [source_root] output_path = str(state.get("output_path") or "") username = str(state.get("username") or "") @@ -216,6 +243,7 @@ def html5_ai_structure_job( try: work_dir.mkdir(parents=True, exist_ok=True) local_output = work_dir / "output" if is_unc_path(output_path) else Path(output_path) + source_root = _compose_ai_structure_source_root(work_dir / "source", source_roots) result = prepare( project_id=str(state.get("effective_project_id") or project_id), input_path=source_root, @@ -296,5 +324,30 @@ def _normalize_binary_match(value: str | dict[str, str] | None) -> dict[str, str return {"suffix": value, "relative_path": "", "binary_relative_paths": []} +def _compose_ai_structure_source_root(target_root: Path, source_roots: list[Path]) -> Path: + existing_roots = [path for path in source_roots if path.exists()] + if not existing_roots: + raise FileNotFoundError("После выгрузки агентом сервер не нашел папки с XML/BSL-структурой для подготовки пакета.") + if len(existing_roots) == 1: + return existing_roots[0] + target_root.mkdir(parents=True, exist_ok=True) + base_root = next((path for path in existing_roots if path.name.casefold().endswith(".cf") is False and (path / "src").exists()), existing_roots[0]) + if base_root.exists(): + shutil.copytree(base_root, target_root, dirs_exist_ok=True) + used_names: set[str] = set() + for root in existing_roots: + if root == base_root: + continue + name = root.name or f"extension-{len(used_names) + 1}" + candidate = name + index = 2 + while candidate in used_names or (target_root / candidate).exists(): + candidate = f"{name}-{index}" + index += 1 + used_names.add(candidate) + shutil.copytree(root, target_root / candidate, dirs_exist_ok=True) + return target_root + + def _enum_text(value: object) -> str: return str(getattr(value, "value", value or "")) diff --git a/services/api-server/src/api_server/main.py b/services/api-server/src/api_server/main.py index 05287db..e3e3dfb 100644 --- a/services/api-server/src/api_server/main.py +++ b/services/api-server/src/api_server/main.py @@ -18,6 +18,7 @@ from collections import Counter from difflib import SequenceMatcher from pathlib import Path from typing import Any, Callable +from types import SimpleNamespace from urllib.parse import quote, urljoin, urlsplit, urlunsplit from uuid import uuid4 @@ -334,6 +335,9 @@ def _load_ai_structure_agent_run(job_id: str) -> dict[str, Any] | None: except (ValueError, UnicodeDecodeError): password = "" result = payload.get("result") + agent_sequence = payload.get("agent_sequence") + completed_steps = payload.get("completed_steps") + sequence_logs = payload.get("sequence_logs") return { "project_id": str(payload.get("project_id") or ""), "effective_project_id": str(payload.get("effective_project_id") or ""), @@ -345,6 +349,13 @@ def _load_ai_structure_agent_run(job_id: str) -> dict[str, Any] | None: "domain": str(payload.get("domain") or ""), "password": password, "result": result if isinstance(result, dict) else None, + "agent_sequence": agent_sequence if isinstance(agent_sequence, list) else [], + "current_step_index": int(payload.get("current_step_index") or 0), + "current_agent_job_id": str(payload.get("current_agent_job_id") or ""), + "agent_id": str(payload.get("agent_id") or ""), + "display_source": str(payload.get("display_source") or ""), + "completed_steps": completed_steps if isinstance(completed_steps, list) else [], + "sequence_logs": sequence_logs if isinstance(sequence_logs, list) else [], } @@ -364,6 +375,13 @@ def _save_ai_structure_agent_run(job_id: str, payload: dict[str, Any]) -> None: "domain": str(payload.get("domain") or ""), "password_b64": base64.b64encode(password.encode("utf-8")).decode("ascii") if password else "", "result": payload.get("result") if isinstance(payload.get("result"), dict) else None, + "agent_sequence": payload.get("agent_sequence") if isinstance(payload.get("agent_sequence"), list) else [], + "current_step_index": int(payload.get("current_step_index") or 0), + "current_agent_job_id": str(payload.get("current_agent_job_id") or ""), + "agent_id": str(payload.get("agent_id") or ""), + "display_source": str(payload.get("display_source") or ""), + "completed_steps": payload.get("completed_steps") if isinstance(payload.get("completed_steps"), list) else [], + "sequence_logs": payload.get("sequence_logs") if isinstance(payload.get("sequence_logs"), list) else [], }, ) @@ -612,6 +630,33 @@ def _ai_structure_agent_root_mismatch_detail(agent_id: str, input_path: str, roo ) +def _ai_structure_step_path(raw_input_path: str, relative_path: str | None) -> Path: + if relative_path: + return Path(ntpath.join(raw_input_path, relative_path.replace("/", "\\"))) + return Path(raw_input_path) + + +async def _queue_ai_structure_agent_step( + *, + project_id: str, + source: ImportSourceKind, + agent_id: str, + local_path: str, + metadata: dict[str, Any], +) -> AgentImportJob: + return await create_agent_import_job( + project_id, + source, + AgentImportJobRequest( + agent_id=agent_id, + source=source, + local_path=local_path, + mode=ImportMode.FULL_REPLACE, + metadata=metadata, + ), + ) + + async def _start_ai_structure_agent_job( *, project_id: str, @@ -631,16 +676,6 @@ async def _start_ai_structure_agent_job( cf_files = [path for path in binary_files if path.suffix.casefold() == ".cf"] cfe_files = [path for path in binary_files if path.suffix.casefold() == ".cfe"] - if cf_files and cfe_files: - raise HTTPException( - status_code=400, - detail=( - "Во входной папке одновременно лежат .cf и .cfe. " - f"Найдены: {_format_binary_file_list(binary_files)}. " - "Укажите конкретный файл, который нужно подготовить для ИИ." - ), - ) - source = ImportSourceKind.CF_FILE if cf_files else ImportSourceKind.CFE_FILE agent_id = _agent_id_for_source(settings, ImportSourceKind.CF_FILE) if not agent_id: raise HTTPException( @@ -663,49 +698,85 @@ async def _start_ai_structure_agent_job( status_code=400, detail=_ai_structure_agent_root_mismatch_detail(agent_id, input_path, network_roots), ) - agent = settings.agent if isinstance(settings.agent, dict) else {} - metadata: dict[str, Any] = { + if len(cf_files) > 1: + raise HTTPException( + status_code=400, + detail=( + "Для проекта ожидается одна основная конфигурация .cf. " + f"Найдены: {_format_binary_file_list(cf_files)}." + ), + ) + + common_metadata: dict[str, Any] = { "platform_version": settings.platform_version or None, "compatibility_mode": settings.compatibility_mode or None, } - local_path: str | None = None + steps: list[dict[str, Any]] = [] + if cf_files: + cf_file = cf_files[0] + steps.append( + { + "source": ImportSourceKind.CF_FILE, + "local_path": str(cf_file), + "label": cf_file.name, + "metadata": {**common_metadata, "input_mode": "cf_file"}, + } + ) + for cfe_file in sorted(cfe_files): + steps.append( + { + "source": ImportSourceKind.CFE_FILE, + "local_path": str(cfe_file), + "label": cfe_file.name, + "metadata": { + **common_metadata, + "input_mode": "cfe_file", + "one_c_extension": cfe_file.stem, + }, + } + ) + if not steps: + raise HTTPException(status_code=400, detail="Во входном пути не найдены файлы .cf или .cfe.") - if source == ImportSourceKind.CF_FILE: - if len(cf_files) != 1: - raise HTTPException( - status_code=400, - detail=( - "Для прямого разбора .cf укажите один конкретный файл .cf, " - f"а не папку с несколькими конфигурациями. Найдены: {_format_binary_file_list(cf_files)}." - ), - ) - local_path = str(cf_files[0]) - metadata["input_mode"] = "cf_file" - else: - if len(cfe_files) != 1: - raise HTTPException( - status_code=400, - detail=( - "Для прямого разбора расширения укажите один конкретный файл .cfe, " - f"а не папку с несколькими расширениями. Найдены: {_format_binary_file_list(cfe_files)}." - ), - ) - cfe_file = cfe_files[0] - local_path = str(cfe_file) - metadata["one_c_extension"] = cfe_file.stem - metadata["input_mode"] = "cfe_file" - - return await create_agent_import_job( - effective_project_id, - source, - AgentImportJobRequest( - agent_id=agent_id, - source=source, - local_path=local_path, - mode=ImportMode.FULL_REPLACE, - metadata=metadata, - ), + first = steps[0] + first_job = await _queue_ai_structure_agent_step( + project_id=effective_project_id, + source=first["source"], + agent_id=agent_id, + local_path=first["local_path"], + metadata=first["metadata"], + ) + binary_kind = "CF_FILE+CFE_FILE" if cf_files and cfe_files else first["source"].value + intro_logs = [ + ( + f"Найдено файлов для подготовки: конфигураций {len(cf_files)}, " + f"расширений {len(cfe_files)}. Сначала выгружается конфигурация, затем все расширения." + ) + if cf_files and cfe_files + else f"Найден файл для подготовки: {first['label']}." + ] + return SimpleNamespace( + job_id=first_job.job_id, + status=first_job.status, + source=binary_kind, + logs=[*intro_logs, *list(first_job.logs or [])], + state={ + "agent_sequence": [ + { + "source": step["source"].value, + "local_path": step["local_path"], + "label": step["label"], + "metadata": step["metadata"], + } + for step in steps + ], + "current_step_index": 0, + "current_agent_job_id": first_job.job_id, + "agent_id": agent_id, + "display_source": binary_kind, + "completed_steps": [], + }, ) @@ -802,6 +873,96 @@ async def _check_ai_structure_agent_path(*, project_id: str, input_path: str) -> } +async def _advance_ai_structure_agent_run(run_id: str, state: dict[str, Any]) -> dict[str, Any]: + sequence = list(state.get("agent_sequence") or []) + if not sequence: + return {"phase": "error", "error": "План выгрузки CF/CFE не найден."} + current_job_id = str(state.get("current_agent_job_id") or run_id) + current_step_index = int(state.get("current_step_index") or 0) + current_job = _agent_import_jobs.get(current_job_id) + if current_job is None: + return {"phase": "error", "error": f"Задача агента не найдена: {current_job_id}"} + + display_source = str(state.get("display_source") or getattr(current_job.source, "value", current_job.source or "")) + logs = list(state.get("sequence_logs") or []) + logs.extend(str(item) for item in list(current_job.logs or [])) + state["sequence_logs"] = logs[-24:] + + if current_job.status in {AgentImportJobStatus.QUEUED, AgentImportJobStatus.RUNNING}: + return { + "phase": "running", + "state": state, + "job_id": run_id, + "status": current_job.status.value, + "source": display_source, + "logs": state["sequence_logs"], + "message": "Windows Agent выгружает структуру и передает ее на сервер", + } + if current_job.status != AgentImportJobStatus.SUCCEEDED: + error = str(current_job.error or "Windows Agent завершил задачу с ошибкой.") + if current_job.logs: + error = f"{error} Последние сообщения: {' | '.join(str(item) for item in current_job.logs[-4:])}" + return {"phase": "error", "error": error} + + completed_steps = list(state.get("completed_steps") or []) + if not any(str(item.get("job_id")) == current_job_id for item in completed_steps): + import_summary = current_job.import_summary or {} + source_path = str(import_summary.get("source_path") or current_job.server_path or "") + if not source_path: + return { + "phase": "error", + "error": "После выгрузки агентом сервер не вернул путь к экспортированной структуре.", + } + completed_steps.append( + { + "job_id": current_job_id, + "source": sequence[current_step_index]["source"], + "label": sequence[current_step_index]["label"], + "source_path": source_path, + } + ) + state["completed_steps"] = completed_steps + + next_step_index = current_step_index + 1 + if next_step_index < len(sequence): + next_step = sequence[next_step_index] + next_job = await _queue_ai_structure_agent_step( + project_id=str(state.get("effective_project_id") or state.get("project_id") or ""), + source=ImportSourceKind(next_step["source"]), + agent_id=str(state.get("agent_id") or ""), + local_path=str(next_step["local_path"]), + metadata=dict(next_step.get("metadata") or {}), + ) + state["current_step_index"] = next_step_index + state["current_agent_job_id"] = next_job.job_id + state["sequence_logs"] = [ + *state["sequence_logs"], + f"Шаг {current_step_index + 1} завершен: {sequence[current_step_index]['label']}.", + f"Запущен следующий шаг {next_step_index + 1}: {next_step['label']}.", + *list(next_job.logs or []), + ][-24:] + return { + "phase": "running", + "state": state, + "job_id": run_id, + "status": next_job.status.value, + "source": display_source, + "logs": state["sequence_logs"], + "message": "Windows Agent продолжает выгрузку конфигурации и расширений", + } + + state["sequence_logs"] = [ + *state["sequence_logs"], + f"Шаг {current_step_index + 1} завершен: {sequence[current_step_index]['label']}.", + "Все найденные CF/CFE выгружены. Сервер собирает объединенный пакет для ИИ.", + ][-24:] + return { + "phase": "completed", + "state": state, + "source_roots": [str(item.get("source_path") or "") for item in completed_steps if str(item.get("source_path") or "")], + } + + def _ai_structure_binary_files( raw_input_path: str, detected_binary_relative_path: str | None = None, @@ -1978,7 +2139,7 @@ async def html5_project_ai_structure_check_path(project_id: str, request: Reques @app.get("/html5/projects/{project_id}/ai-structure/jobs/{job_id}") async def html5_project_ai_structure_job(project_id: str, job_id: str) -> Response: return _html5_response( - _html5_ai_structure_job( + await _html5_ai_structure_job( project_id=project_id, job_id=job_id, prepare=_prepare_ai_structure, @@ -1987,6 +2148,7 @@ async def html5_project_ai_structure_job(project_id: str, job_id: str) -> Respon save_run_state=_save_ai_structure_agent_run, load_job=lambda current_job_id: _agent_import_jobs.get(current_job_id), current_project_source_root=_current_project_source_root, + advance_binary_run=_advance_ai_structure_agent_run, ) ) diff --git a/services/api-server/tests/test_api.py b/services/api-server/tests/test_api.py index df945d5..7d7aabb 100644 --- a/services/api-server/tests/test_api.py +++ b/services/api-server/tests/test_api.py @@ -1962,6 +1962,107 @@ def test_html5_ai_structure_routes_binary_cfe_through_windows_agent(tmp_path: Pa assert (output / f"codex-1c-context-{project_id}" / "AGENTS.md").exists() +def test_html5_ai_structure_routes_cf_and_cfe_as_single_project(tmp_path: Path): + base_root = tmp_path / "base-export" + base_root.mkdir() + (base_root / "metadata.xml").write_text( + """ + + + +""", + encoding="utf-8", + ) + extension_root = tmp_path / "extension-export" + extension_root.mkdir() + (extension_root / "РасширениеCRM.mdo").write_text( + """ + + CRM + 1.0 + + КонтрагентыCRM + + +""", + encoding="utf-8", + ) + source_dir = tmp_path / "binary" + source_dir.mkdir() + (source_dir / "1Cv8.cf").write_bytes(b"binary-cf") + (source_dir / "Marketplace.cfe").write_bytes(b"binary-cfe") + output = tmp_path / "ai-out-mixed" + client = TestClient(app) + project_id = f"ai-agent-mixed-{uuid4()}" + agent_id = f"win-agent-{uuid4()}" + + settings = client.post( + f"/projects/{project_id}/settings", + json={"name": "AI Agent Mixed Demo", "structure_source": "CF_FILE", "agent": {"cf_agent_id": agent_id}}, + ) + assert settings.status_code == 200 + heartbeat = client.post("/agent/heartbeat", json={"agent_id": agent_id, "host": "test-host"}) + assert heartbeat.status_code == 200 + + queued = client.post( + f"/html5/projects/{project_id}/ai-structure/run", + data={"project_id": project_id, "input_path": str(source_dir), "output_path": str(output)}, + ) + assert queued.status_code == 200 + assert "конфигураций 1, расширений 1" in queued.text + match = re.search(r"/html5/projects/[^/]+/ai-structure/jobs/([A-Za-z0-9-]+)", queued.text) + assert match is not None + job_id = match.group(1) + + first_claimed = client.get("/agent/jobs/next", params={"agent_id": agent_id}) + assert first_claimed.status_code == 200 + assert first_claimed.json()["job_id"] == job_id + assert first_claimed.json()["source"] == "CF_FILE" + + completed_cf = client.post( + f"/agent/jobs/{job_id}/result", + json={"status": "SUCCEEDED", "server_path": str(base_root), "logs": ["Выгрузка конфигурации завершена."]}, + ) + assert completed_cf.status_code == 200 + main._agent_import_jobs[job_id].status = main.AgentImportJobStatus.SUCCEEDED + main._agent_import_jobs[job_id].import_summary = {"source_path": str(base_root)} + + deadline = time.monotonic() + 10 + second_job = None + while time.monotonic() < deadline: + polled = client.get(f"/html5/projects/{project_id}/ai-structure/jobs/{job_id}") + assert polled.status_code == 200 + second_claimed = client.get("/agent/jobs/next", params={"agent_id": agent_id}) + assert second_claimed.status_code == 200 + second_job = second_claimed.json() + if second_job: + break + time.sleep(0.05) + assert second_job is not None + assert second_job["source"] == "CFE_FILE" + second_job_id = second_job["job_id"] + + completed_cfe = client.post( + f"/agent/jobs/{second_job_id}/result", + json={"status": "SUCCEEDED", "server_path": str(extension_root), "logs": ["Выгрузка расширения завершена."]}, + ) + assert completed_cfe.status_code == 200 + main._agent_import_jobs[second_job_id].status = main.AgentImportJobStatus.SUCCEEDED + main._agent_import_jobs[second_job_id].import_summary = {"source_path": str(extension_root)} + + deadline = time.monotonic() + 10 + fragment = "" + while time.monotonic() < deadline: + polled = client.get(f"/html5/projects/{project_id}/ai-structure/jobs/{job_id}") + assert polled.status_code == 200 + fragment = polled.text + if "готово" in fragment: + break + time.sleep(0.05) + assert "готово" in fragment + assert (output / f"codex-1c-context-{project_id}" / "AGENTS.md").exists() + + def test_html5_ai_structure_routes_unc_directory_with_cf_through_windows_agent(monkeypatch, tmp_path: Path): from api_server import html5_ai_structure_controller as controller @@ -2041,7 +2142,7 @@ def test_html5_ai_structure_reports_multiple_binary_files_in_directory(tmp_path: data={"project_id": project_id, "input_path": str(tmp_path), "output_path": str(tmp_path / 'out')}, ) assert queued.status_code == 200 - assert "один конкретный файл .cf" in queued.text + assert "одна основная конфигурация .cf" in queued.text assert "first.cf" in queued.text assert "second.cf" in queued.text