From d610a9ad6c19de1d182133efb5f60a62bc812786 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sun, 17 May 2026 21:54:55 +0300 Subject: [PATCH] Extract snapshot module navigation service --- services/api-server/src/api_server/main.py | 304 +---------------- .../src/api_server/snapshot_module_service.py | 309 ++++++++++++++++++ 2 files changed, 314 insertions(+), 299 deletions(-) create mode 100644 services/api-server/src/api_server/snapshot_module_service.py diff --git a/services/api-server/src/api_server/main.py b/services/api-server/src/api_server/main.py index a989f28..603d2ae 100644 --- a/services/api-server/src/api_server/main.py +++ b/services/api-server/src/api_server/main.py @@ -127,16 +127,18 @@ from api_server.normalized_project_service import ( ) from api_server.normalized_object_models import ( BslCompletionItemResponse, - ModuleRoutineResponse, ModuleSourceResponse, NormalizedObjectDetail, ) from api_server.normalized_object_service import ( - module_object_part_for_response as _module_object_part_for_response, normalized_bsl_completion_items as _normalized_bsl_completion_items, normalized_module_sources_for_object as _normalized_module_sources_for_object, normalized_object_detail as _normalized_object_detail, ) +from api_server.snapshot_module_service import ( + module_sources_for_object as _snapshot_module_sources_for_object, + snapshot_bsl_completion_items as _snapshot_bsl_completion_items, +) from impact_engine import object_impact, routine_impact from incremental_indexer import rebuild_changed_file from integration_topology import IntegrationKind, build_integration_topology @@ -3050,7 +3052,7 @@ async def get_normalized_object_modules(project_id: str, qualified_name: str) -> if modules: return modules snapshot = _project_snapshot_or_404(project_id) - return _module_sources_for_object(snapshot, qualified_name) + return _snapshot_module_sources_for_object(snapshot, qualified_name, _MODULE_OWNER_NODE_KINDS) @app.get("/projects/{project_id}/bsl/completions", response_model=list[BslCompletionItemResponse]) @@ -7484,302 +7486,6 @@ def _import_quality_response(project_id: str) -> ImportQualityResponse: ) -def _snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) -> list[BslCompletionItemResponse]: - receiver_key = (receiver or "").strip().casefold() - nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes} - module_lineages: set[str] = set() - if receiver_key: - for node in snapshot.nodes: - if node.kind == NodeKind.MODULE and ( - node.name.casefold() == receiver_key - or node.qualified_name.casefold() == receiver_key - or str(node.attributes.get("source_path", "")).casefold() == receiver_key - ): - module_lineages.add(node.lineage_id) - else: - module_lineages = {node.lineage_id for node in snapshot.nodes if node.kind == NodeKind.MODULE} - items: list[BslCompletionItemResponse] = [] - for module_lineage in module_lineages: - module = nodes_by_lineage.get(module_lineage) - if module is None: - continue - for routine in _module_routines(snapshot, module_lineage, nodes_by_lineage): - if receiver_key and not routine.export: - continue - items.append( - BslCompletionItemResponse( - label=routine.name, - kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE", - detail=f"{module.qualified_name}{' · Export' if routine.export else ''}", - insert_text=f"{routine.name}()", - ) - ) - return items - - -def _module_sources_for_object(snapshot: SirSnapshot, qualified_name: str) -> list[ModuleSourceResponse]: - nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes} - selected_routine = next( - ( - node - for node in snapshot.nodes - if node.kind in {NodeKind.PROCEDURE, NodeKind.FUNCTION} - and (node.qualified_name == qualified_name or node.name == qualified_name) - ), - None, - ) - if selected_routine is not None: - module_edge = next( - ( - edge - for edge in snapshot.edges - if edge.kind == EdgeKind.DECLARES and edge.target_lineage == selected_routine.lineage_id - ), - None, - ) - if module_edge is not None: - module = nodes_by_lineage.get(module_edge.source_lineage) - if module is not None: - return _module_source_response(snapshot, module, nodes_by_lineage) - - selected_command = next( - ( - node - for node in snapshot.nodes - if node.kind in {NodeKind.COMMAND, NodeKind.FORM, NodeKind.FORM_ELEMENT} - and (node.qualified_name == qualified_name or node.name == qualified_name) - ), - None, - ) - if selected_command is not None: - handler_edge = next( - ( - edge - for edge in snapshot.edges - if edge.kind == EdgeKind.HANDLES and edge.source_lineage == selected_command.lineage_id - ), - None, - ) - if handler_edge is not None: - routine = nodes_by_lineage.get(handler_edge.target_lineage) - module_edge = next( - ( - edge - for edge in snapshot.edges - if edge.kind == EdgeKind.DECLARES and routine is not None and edge.target_lineage == routine.lineage_id - ), - None, - ) - if module_edge is not None: - module = nodes_by_lineage.get(module_edge.source_lineage) - if module is not None: - return _module_source_response(snapshot, module, nodes_by_lineage) - - selected_module = next( - ( - node - for node in snapshot.nodes - if node.kind == NodeKind.MODULE - and ( - node.qualified_name == qualified_name - or node.name == qualified_name - or node.source_ref.source_path == qualified_name - or _module_metadata_qualified_name(snapshot, node, nodes_by_lineage) == qualified_name - ) - ), - None, - ) - if selected_module is not None: - return _module_source_response(snapshot, selected_module, nodes_by_lineage) - owner = next( - ( - node - for node in snapshot.nodes - if node.qualified_name == qualified_name and node.kind in _MODULE_OWNER_NODE_KINDS - ), - None, - ) - if owner is None: - return [] - module_edges = [ - edge - for edge in snapshot.edges - if edge.kind == EdgeKind.CONTAINS - and edge.source_lineage == owner.lineage_id - and edge.attributes.get("link_type") == "METADATA_MODULE" - ] - modules: list[ModuleSourceResponse] = [] - for edge in module_edges: - module = nodes_by_lineage.get(edge.target_lineage) - if module is None or module.kind != NodeKind.MODULE: - continue - routines = _module_routines(snapshot, module.lineage_id, nodes_by_lineage) - module_role = str(edge.attributes.get("module_role") or module.attributes.get("module_role") or "MODULE") - modules.append( - ModuleSourceResponse( - name=module.name, - qualified_name=module.qualified_name, - module_role=module_role, - owner_qualified_name=str(module.attributes.get("owner_qualified_name") or owner.qualified_name or "") or None, - owner_kind=str(module.attributes.get("owner_kind") or owner.kind.value or "") or None, - object_part=str(edge.attributes.get("object_part") or module.attributes.get("object_part") or _module_object_part_for_response(module_role, str(module.attributes.get("form_name") or ""))), - form_name=str(edge.attributes.get("form_name") or module.attributes.get("form_name") or "") or None, - form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None, - source_path=module.source_ref.source_path, - source_text=str(module.attributes.get("source_text", "")), - routines_count=len(routines), - routines=routines, - ) - ) - return sorted(modules, key=lambda item: (item.module_role, item.name)) - - -def _module_source_response( - snapshot: SirSnapshot, - module, - nodes_by_lineage: dict[str, object], -) -> list[ModuleSourceResponse]: - owner_edge = next( - ( - edge - for edge in snapshot.edges - if edge.kind == EdgeKind.CONTAINS - and edge.target_lineage == module.lineage_id - and edge.attributes.get("link_type") == "METADATA_MODULE" - ), - None, - ) - routines = _module_routines(snapshot, module.lineage_id, nodes_by_lineage) - owner = nodes_by_lineage.get(owner_edge.source_lineage) if owner_edge else None - module_role = str((owner_edge.attributes.get("module_role") if owner_edge else None) or module.attributes.get("module_role") or "MODULE") - return [ - ModuleSourceResponse( - name=module.name, - qualified_name=module.qualified_name, - module_role=module_role, - owner_qualified_name=str(module.attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None, - owner_kind=str(module.attributes.get("owner_kind") or getattr(getattr(owner, "kind", None), "value", "") or "") or None, - object_part=str( - (owner_edge.attributes.get("object_part") if owner_edge else None) - or module.attributes.get("object_part") - or _module_object_part_for_response(module_role, str(module.attributes.get("form_name") or "")) - ), - form_name=str((owner_edge.attributes.get("form_name") if owner_edge else None) or module.attributes.get("form_name") or "") or None, - form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None, - source_path=module.source_ref.source_path, - source_text=str(module.attributes.get("source_text", "")), - routines_count=len(routines), - routines=routines, - ) - ] - - -def _module_metadata_qualified_name( - snapshot: SirSnapshot, - module, - nodes_by_lineage: dict[str, object], -) -> str | None: - owner_edge = next( - ( - edge - for edge in snapshot.edges - if edge.kind == EdgeKind.CONTAINS - and edge.target_lineage == module.lineage_id - and edge.attributes.get("link_type") == "METADATA_MODULE" - ), - None, - ) - if owner_edge is None: - return None - owner = nodes_by_lineage.get(owner_edge.source_lineage) - if owner is None: - return None - role = str(owner_edge.attributes.get("module_role", "MODULE")) - form_name = str(owner_edge.attributes.get("form_name", "")) - suffix = { - "OBJECT_MODULE": "МодульОбъекта", - "MANAGER_MODULE": "МодульМенеджера", - "RECORD_SET_MODULE": "МодульНабораЗаписей", - "FORM_MODULE": f"Форма.{form_name}.Модуль" if form_name else "МодульФормы", - "MODULE": "Модуль", - }.get(role, module.name) - return f"{owner.qualified_name}.{suffix}" - - -def _module_routines( - snapshot: SirSnapshot, - module_lineage: str, - nodes_by_lineage: dict[str, object], -) -> list[ModuleRoutineResponse]: - routines: list[ModuleRoutineResponse] = [] - for edge in snapshot.edges: - if edge.kind != EdgeKind.DECLARES or edge.source_lineage != module_lineage: - continue - routine = nodes_by_lineage.get(edge.target_lineage) - if routine is None or getattr(routine, "kind", None) not in {NodeKind.PROCEDURE, NodeKind.FUNCTION}: - continue - calls = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.CALLS) - queries = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.OWNS_QUERY) - writes = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.WRITES) - impact_level, impact_reasons = _routine_impact_markers(calls, queries, writes) - routines.append( - ModuleRoutineResponse( - name=routine.name, - kind=routine.kind.value, - line_start=routine.source_ref.line_start, - line_end=routine.source_ref.line_end, - export=bool(routine.attributes.get("export", False)), - calls_count=len(calls), - queries_count=len(queries), - writes_count=len(writes), - calls=calls, - queries=queries, - writes=writes, - impact_level=impact_level, - impact_reasons=impact_reasons, - ) - ) - return sorted(routines, key=lambda item: item.line_start or 0) - - -def _routine_impact_markers(calls: list[str], queries: list[str], writes: list[str]) -> tuple[str, list[str]]: - reasons: list[str] = [] - if writes: - reasons.append("writes data") - if queries: - reasons.append("reads query tables") - if len(calls) >= 3: - reasons.append("fan-out calls") - if writes and (queries or len(calls) >= 2): - level = "HIGH" - elif writes or queries or len(calls) >= 3: - level = "MEDIUM" - else: - level = "LOW" - return level, reasons - - -def _routine_relation_values( - snapshot: SirSnapshot, - nodes_by_lineage: dict[str, object], - routine_lineage: str, - relation: EdgeKind, -) -> list[str]: - values: list[str] = [] - for edge in snapshot.edges: - if edge.kind != relation or edge.source_lineage != routine_lineage: - continue - target = nodes_by_lineage.get(edge.target_lineage) - if target is None: - continue - if relation == EdgeKind.OWNS_QUERY: - query_text = str(target.attributes.get("query_text", "")).strip() - values.append(query_text or target.name) - else: - values.append(target.qualified_name or target.name) - return values - - def _import_summary_from_snapshot( *, project_id: str, diff --git a/services/api-server/src/api_server/snapshot_module_service.py b/services/api-server/src/api_server/snapshot_module_service.py new file mode 100644 index 0000000..e538f7e --- /dev/null +++ b/services/api-server/src/api_server/snapshot_module_service.py @@ -0,0 +1,309 @@ +from __future__ import annotations + +from api_server.normalized_object_models import BslCompletionItemResponse, ModuleRoutineResponse, ModuleSourceResponse +from api_server.normalized_object_service import module_object_part_for_response +from sir import EdgeKind, NodeKind, SirSnapshot + + +def snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) -> list[BslCompletionItemResponse]: + receiver_key = (receiver or "").strip().casefold() + nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes} + module_lineages: set[str] = set() + if receiver_key: + for node in snapshot.nodes: + if node.kind == NodeKind.MODULE and ( + node.name.casefold() == receiver_key + or node.qualified_name.casefold() == receiver_key + or str(node.attributes.get("source_path", "")).casefold() == receiver_key + ): + module_lineages.add(node.lineage_id) + else: + module_lineages = {node.lineage_id for node in snapshot.nodes if node.kind == NodeKind.MODULE} + items: list[BslCompletionItemResponse] = [] + for module_lineage in module_lineages: + module = nodes_by_lineage.get(module_lineage) + if module is None: + continue + for routine in module_routines(snapshot, module_lineage, nodes_by_lineage): + if receiver_key and not routine.export: + continue + items.append( + BslCompletionItemResponse( + label=routine.name, + kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE", + detail=f"{module.qualified_name}{' · Export' if routine.export else ''}", + insert_text=f"{routine.name}()", + ) + ) + return items + + +def module_sources_for_object( + snapshot: SirSnapshot, + qualified_name: str, + owner_node_kinds: set[NodeKind], +) -> list[ModuleSourceResponse]: + nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes} + selected_routine = next( + ( + node + for node in snapshot.nodes + if node.kind in {NodeKind.PROCEDURE, NodeKind.FUNCTION} + and (node.qualified_name == qualified_name or node.name == qualified_name) + ), + None, + ) + if selected_routine is not None: + module_edge = next( + ( + edge + for edge in snapshot.edges + if edge.kind == EdgeKind.DECLARES and edge.target_lineage == selected_routine.lineage_id + ), + None, + ) + if module_edge is not None: + module = nodes_by_lineage.get(module_edge.source_lineage) + if module is not None: + return module_source_response(snapshot, module, nodes_by_lineage) + + selected_command = next( + ( + node + for node in snapshot.nodes + if node.kind in {NodeKind.COMMAND, NodeKind.FORM, NodeKind.FORM_ELEMENT} + and (node.qualified_name == qualified_name or node.name == qualified_name) + ), + None, + ) + if selected_command is not None: + handler_edge = next( + ( + edge + for edge in snapshot.edges + if edge.kind == EdgeKind.HANDLES and edge.source_lineage == selected_command.lineage_id + ), + None, + ) + if handler_edge is not None: + routine = nodes_by_lineage.get(handler_edge.target_lineage) + module_edge = next( + ( + edge + for edge in snapshot.edges + if edge.kind == EdgeKind.DECLARES and routine is not None and edge.target_lineage == routine.lineage_id + ), + None, + ) + if module_edge is not None: + module = nodes_by_lineage.get(module_edge.source_lineage) + if module is not None: + return module_source_response(snapshot, module, nodes_by_lineage) + + selected_module = next( + ( + node + for node in snapshot.nodes + if node.kind == NodeKind.MODULE + and ( + node.qualified_name == qualified_name + or node.name == qualified_name + or node.source_ref.source_path == qualified_name + or module_metadata_qualified_name(snapshot, node, nodes_by_lineage) == qualified_name + ) + ), + None, + ) + if selected_module is not None: + return module_source_response(snapshot, selected_module, nodes_by_lineage) + owner = next( + ( + node + for node in snapshot.nodes + if node.qualified_name == qualified_name and node.kind in owner_node_kinds + ), + None, + ) + if owner is None: + return [] + module_edges = [ + edge + for edge in snapshot.edges + if edge.kind == EdgeKind.CONTAINS + and edge.source_lineage == owner.lineage_id + and edge.attributes.get("link_type") == "METADATA_MODULE" + ] + modules: list[ModuleSourceResponse] = [] + for edge in module_edges: + module = nodes_by_lineage.get(edge.target_lineage) + if module is None or module.kind != NodeKind.MODULE: + continue + routines = module_routines(snapshot, module.lineage_id, nodes_by_lineage) + module_role = str(edge.attributes.get("module_role") or module.attributes.get("module_role") or "MODULE") + modules.append( + ModuleSourceResponse( + name=module.name, + qualified_name=module.qualified_name, + module_role=module_role, + owner_qualified_name=str(module.attributes.get("owner_qualified_name") or owner.qualified_name or "") or None, + owner_kind=str(module.attributes.get("owner_kind") or owner.kind.value or "") or None, + object_part=str( + edge.attributes.get("object_part") + or module.attributes.get("object_part") + or module_object_part_for_response(module_role, str(module.attributes.get("form_name") or "")) + ), + form_name=str(edge.attributes.get("form_name") or module.attributes.get("form_name") or "") or None, + form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None, + source_path=module.source_ref.source_path, + source_text=str(module.attributes.get("source_text", "")), + routines_count=len(routines), + routines=routines, + ) + ) + return sorted(modules, key=lambda item: (item.module_role, item.name)) + + +def module_source_response( + snapshot: SirSnapshot, + module, + nodes_by_lineage: dict[str, object], +) -> list[ModuleSourceResponse]: + owner_edge = next( + ( + edge + for edge in snapshot.edges + if edge.kind == EdgeKind.CONTAINS + and edge.target_lineage == module.lineage_id + and edge.attributes.get("link_type") == "METADATA_MODULE" + ), + None, + ) + routines = module_routines(snapshot, module.lineage_id, nodes_by_lineage) + owner = nodes_by_lineage.get(owner_edge.source_lineage) if owner_edge else None + module_role = str((owner_edge.attributes.get("module_role") if owner_edge else None) or module.attributes.get("module_role") or "MODULE") + return [ + ModuleSourceResponse( + name=module.name, + qualified_name=module.qualified_name, + module_role=module_role, + owner_qualified_name=str(module.attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None, + owner_kind=str(module.attributes.get("owner_kind") or getattr(getattr(owner, "kind", None), "value", "") or "") or None, + object_part=str( + (owner_edge.attributes.get("object_part") if owner_edge else None) + or module.attributes.get("object_part") + or module_object_part_for_response(module_role, str(module.attributes.get("form_name") or "")) + ), + form_name=str((owner_edge.attributes.get("form_name") if owner_edge else None) or module.attributes.get("form_name") or "") or None, + form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None, + source_path=module.source_ref.source_path, + source_text=str(module.attributes.get("source_text", "")), + routines_count=len(routines), + routines=routines, + ) + ] + + +def module_metadata_qualified_name( + snapshot: SirSnapshot, + module, + nodes_by_lineage: dict[str, object], +) -> str | None: + owner_edge = next( + ( + edge + for edge in snapshot.edges + if edge.kind == EdgeKind.CONTAINS + and edge.target_lineage == module.lineage_id + and edge.attributes.get("link_type") == "METADATA_MODULE" + ), + None, + ) + if owner_edge is None: + return None + owner = nodes_by_lineage.get(owner_edge.source_lineage) + if owner is None: + return None + role = str(owner_edge.attributes.get("module_role", "MODULE")) + form_name = str(owner_edge.attributes.get("form_name", "")) + suffix = { + "OBJECT_MODULE": "МодульОбъекта", + "MANAGER_MODULE": "МодульМенеджера", + "RECORD_SET_MODULE": "МодульНабораЗаписей", + "FORM_MODULE": f"Форма.{form_name}.Модуль" if form_name else "МодульФормы", + "MODULE": "Модуль", + }.get(role, module.name) + return f"{owner.qualified_name}.{suffix}" + + +def module_routines( + snapshot: SirSnapshot, + module_lineage: str, + nodes_by_lineage: dict[str, object], +) -> list[ModuleRoutineResponse]: + routines: list[ModuleRoutineResponse] = [] + for edge in snapshot.edges: + if edge.kind != EdgeKind.DECLARES or edge.source_lineage != module_lineage: + continue + routine = nodes_by_lineage.get(edge.target_lineage) + if routine is None or getattr(routine, "kind", None) not in {NodeKind.PROCEDURE, NodeKind.FUNCTION}: + continue + calls = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.CALLS) + queries = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.OWNS_QUERY) + writes = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.WRITES) + impact_level, impact_reasons = _routine_impact_markers(calls, queries, writes) + routines.append( + ModuleRoutineResponse( + name=routine.name, + kind=routine.kind.value, + line_start=routine.source_ref.line_start, + line_end=routine.source_ref.line_end, + export=bool(routine.attributes.get("export", False)), + calls_count=len(calls), + queries_count=len(queries), + writes_count=len(writes), + calls=calls, + queries=queries, + writes=writes, + impact_level=impact_level, + impact_reasons=impact_reasons, + ) + ) + return sorted(routines, key=lambda item: item.line_start or 0) + + +def _routine_impact_markers(calls: list[str], queries: list[str], writes: list[str]) -> tuple[str, list[str]]: + reasons: list[str] = [] + if writes: + reasons.append("writes data") + if queries: + reasons.append("reads query tables") + if len(calls) >= 3: + reasons.append("fan-out calls") + if writes and (queries or len(calls) >= 2): + level = "HIGH" + elif writes or queries or len(calls) >= 3: + level = "MEDIUM" + else: + level = "LOW" + return level, reasons + + +def _routine_relation_values( + snapshot: SirSnapshot, + nodes_by_lineage: dict[str, object], + routine_lineage: str, + relation: EdgeKind, +) -> list[str]: + values: list[str] = [] + for edge in snapshot.edges: + if edge.kind != relation or edge.source_lineage != routine_lineage: + continue + target = nodes_by_lineage.get(edge.target_lineage) + if target is None: + continue + if relation == EdgeKind.OWNS_QUERY: + query_text = str(target.attributes.get("query_text", "")).strip() + values.append(query_text or target.name) + else: + values.append(target.qualified_name or target.name) + return values