from __future__ import annotations import os import shutil import subprocess import sys import tempfile from enum import Enum from pathlib import Path from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from one_c_normalizer import NormalizedProject, normalize_one_c_project class RuntimeMode(str, Enum): MOCK = "mock" LOCAL_1C = "local_1c" REMOTE_WORKER = "remote_worker" class RuntimeImportRequest(BaseModel): source_kind: str path: str | None = None project_id: str | None = None credentials_ref: str | None = None metadata: dict = Field(default_factory=dict) class RuntimeImportResponse(BaseModel): status: str mode: str platform_found: bool normalized_project: NormalizedProject | None = None diagnostics: list[str] = Field(default_factory=list) dump_plan: list[str] = Field(default_factory=list) class RuntimePlatformResponse(BaseModel): mode: RuntimeMode platform_found: bool designer_path: str | None = None discovered_paths: list[str] = Field(default_factory=list) execution_enabled: bool = False capabilities: list[str] = Field(default_factory=list) diagnostics: list[str] = Field(default_factory=list) app = FastAPI(title="SFERA Runtime Adapter", version="0.1.0") @app.get("/health") async def health() -> dict[str, str]: return {"status": "ok", "mode": _mode()} @app.get("/api/health") async def api_health() -> dict[str, str]: return {"status": "ok", "mode": _mode()} @app.get("/runtime/platform") async def platform() -> RuntimePlatformResponse: return _platform_status() @app.get("/runtime/modes") async def runtime_modes() -> dict: return { "active": _mode(), "modes": [ { "mode": RuntimeMode.MOCK.value, "description": "No 1C platform required; returns deterministic NormalizedProject fixtures.", }, { "mode": RuntimeMode.LOCAL_1C.value, "description": "Uses local Designer CLI when execution is explicitly enabled.", }, { "mode": RuntimeMode.REMOTE_WORKER.value, "description": "Queues work for an external Windows/1C worker.", }, ], } @app.post("/runtime/import", response_model=RuntimeImportResponse) async def runtime_import(request: RuntimeImportRequest) -> RuntimeImportResponse: mode = _mode() if mode == RuntimeMode.MOCK: return RuntimeImportResponse( status="mock_imported", mode=mode.value, platform_found=False, normalized_project=_mock_project(request.project_id), diagnostics=["1C platform is not required in mock runtime-adapter mode."], ) if mode == RuntimeMode.REMOTE_WORKER: return RuntimeImportResponse( status="queued_for_remote_worker", mode=mode.value, platform_found=False, diagnostics=[f"Remote worker dispatch reserved: {_request_fingerprint(request)}"], ) if mode != RuntimeMode.LOCAL_1C: raise HTTPException(status_code=400, detail=f"Unsupported runtime adapter mode: {mode}") platform_status = _platform_status() source_kind = request.source_kind.upper() if source_kind in {"EDT_PROJECT", "XML_DUMP", "FILE_TREE"}: if not request.path: raise HTTPException(status_code=400, detail="path is required for local metadata import") path = Path(request.path) if not path.exists(): raise HTTPException(status_code=404, detail=f"Path not found: {request.path}") return RuntimeImportResponse( status="normalized", mode=mode.value, platform_found=platform_status.platform_found, normalized_project=normalize_one_c_project(path, project_id=request.project_id), diagnostics=platform_status.diagnostics, ) if not platform_status.platform_found: raise HTTPException(status_code=503, detail="1C Designer CLI was not found") dump_plan = _designer_dump_plan(request) if not platform_status.execution_enabled: return RuntimeImportResponse( status="designer_dump_planned", mode=mode.value, platform_found=True, diagnostics=[ "Designer execution is disabled. Set ONEC_ENABLE_DESIGNER_EXECUTION=true on a trusted worker to run dumps.", "Credentials must be supplied by a runtime secret or prompt, not stored in project files.", ], dump_plan=dump_plan, ) if source_kind in {"CF_FILE", "CFE_FILE"}: if not request.path: raise HTTPException(status_code=400, detail="path is required for CF/CFE import") dump_root, execution_logs = _convert_local_cf_or_cfe_to_metadata_dump(request, platform_status) return RuntimeImportResponse( status="normalized", mode=mode.value, platform_found=True, normalized_project=normalize_one_c_project(dump_root, project_id=request.project_id), diagnostics=[*platform_status.diagnostics, *execution_logs], dump_plan=dump_plan, ) raise HTTPException(status_code=501, detail=f"Designer execution runner is not implemented yet for {source_kind}") def _mode() -> RuntimeMode: value = os.environ.get("RUNTIME_ADAPTER_MODE", RuntimeMode.MOCK.value) try: return RuntimeMode(value) except ValueError: return RuntimeMode.MOCK def _designer_path() -> str: configured = os.environ.get("ONEC_DESIGNER_PATH", "") if configured: return configured candidates = _designer_candidates() return candidates[0] if candidates else "" def _designer_candidates() -> list[str]: if os.environ.get("ONEC_DISABLE_AUTO_DISCOVERY", "").casefold() == "true": return [] roots = [ Path("C:/Program Files/1cv8"), Path("C:/Program Files (x86)/1cv8"), Path("/opt/1cv8"), ] candidates: list[Path] = [] for root in roots: if not root.exists(): continue candidates.extend(root.glob("*/bin/1cv8.exe")) candidates.extend(root.glob("*/bin/1cv8")) return [str(path) for path in sorted(candidates, reverse=True) if path.exists()] def _platform_status() -> RuntimePlatformResponse: mode = _mode() discovered_paths = _designer_candidates() designer_path = _designer_path() or None platform_found = bool(designer_path and Path(designer_path).exists()) execution_enabled = os.environ.get("ONEC_ENABLE_DESIGNER_EXECUTION", "").casefold() == "true" capabilities = ["mock_normalized_project"] diagnostics: list[str] = [] if mode == RuntimeMode.LOCAL_1C: capabilities.extend(["edt_project_normalize", "xml_dump_normalize", "file_tree_normalize"]) if platform_found: capabilities.extend(["cf_dump_plan", "cfe_dump_plan", "live_infobase_dump_plan"]) else: diagnostics.append("ONEC_DESIGNER_PATH is empty, does not exist, and auto-discovery found no Designer CLI.") if mode == RuntimeMode.REMOTE_WORKER: capabilities.append("remote_worker_queue") return RuntimePlatformResponse( mode=mode, platform_found=platform_found, designer_path=designer_path, discovered_paths=discovered_paths, execution_enabled=execution_enabled, capabilities=capabilities, diagnostics=diagnostics, ) def _designer_dump_plan(request: RuntimeImportRequest) -> list[str]: source_kind = request.source_kind.upper() temp_root = request.metadata.get("temp_root", "%TEMP%/sfera-runtime") dump_root = request.metadata.get("dump_root", f"{temp_root}/{request.project_id or 'project'}") plan = [ f"designer={_designer_path()}", f"source={source_kind}", f"dump_root={dump_root}", "create temporary file infobase", ] if source_kind in {"CF_FILE", "CFE_FILE", "ARCHIVE_DUMP"}: plan.append(f"load file={request.path or ''}") if source_kind == "LIVE_INFOBASE": connection = request.metadata.get("connection_string") or request.metadata.get("infobase_connection") or "" plan.append(f"connect infobase={_redact_connection_string(str(connection))}") plan.append(f"credentials_ref={request.credentials_ref or request.metadata.get('credentials_ref') or ''}") plan.extend(["run DumpConfigToFiles", "normalize dumped XML/BSL", "return NormalizedProject"]) return plan def _convert_local_cf_or_cfe_to_metadata_dump( request: RuntimeImportRequest, platform_status: RuntimePlatformResponse, ) -> tuple[Path, list[str]]: source_kind = request.source_kind.upper() payload_path = Path(request.path or "") if not payload_path.exists(): raise HTTPException(status_code=404, detail=f"Path not found: {request.path}") if not platform_status.designer_path: raise HTTPException(status_code=503, detail="1C Designer CLI path is not configured") export_root = Path(tempfile.mkdtemp(prefix=f"sfera-runtime-{request.project_id or 'project'}-")) builder_infobase = export_root / "builder-infobase" logs: list[str] = [] _run_designer_command( platform_status.designer_path, ["CREATEINFOBASE", f"File={builder_infobase};"], export_root / "create-builder-infobase.log", "1C CREATEINFOBASE for local CF/CFE conversion", ) builder_args = ["/F", str(builder_infobase)] artifacts_root = export_root / "artifacts" artifacts_root.mkdir(parents=True, exist_ok=True) shutil.copyfile(payload_path, artifacts_root / payload_path.name) if source_kind == "CF_FILE": _run_designer_command( platform_status.designer_path, [*builder_args, "/LoadCfg", str(payload_path)], export_root / "designer-loadcfg-local-cf.log", "1C LoadCfg local CF", ) metadata_root = export_root / "configuration" metadata_root.mkdir(parents=True, exist_ok=True) _run_designer_command( platform_status.designer_path, [*builder_args, "/DumpConfigToFiles", str(metadata_root), "-Format", "Hierarchical"], export_root / "designer-dumpconfigtofiles-local-cf.log", "1C DumpConfigToFiles from local CF", ) shutil.copyfile(payload_path, metadata_root / payload_path.name) logs.append("Local .cf converted to metadata export for server-side parsing.") return export_root, logs if source_kind == "CFE_FILE": extension_name = str(request.metadata.get("one_c_extension") or payload_path.stem).strip() if not extension_name: raise HTTPException(status_code=400, detail="Extension name is required for local CFE conversion.") _run_designer_command( platform_status.designer_path, [*builder_args, "/LoadCfg", str(payload_path), "-Extension", extension_name, "/UpdateDBCfg"], export_root / "designer-loadcfg-local-cfe.log", "1C LoadCfg local CFE", ) metadata_root = export_root / "extension" metadata_root.mkdir(parents=True, exist_ok=True) _run_designer_command( platform_status.designer_path, [*builder_args, "/DumpConfigToFiles", str(metadata_root), "-Format", "Hierarchical", "-Extension", extension_name], export_root / "designer-dumpconfigtofiles-local-cfe.log", "1C DumpConfigToFiles from local CFE", ) shutil.copyfile(payload_path, metadata_root / payload_path.name) logs.append("Local .cfe converted to metadata export for server-side parsing.") return export_root, logs raise HTTPException(status_code=400, detail=f"Unsupported local 1C source: {source_kind}") def _designer_process_command(designer_path: str, arguments: list[str]) -> list[str]: path = Path(designer_path) if path.suffix.casefold() == ".py": return [sys.executable, designer_path, *arguments] return [designer_path, *arguments] def _run_designer_command(designer_path: str, arguments: list[str], log_path: Path, action_title: str, timeout_seconds: int = 180) -> None: command = _designer_process_command(designer_path, arguments) log_path.parent.mkdir(parents=True, exist_ok=True) completed = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="replace", timeout=timeout_seconds, check=False, ) output = completed.stdout or "" log_path.write_text(output, encoding="utf-8") if completed.returncode != 0: tail = output[-4000:] if output else "" raise HTTPException( status_code=500, detail=f"{action_title} failed with code {completed.returncode}. {tail}".strip(), ) def _redact_connection_string(value: str) -> str: sensitive_keys = {"pwd", "password", "пароль"} chunks: list[str] = [] for raw_part in value.split(";"): part = raw_part.strip() if not part: continue if "=" not in part: chunks.append(part) continue key, raw_value = part.split("=", 1) clean_key = key.strip().strip('"').casefold() if clean_key in sensitive_keys: chunks.append(f"{key}=") else: chunks.append(f"{key}={raw_value}") return ";".join(chunks) def _request_fingerprint(request: RuntimeImportRequest) -> str: import hashlib value = f"{request.project_id}:{request.source_kind}:{request.path}:{request.metadata.get('connection_string', '')}" return hashlib.sha1(value.encode("utf-8")).hexdigest()[:12] def _mock_project(project_id: str | None) -> NormalizedProject: from one_c_normalizer import ( AccessGroup, AccessModel, AccessProfile, AccessRoleAssignment, AccessUser, Command, ConfigurationRoot, Extension, Form, MetadataGroup, MetadataObject, ObjectPart, Rights, ) return NormalizedProject( project_id=project_id, source_path="mock://runtime-adapter", access=AccessModel( profiles=[ AccessProfile( name="МенеджерПродаж", qualified_name="ПрофильГруппыДоступа.МенеджерПродаж", roles=[AccessRoleAssignment(role="Менеджер", role_qualified_name="Роль.Менеджер")], ) ], groups=[ AccessGroup( name="ОтделПродаж", qualified_name="ГруппаДоступа.ОтделПродаж", profile="МенеджерПродаж", users=["demo.user"], ) ], users=[ AccessUser( name="demo.user", qualified_name="Пользователь.demo.user", full_name="Demo User", groups=["ОтделПродаж"], ) ], ), configuration=ConfigurationRoot( groups=[ MetadataGroup( name="Справочники", object_kinds=["CATALOG"], objects=[ MetadataObject( name="Контрагенты", qualified_name="Справочник.Контрагенты", object_kind="CATALOG", attributes=[ObjectPart(name="ИНН", kind="ATTRIBUTE")], forms=[Form(name="ФормаЭлемента")], commands=[Command(name="ПроверитьИНН")], ) ], ), MetadataGroup( name="Документы", object_kinds=["DOCUMENT"], objects=[ MetadataObject( name="ЗаказПокупателя", qualified_name="Документ.ЗаказПокупателя", object_kind="DOCUMENT", attributes=[ObjectPart(name="Контрагент", kind="ATTRIBUTE")], tabular_sections=[ObjectPart(name="Товары", kind="TABULAR_SECTION")], forms=[Form(name="ФормаДокумента")], commands=[Command(name="Провести")], rights=[ Rights( name="Документ.ЗаказПокупателя", target="Документ.ЗаказПокупателя", permissions={"read": "true", "write": "true", "post": "true"}, ) ], ) ], ), MetadataGroup( name="Регистры", object_kinds=["REGISTER"], objects=[ MetadataObject( name="ОстаткиТоваров", qualified_name="РегистрНакопления.ОстаткиТоваров", object_kind="REGISTER", attributes=[ ObjectPart(name="Номенклатура", kind="ATTRIBUTE", attributes={"attribute_role": "DIMENSION"}), ObjectPart(name="Количество", kind="ATTRIBUTE", attributes={"attribute_role": "RESOURCE"}), ], ) ], ), MetadataGroup( name="Подсистемы", object_kinds=["SUBSYSTEM"], objects=[ MetadataObject( name="Продажи", qualified_name="Подсистема.Продажи", object_kind="SUBSYSTEM", ) ], ), MetadataGroup( name="HTTP-сервисы", object_kinds=["HTTP_SERVICE"], objects=[ MetadataObject( name="ПубличныйAPI", qualified_name="HTTPСервис.ПубличныйAPI", object_kind="HTTP_SERVICE", ) ], ), MetadataGroup( name="XDTO", object_kinds=["XDTO_PACKAGE"], objects=[ MetadataObject( name="ИнтеграцияCRM", qualified_name="XDTO.ИнтеграцияCRM", object_kind="XDTO_PACKAGE", ) ], ), ], extensions=[Extension(name="CRM", version="mock")], ), )