diff --git a/services/api-server/src/api_server/main.py b/services/api-server/src/api_server/main.py index e072c1e..a989f28 100644 --- a/services/api-server/src/api_server/main.py +++ b/services/api-server/src/api_server/main.py @@ -125,6 +125,18 @@ from api_server.normalized_project_service import ( normalized_all_groups as _normalized_all_groups, normalized_project_summary as _normalized_project_summary, ) +from api_server.normalized_object_models import ( + BslCompletionItemResponse, + ModuleRoutineResponse, + ModuleSourceResponse, + NormalizedObjectDetail, +) +from api_server.normalized_object_service import ( + module_object_part_for_response as _module_object_part_for_response, + normalized_bsl_completion_items as _normalized_bsl_completion_items, + normalized_module_sources_for_object as _normalized_module_sources_for_object, + normalized_object_detail as _normalized_object_detail, +) from impact_engine import object_impact, routine_impact from incremental_indexer import rebuild_changed_file from integration_topology import IntegrationKind, build_integration_topology @@ -929,50 +941,6 @@ class ImportSyncPreview(BaseModel): items: list[ImportSyncDiffItem] = Field(default_factory=list) -class NormalizedObjectDetail(BaseModel): - project_id: str | None = None - group_name: str - object: MetadataObject - - -class ModuleRoutineResponse(BaseModel): - name: str - kind: str - line_start: int | None = None - line_end: int | None = None - export: bool = False - calls_count: int = 0 - queries_count: int = 0 - writes_count: int = 0 - calls: list[str] = Field(default_factory=list) - queries: list[str] = Field(default_factory=list) - writes: list[str] = Field(default_factory=list) - impact_level: str = "LOW" - impact_reasons: list[str] = Field(default_factory=list) - - -class ModuleSourceResponse(BaseModel): - name: str - qualified_name: str - module_role: str = "MODULE" - owner_qualified_name: str | None = None - owner_kind: str | None = None - object_part: str | None = None - form_name: str | None = None - form_qualified_name: str | None = None - source_path: str - source_text: str - routines_count: int = 0 - routines: list[ModuleRoutineResponse] = Field(default_factory=list) - - -class BslCompletionItemResponse(BaseModel): - label: str - kind: str = "VALUE" - detail: str | None = None - insert_text: str | None = None - - class ImportSummary(BaseModel): source: ImportSourceKind mode: ImportMode = ImportMode.FULL_REPLACE @@ -7516,111 +7484,6 @@ def _import_quality_response(project_id: str) -> ImportQualityResponse: ) -def _normalized_object_detail(normalized: NormalizedProject, qualified_name: str) -> NormalizedObjectDetail | None: - for group in normalized.configuration.groups: - for item in group.objects: - if item.qualified_name == qualified_name: - return NormalizedObjectDetail( - project_id=normalized.project_id, - group_name=group.name, - object=item, - ) - return None - - -def _normalized_module_sources_for_object(normalized: NormalizedProject, qualified_name: str) -> list[ModuleSourceResponse]: - normalized_query = qualified_name.strip().casefold() - if not normalized_query: - return [] - selected_module = None - selected_owner = None - selected_object = None - for group in _normalized_all_groups(normalized): - for item in group.objects: - if item.qualified_name.casefold() == normalized_query or item.name.casefold() == normalized_query: - selected_object = item - break - for module in item.modules: - module_keys = { - str(module.qualified_name or "").casefold(), - str(module.name or "").casefold(), - str(module.source_path or "").casefold(), - } - if normalized_query in module_keys: - selected_module = module - selected_owner = item - break - if selected_object is not None or selected_module is not None: - break - if selected_object is not None or selected_module is not None: - break - if selected_module is not None: - return [_normalized_module_source_response(selected_module, selected_owner)] - if selected_object is None: - return [] - return sorted( - [_normalized_module_source_response(module, selected_object) for module in selected_object.modules], - key=lambda item: (item.module_role, item.name), - ) - - -def _normalized_bsl_completion_items( - normalized: NormalizedProject, - receiver: str | None, - qualified_name: str | None, -) -> list[BslCompletionItemResponse]: - receiver_key = (receiver or "").strip().casefold() - qualified_key = (qualified_name or "").strip().casefold() - items: list[BslCompletionItemResponse] = [] - for group in _normalized_all_groups(normalized): - for metadata_object in group.objects: - object_names = { - metadata_object.name.casefold(), - metadata_object.qualified_name.casefold(), - } - if receiver_key and receiver_key in object_names: - items.extend( - BslCompletionItemResponse( - label=part.name, - kind=_completion_kind_for_part(part.kind), - detail=f"{metadata_object.qualified_name}: {part.kind}", - insert_text=part.name, - ) - for part in [ - *metadata_object.attributes, - *metadata_object.resources, - *metadata_object.dimensions, - *metadata_object.tabular_sections, - *metadata_object.commands, - ] - ) - for module in metadata_object.modules: - module_names = { - str(module.name or "").casefold(), - str(module.qualified_name or "").casefold(), - str(module.source_path or "").casefold(), - f"{metadata_object.name}.{module.name}".casefold(), - f"{metadata_object.qualified_name}.{module.name}".casefold(), - } - if receiver_key and receiver_key not in module_names and receiver_key not in object_names: - continue - if not receiver_key and qualified_key and qualified_key not in module_names and qualified_key not in object_names: - continue - source_text = str((module.attributes or {}).get("source_text", "")) - for routine in _normalized_module_routines(source_text): - if receiver_key and not routine.export: - continue - items.append( - BslCompletionItemResponse( - label=routine.name, - kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE", - detail=f"{module.qualified_name or module.name}{' · Export' if routine.export else ''}", - insert_text=f"{routine.name}()", - ) - ) - return items - - def _snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) -> list[BslCompletionItemResponse]: receiver_key = (receiver or "").strip().casefold() nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes} @@ -7654,73 +7517,6 @@ def _snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) return items -def _completion_kind_for_part(kind: str) -> str: - normalized = kind.upper() - if normalized in {"ATTRIBUTE", "RESOURCE", "DIMENSION", "FIELD"}: - return "PROPERTY" - if normalized in {"COMMAND", "METHOD", "OPERATION"}: - return "METHOD" - if normalized in {"TABULAR_SECTION", "TABLE"}: - return "COLLECTION" - return "VALUE" - - -def _normalized_module_source_response(module, owner) -> ModuleSourceResponse: - attributes = module.attributes or {} - source_text = str(attributes.get("source_text", "")) - routines = _normalized_module_routines(source_text) - module_role = str(module.module_kind or attributes.get("module_role") or "MODULE") - return ModuleSourceResponse( - name=module.name, - qualified_name=module.qualified_name or module.name, - module_role=module_role, - owner_qualified_name=str(attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None, - owner_kind=str(attributes.get("owner_kind") or getattr(owner, "object_kind", "") or "") or None, - object_part=str(attributes.get("object_part") or _module_object_part_for_response(module_role, str(attributes.get("form_name") or ""))), - form_name=str(attributes.get("form_name") or "") or None, - form_qualified_name=str(attributes.get("form_qualified_name") or "") or None, - source_path=module.source_path or "", - source_text=source_text, - routines_count=len(routines), - routines=routines, - ) - - -def _module_object_part_for_response(module_role: str, form_name: str = "") -> str: - return { - "OBJECT_MODULE": "object.module", - "MANAGER_MODULE": "object.manager", - "RECORD_SET_MODULE": "object.record_set", - "FORM_MODULE": f"form.{form_name}.module" if form_name else "form.module", - "MODULE": "module", - }.get(module_role, "module") - - -def _normalized_module_routines(source_text: str) -> list[ModuleRoutineResponse]: - if not source_text: - return [] - declarations: list[tuple[int, re.Match[str]]] = [] - pattern = re.compile(r"^\s*(Процедура|Функция|Procedure|Function)\s+([A-Za-zА-Яа-яЁё_][\wА-Яа-яЁё]*)\s*\(([^)]*)\)\s*(.*)$", re.IGNORECASE | re.MULTILINE) - for match in pattern.finditer(source_text): - line_start = source_text.count("\n", 0, match.start()) + 1 - declarations.append((line_start, match)) - routines: list[ModuleRoutineResponse] = [] - for index, (line_start, match) in enumerate(declarations): - line_end = declarations[index + 1][0] - 1 if index + 1 < len(declarations) else len(source_text.splitlines()) - kind_label = match.group(1).casefold() - tail = match.group(4) or "" - routines.append( - ModuleRoutineResponse( - name=match.group(2), - kind="FUNCTION" if kind_label in {"функция", "function"} else "PROCEDURE", - line_start=line_start, - line_end=line_end, - export=bool(re.search(r"\b(Экспорт|Export)\b", tail, re.IGNORECASE)), - ) - ) - return routines - - def _module_sources_for_object(snapshot: SirSnapshot, qualified_name: str) -> list[ModuleSourceResponse]: nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes} selected_routine = next( diff --git a/services/api-server/src/api_server/normalized_object_models.py b/services/api-server/src/api_server/normalized_object_models.py new file mode 100644 index 0000000..3d6e9cf --- /dev/null +++ b/services/api-server/src/api_server/normalized_object_models.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from pydantic import BaseModel, Field + +from one_c_normalizer import MetadataObject + + +class NormalizedObjectDetail(BaseModel): + project_id: str | None = None + group_name: str + object: MetadataObject + + +class ModuleRoutineResponse(BaseModel): + name: str + kind: str + line_start: int | None = None + line_end: int | None = None + export: bool = False + calls_count: int = 0 + queries_count: int = 0 + writes_count: int = 0 + calls: list[str] = Field(default_factory=list) + queries: list[str] = Field(default_factory=list) + writes: list[str] = Field(default_factory=list) + impact_level: str = "LOW" + impact_reasons: list[str] = Field(default_factory=list) + + +class ModuleSourceResponse(BaseModel): + name: str + qualified_name: str + module_role: str = "MODULE" + owner_qualified_name: str | None = None + owner_kind: str | None = None + object_part: str | None = None + form_name: str | None = None + form_qualified_name: str | None = None + source_path: str + source_text: str + routines_count: int = 0 + routines: list[ModuleRoutineResponse] = Field(default_factory=list) + + +class BslCompletionItemResponse(BaseModel): + label: str + kind: str = "VALUE" + detail: str | None = None + insert_text: str | None = None diff --git a/services/api-server/src/api_server/normalized_object_service.py b/services/api-server/src/api_server/normalized_object_service.py new file mode 100644 index 0000000..dc01505 --- /dev/null +++ b/services/api-server/src/api_server/normalized_object_service.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +import re + +from api_server.normalized_object_models import ( + BslCompletionItemResponse, + ModuleRoutineResponse, + ModuleSourceResponse, + NormalizedObjectDetail, +) +from api_server.normalized_project_service import normalized_all_groups +from one_c_normalizer import NormalizedProject + + +def normalized_object_detail(normalized: NormalizedProject, qualified_name: str) -> NormalizedObjectDetail | None: + for group in normalized.configuration.groups: + for item in group.objects: + if item.qualified_name == qualified_name: + return NormalizedObjectDetail( + project_id=normalized.project_id, + group_name=group.name, + object=item, + ) + return None + + +def normalized_module_sources_for_object(normalized: NormalizedProject, qualified_name: str) -> list[ModuleSourceResponse]: + normalized_query = qualified_name.strip().casefold() + if not normalized_query: + return [] + selected_module = None + selected_owner = None + selected_object = None + for group in normalized_all_groups(normalized): + for item in group.objects: + if item.qualified_name.casefold() == normalized_query or item.name.casefold() == normalized_query: + selected_object = item + break + for module in item.modules: + module_keys = { + str(module.qualified_name or "").casefold(), + str(module.name or "").casefold(), + str(module.source_path or "").casefold(), + } + if normalized_query in module_keys: + selected_module = module + selected_owner = item + break + if selected_object is not None or selected_module is not None: + break + if selected_object is not None or selected_module is not None: + break + if selected_module is not None: + return [_normalized_module_source_response(selected_module, selected_owner)] + if selected_object is None: + return [] + return sorted( + [_normalized_module_source_response(module, selected_object) for module in selected_object.modules], + key=lambda item: (item.module_role, item.name), + ) + + +def normalized_bsl_completion_items( + normalized: NormalizedProject, + receiver: str | None, + qualified_name: str | None, +) -> list[BslCompletionItemResponse]: + receiver_key = (receiver or "").strip().casefold() + qualified_key = (qualified_name or "").strip().casefold() + items: list[BslCompletionItemResponse] = [] + for group in normalized_all_groups(normalized): + for metadata_object in group.objects: + object_names = { + metadata_object.name.casefold(), + metadata_object.qualified_name.casefold(), + } + if receiver_key and receiver_key in object_names: + items.extend( + BslCompletionItemResponse( + label=part.name, + kind=_completion_kind_for_part(part.kind), + detail=f"{metadata_object.qualified_name}: {part.kind}", + insert_text=part.name, + ) + for part in [ + *metadata_object.attributes, + *metadata_object.resources, + *metadata_object.dimensions, + *metadata_object.tabular_sections, + *metadata_object.commands, + ] + ) + for module in metadata_object.modules: + module_names = { + str(module.name or "").casefold(), + str(module.qualified_name or "").casefold(), + str(module.source_path or "").casefold(), + f"{metadata_object.name}.{module.name}".casefold(), + f"{metadata_object.qualified_name}.{module.name}".casefold(), + } + if receiver_key and receiver_key not in module_names and receiver_key not in object_names: + continue + if not receiver_key and qualified_key and qualified_key not in module_names and qualified_key not in object_names: + continue + source_text = str((module.attributes or {}).get("source_text", "")) + for routine in normalized_module_routines(source_text): + if receiver_key and not routine.export: + continue + items.append( + BslCompletionItemResponse( + label=routine.name, + kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE", + detail=f"{module.qualified_name or module.name}{' · Export' if routine.export else ''}", + insert_text=f"{routine.name}()", + ) + ) + return items + + +def _completion_kind_for_part(kind: str) -> str: + normalized = kind.upper() + if normalized in {"ATTRIBUTE", "RESOURCE", "DIMENSION", "FIELD"}: + return "PROPERTY" + if normalized in {"COMMAND", "METHOD", "OPERATION"}: + return "METHOD" + if normalized in {"TABULAR_SECTION", "TABLE"}: + return "COLLECTION" + return "VALUE" + + +def _normalized_module_source_response(module, owner) -> ModuleSourceResponse: + attributes = module.attributes or {} + source_text = str(attributes.get("source_text", "")) + routines = normalized_module_routines(source_text) + module_role = str(module.module_kind or attributes.get("module_role") or "MODULE") + return ModuleSourceResponse( + name=module.name, + qualified_name=module.qualified_name or module.name, + module_role=module_role, + owner_qualified_name=str(attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None, + owner_kind=str(attributes.get("owner_kind") or getattr(owner, "object_kind", "") or "") or None, + object_part=str( + attributes.get("object_part") + or module_object_part_for_response(module_role, str(attributes.get("form_name") or "")) + ), + form_name=str(attributes.get("form_name") or "") or None, + form_qualified_name=str(attributes.get("form_qualified_name") or "") or None, + source_path=module.source_path or "", + source_text=source_text, + routines_count=len(routines), + routines=routines, + ) + + +def module_object_part_for_response(module_role: str, form_name: str = "") -> str: + return { + "OBJECT_MODULE": "object.module", + "MANAGER_MODULE": "object.manager", + "RECORD_SET_MODULE": "object.record_set", + "FORM_MODULE": f"form.{form_name}.module" if form_name else "form.module", + "MODULE": "module", + }.get(module_role, "module") + + +def normalized_module_routines(source_text: str) -> list[ModuleRoutineResponse]: + if not source_text: + return [] + declarations: list[tuple[int, re.Match[str]]] = [] + pattern = re.compile( + r"^\s*(Процедура|Функция|Procedure|Function)\s+([A-Za-zА-Яа-яЁё_][\wА-Яа-яЁё]*)\s*\(([^)]*)\)\s*(.*)$", + re.IGNORECASE | re.MULTILINE, + ) + for match in pattern.finditer(source_text): + line_start = source_text.count("\n", 0, match.start()) + 1 + declarations.append((line_start, match)) + routines: list[ModuleRoutineResponse] = [] + for index, (line_start, match) in enumerate(declarations): + line_end = declarations[index + 1][0] - 1 if index + 1 < len(declarations) else len(source_text.splitlines()) + kind_label = match.group(1).casefold() + tail = match.group(4) or "" + routines.append( + ModuleRoutineResponse( + name=match.group(2), + kind="FUNCTION" if kind_label in {"функция", "function"} else "PROCEDURE", + line_start=line_start, + line_end=line_end, + export=bool(re.search(r"\b(Экспорт|Export)\b", tail, re.IGNORECASE)), + ) + ) + return routines