Extract snapshot module navigation service
This commit is contained in:
@@ -127,16 +127,18 @@ from api_server.normalized_project_service import (
|
||||
)
|
||||
from api_server.normalized_object_models import (
|
||||
BslCompletionItemResponse,
|
||||
ModuleRoutineResponse,
|
||||
ModuleSourceResponse,
|
||||
NormalizedObjectDetail,
|
||||
)
|
||||
from api_server.normalized_object_service import (
|
||||
module_object_part_for_response as _module_object_part_for_response,
|
||||
normalized_bsl_completion_items as _normalized_bsl_completion_items,
|
||||
normalized_module_sources_for_object as _normalized_module_sources_for_object,
|
||||
normalized_object_detail as _normalized_object_detail,
|
||||
)
|
||||
from api_server.snapshot_module_service import (
|
||||
module_sources_for_object as _snapshot_module_sources_for_object,
|
||||
snapshot_bsl_completion_items as _snapshot_bsl_completion_items,
|
||||
)
|
||||
from impact_engine import object_impact, routine_impact
|
||||
from incremental_indexer import rebuild_changed_file
|
||||
from integration_topology import IntegrationKind, build_integration_topology
|
||||
@@ -3050,7 +3052,7 @@ async def get_normalized_object_modules(project_id: str, qualified_name: str) ->
|
||||
if modules:
|
||||
return modules
|
||||
snapshot = _project_snapshot_or_404(project_id)
|
||||
return _module_sources_for_object(snapshot, qualified_name)
|
||||
return _snapshot_module_sources_for_object(snapshot, qualified_name, _MODULE_OWNER_NODE_KINDS)
|
||||
|
||||
|
||||
@app.get("/projects/{project_id}/bsl/completions", response_model=list[BslCompletionItemResponse])
|
||||
@@ -7484,302 +7486,6 @@ def _import_quality_response(project_id: str) -> ImportQualityResponse:
|
||||
)
|
||||
|
||||
|
||||
def _snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) -> list[BslCompletionItemResponse]:
|
||||
receiver_key = (receiver or "").strip().casefold()
|
||||
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
|
||||
module_lineages: set[str] = set()
|
||||
if receiver_key:
|
||||
for node in snapshot.nodes:
|
||||
if node.kind == NodeKind.MODULE and (
|
||||
node.name.casefold() == receiver_key
|
||||
or node.qualified_name.casefold() == receiver_key
|
||||
or str(node.attributes.get("source_path", "")).casefold() == receiver_key
|
||||
):
|
||||
module_lineages.add(node.lineage_id)
|
||||
else:
|
||||
module_lineages = {node.lineage_id for node in snapshot.nodes if node.kind == NodeKind.MODULE}
|
||||
items: list[BslCompletionItemResponse] = []
|
||||
for module_lineage in module_lineages:
|
||||
module = nodes_by_lineage.get(module_lineage)
|
||||
if module is None:
|
||||
continue
|
||||
for routine in _module_routines(snapshot, module_lineage, nodes_by_lineage):
|
||||
if receiver_key and not routine.export:
|
||||
continue
|
||||
items.append(
|
||||
BslCompletionItemResponse(
|
||||
label=routine.name,
|
||||
kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE",
|
||||
detail=f"{module.qualified_name}{' · Export' if routine.export else ''}",
|
||||
insert_text=f"{routine.name}()",
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def _module_sources_for_object(snapshot: SirSnapshot, qualified_name: str) -> list[ModuleSourceResponse]:
|
||||
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
|
||||
selected_routine = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.kind in {NodeKind.PROCEDURE, NodeKind.FUNCTION}
|
||||
and (node.qualified_name == qualified_name or node.name == qualified_name)
|
||||
),
|
||||
None,
|
||||
)
|
||||
if selected_routine is not None:
|
||||
module_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.DECLARES and edge.target_lineage == selected_routine.lineage_id
|
||||
),
|
||||
None,
|
||||
)
|
||||
if module_edge is not None:
|
||||
module = nodes_by_lineage.get(module_edge.source_lineage)
|
||||
if module is not None:
|
||||
return _module_source_response(snapshot, module, nodes_by_lineage)
|
||||
|
||||
selected_command = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.kind in {NodeKind.COMMAND, NodeKind.FORM, NodeKind.FORM_ELEMENT}
|
||||
and (node.qualified_name == qualified_name or node.name == qualified_name)
|
||||
),
|
||||
None,
|
||||
)
|
||||
if selected_command is not None:
|
||||
handler_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.HANDLES and edge.source_lineage == selected_command.lineage_id
|
||||
),
|
||||
None,
|
||||
)
|
||||
if handler_edge is not None:
|
||||
routine = nodes_by_lineage.get(handler_edge.target_lineage)
|
||||
module_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.DECLARES and routine is not None and edge.target_lineage == routine.lineage_id
|
||||
),
|
||||
None,
|
||||
)
|
||||
if module_edge is not None:
|
||||
module = nodes_by_lineage.get(module_edge.source_lineage)
|
||||
if module is not None:
|
||||
return _module_source_response(snapshot, module, nodes_by_lineage)
|
||||
|
||||
selected_module = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.kind == NodeKind.MODULE
|
||||
and (
|
||||
node.qualified_name == qualified_name
|
||||
or node.name == qualified_name
|
||||
or node.source_ref.source_path == qualified_name
|
||||
or _module_metadata_qualified_name(snapshot, node, nodes_by_lineage) == qualified_name
|
||||
)
|
||||
),
|
||||
None,
|
||||
)
|
||||
if selected_module is not None:
|
||||
return _module_source_response(snapshot, selected_module, nodes_by_lineage)
|
||||
owner = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.qualified_name == qualified_name and node.kind in _MODULE_OWNER_NODE_KINDS
|
||||
),
|
||||
None,
|
||||
)
|
||||
if owner is None:
|
||||
return []
|
||||
module_edges = [
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.CONTAINS
|
||||
and edge.source_lineage == owner.lineage_id
|
||||
and edge.attributes.get("link_type") == "METADATA_MODULE"
|
||||
]
|
||||
modules: list[ModuleSourceResponse] = []
|
||||
for edge in module_edges:
|
||||
module = nodes_by_lineage.get(edge.target_lineage)
|
||||
if module is None or module.kind != NodeKind.MODULE:
|
||||
continue
|
||||
routines = _module_routines(snapshot, module.lineage_id, nodes_by_lineage)
|
||||
module_role = str(edge.attributes.get("module_role") or module.attributes.get("module_role") or "MODULE")
|
||||
modules.append(
|
||||
ModuleSourceResponse(
|
||||
name=module.name,
|
||||
qualified_name=module.qualified_name,
|
||||
module_role=module_role,
|
||||
owner_qualified_name=str(module.attributes.get("owner_qualified_name") or owner.qualified_name or "") or None,
|
||||
owner_kind=str(module.attributes.get("owner_kind") or owner.kind.value or "") or None,
|
||||
object_part=str(edge.attributes.get("object_part") or module.attributes.get("object_part") or _module_object_part_for_response(module_role, str(module.attributes.get("form_name") or ""))),
|
||||
form_name=str(edge.attributes.get("form_name") or module.attributes.get("form_name") or "") or None,
|
||||
form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None,
|
||||
source_path=module.source_ref.source_path,
|
||||
source_text=str(module.attributes.get("source_text", "")),
|
||||
routines_count=len(routines),
|
||||
routines=routines,
|
||||
)
|
||||
)
|
||||
return sorted(modules, key=lambda item: (item.module_role, item.name))
|
||||
|
||||
|
||||
def _module_source_response(
|
||||
snapshot: SirSnapshot,
|
||||
module,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
) -> list[ModuleSourceResponse]:
|
||||
owner_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.CONTAINS
|
||||
and edge.target_lineage == module.lineage_id
|
||||
and edge.attributes.get("link_type") == "METADATA_MODULE"
|
||||
),
|
||||
None,
|
||||
)
|
||||
routines = _module_routines(snapshot, module.lineage_id, nodes_by_lineage)
|
||||
owner = nodes_by_lineage.get(owner_edge.source_lineage) if owner_edge else None
|
||||
module_role = str((owner_edge.attributes.get("module_role") if owner_edge else None) or module.attributes.get("module_role") or "MODULE")
|
||||
return [
|
||||
ModuleSourceResponse(
|
||||
name=module.name,
|
||||
qualified_name=module.qualified_name,
|
||||
module_role=module_role,
|
||||
owner_qualified_name=str(module.attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None,
|
||||
owner_kind=str(module.attributes.get("owner_kind") or getattr(getattr(owner, "kind", None), "value", "") or "") or None,
|
||||
object_part=str(
|
||||
(owner_edge.attributes.get("object_part") if owner_edge else None)
|
||||
or module.attributes.get("object_part")
|
||||
or _module_object_part_for_response(module_role, str(module.attributes.get("form_name") or ""))
|
||||
),
|
||||
form_name=str((owner_edge.attributes.get("form_name") if owner_edge else None) or module.attributes.get("form_name") or "") or None,
|
||||
form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None,
|
||||
source_path=module.source_ref.source_path,
|
||||
source_text=str(module.attributes.get("source_text", "")),
|
||||
routines_count=len(routines),
|
||||
routines=routines,
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def _module_metadata_qualified_name(
|
||||
snapshot: SirSnapshot,
|
||||
module,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
) -> str | None:
|
||||
owner_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.CONTAINS
|
||||
and edge.target_lineage == module.lineage_id
|
||||
and edge.attributes.get("link_type") == "METADATA_MODULE"
|
||||
),
|
||||
None,
|
||||
)
|
||||
if owner_edge is None:
|
||||
return None
|
||||
owner = nodes_by_lineage.get(owner_edge.source_lineage)
|
||||
if owner is None:
|
||||
return None
|
||||
role = str(owner_edge.attributes.get("module_role", "MODULE"))
|
||||
form_name = str(owner_edge.attributes.get("form_name", ""))
|
||||
suffix = {
|
||||
"OBJECT_MODULE": "МодульОбъекта",
|
||||
"MANAGER_MODULE": "МодульМенеджера",
|
||||
"RECORD_SET_MODULE": "МодульНабораЗаписей",
|
||||
"FORM_MODULE": f"Форма.{form_name}.Модуль" if form_name else "МодульФормы",
|
||||
"MODULE": "Модуль",
|
||||
}.get(role, module.name)
|
||||
return f"{owner.qualified_name}.{suffix}"
|
||||
|
||||
|
||||
def _module_routines(
|
||||
snapshot: SirSnapshot,
|
||||
module_lineage: str,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
) -> list[ModuleRoutineResponse]:
|
||||
routines: list[ModuleRoutineResponse] = []
|
||||
for edge in snapshot.edges:
|
||||
if edge.kind != EdgeKind.DECLARES or edge.source_lineage != module_lineage:
|
||||
continue
|
||||
routine = nodes_by_lineage.get(edge.target_lineage)
|
||||
if routine is None or getattr(routine, "kind", None) not in {NodeKind.PROCEDURE, NodeKind.FUNCTION}:
|
||||
continue
|
||||
calls = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.CALLS)
|
||||
queries = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.OWNS_QUERY)
|
||||
writes = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.WRITES)
|
||||
impact_level, impact_reasons = _routine_impact_markers(calls, queries, writes)
|
||||
routines.append(
|
||||
ModuleRoutineResponse(
|
||||
name=routine.name,
|
||||
kind=routine.kind.value,
|
||||
line_start=routine.source_ref.line_start,
|
||||
line_end=routine.source_ref.line_end,
|
||||
export=bool(routine.attributes.get("export", False)),
|
||||
calls_count=len(calls),
|
||||
queries_count=len(queries),
|
||||
writes_count=len(writes),
|
||||
calls=calls,
|
||||
queries=queries,
|
||||
writes=writes,
|
||||
impact_level=impact_level,
|
||||
impact_reasons=impact_reasons,
|
||||
)
|
||||
)
|
||||
return sorted(routines, key=lambda item: item.line_start or 0)
|
||||
|
||||
|
||||
def _routine_impact_markers(calls: list[str], queries: list[str], writes: list[str]) -> tuple[str, list[str]]:
|
||||
reasons: list[str] = []
|
||||
if writes:
|
||||
reasons.append("writes data")
|
||||
if queries:
|
||||
reasons.append("reads query tables")
|
||||
if len(calls) >= 3:
|
||||
reasons.append("fan-out calls")
|
||||
if writes and (queries or len(calls) >= 2):
|
||||
level = "HIGH"
|
||||
elif writes or queries or len(calls) >= 3:
|
||||
level = "MEDIUM"
|
||||
else:
|
||||
level = "LOW"
|
||||
return level, reasons
|
||||
|
||||
|
||||
def _routine_relation_values(
|
||||
snapshot: SirSnapshot,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
routine_lineage: str,
|
||||
relation: EdgeKind,
|
||||
) -> list[str]:
|
||||
values: list[str] = []
|
||||
for edge in snapshot.edges:
|
||||
if edge.kind != relation or edge.source_lineage != routine_lineage:
|
||||
continue
|
||||
target = nodes_by_lineage.get(edge.target_lineage)
|
||||
if target is None:
|
||||
continue
|
||||
if relation == EdgeKind.OWNS_QUERY:
|
||||
query_text = str(target.attributes.get("query_text", "")).strip()
|
||||
values.append(query_text or target.name)
|
||||
else:
|
||||
values.append(target.qualified_name or target.name)
|
||||
return values
|
||||
|
||||
|
||||
def _import_summary_from_snapshot(
|
||||
*,
|
||||
project_id: str,
|
||||
|
||||
@@ -0,0 +1,309 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from api_server.normalized_object_models import BslCompletionItemResponse, ModuleRoutineResponse, ModuleSourceResponse
|
||||
from api_server.normalized_object_service import module_object_part_for_response
|
||||
from sir import EdgeKind, NodeKind, SirSnapshot
|
||||
|
||||
|
||||
def snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) -> list[BslCompletionItemResponse]:
|
||||
receiver_key = (receiver or "").strip().casefold()
|
||||
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
|
||||
module_lineages: set[str] = set()
|
||||
if receiver_key:
|
||||
for node in snapshot.nodes:
|
||||
if node.kind == NodeKind.MODULE and (
|
||||
node.name.casefold() == receiver_key
|
||||
or node.qualified_name.casefold() == receiver_key
|
||||
or str(node.attributes.get("source_path", "")).casefold() == receiver_key
|
||||
):
|
||||
module_lineages.add(node.lineage_id)
|
||||
else:
|
||||
module_lineages = {node.lineage_id for node in snapshot.nodes if node.kind == NodeKind.MODULE}
|
||||
items: list[BslCompletionItemResponse] = []
|
||||
for module_lineage in module_lineages:
|
||||
module = nodes_by_lineage.get(module_lineage)
|
||||
if module is None:
|
||||
continue
|
||||
for routine in module_routines(snapshot, module_lineage, nodes_by_lineage):
|
||||
if receiver_key and not routine.export:
|
||||
continue
|
||||
items.append(
|
||||
BslCompletionItemResponse(
|
||||
label=routine.name,
|
||||
kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE",
|
||||
detail=f"{module.qualified_name}{' · Export' if routine.export else ''}",
|
||||
insert_text=f"{routine.name}()",
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def module_sources_for_object(
|
||||
snapshot: SirSnapshot,
|
||||
qualified_name: str,
|
||||
owner_node_kinds: set[NodeKind],
|
||||
) -> list[ModuleSourceResponse]:
|
||||
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
|
||||
selected_routine = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.kind in {NodeKind.PROCEDURE, NodeKind.FUNCTION}
|
||||
and (node.qualified_name == qualified_name or node.name == qualified_name)
|
||||
),
|
||||
None,
|
||||
)
|
||||
if selected_routine is not None:
|
||||
module_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.DECLARES and edge.target_lineage == selected_routine.lineage_id
|
||||
),
|
||||
None,
|
||||
)
|
||||
if module_edge is not None:
|
||||
module = nodes_by_lineage.get(module_edge.source_lineage)
|
||||
if module is not None:
|
||||
return module_source_response(snapshot, module, nodes_by_lineage)
|
||||
|
||||
selected_command = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.kind in {NodeKind.COMMAND, NodeKind.FORM, NodeKind.FORM_ELEMENT}
|
||||
and (node.qualified_name == qualified_name or node.name == qualified_name)
|
||||
),
|
||||
None,
|
||||
)
|
||||
if selected_command is not None:
|
||||
handler_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.HANDLES and edge.source_lineage == selected_command.lineage_id
|
||||
),
|
||||
None,
|
||||
)
|
||||
if handler_edge is not None:
|
||||
routine = nodes_by_lineage.get(handler_edge.target_lineage)
|
||||
module_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.DECLARES and routine is not None and edge.target_lineage == routine.lineage_id
|
||||
),
|
||||
None,
|
||||
)
|
||||
if module_edge is not None:
|
||||
module = nodes_by_lineage.get(module_edge.source_lineage)
|
||||
if module is not None:
|
||||
return module_source_response(snapshot, module, nodes_by_lineage)
|
||||
|
||||
selected_module = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.kind == NodeKind.MODULE
|
||||
and (
|
||||
node.qualified_name == qualified_name
|
||||
or node.name == qualified_name
|
||||
or node.source_ref.source_path == qualified_name
|
||||
or module_metadata_qualified_name(snapshot, node, nodes_by_lineage) == qualified_name
|
||||
)
|
||||
),
|
||||
None,
|
||||
)
|
||||
if selected_module is not None:
|
||||
return module_source_response(snapshot, selected_module, nodes_by_lineage)
|
||||
owner = next(
|
||||
(
|
||||
node
|
||||
for node in snapshot.nodes
|
||||
if node.qualified_name == qualified_name and node.kind in owner_node_kinds
|
||||
),
|
||||
None,
|
||||
)
|
||||
if owner is None:
|
||||
return []
|
||||
module_edges = [
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.CONTAINS
|
||||
and edge.source_lineage == owner.lineage_id
|
||||
and edge.attributes.get("link_type") == "METADATA_MODULE"
|
||||
]
|
||||
modules: list[ModuleSourceResponse] = []
|
||||
for edge in module_edges:
|
||||
module = nodes_by_lineage.get(edge.target_lineage)
|
||||
if module is None or module.kind != NodeKind.MODULE:
|
||||
continue
|
||||
routines = module_routines(snapshot, module.lineage_id, nodes_by_lineage)
|
||||
module_role = str(edge.attributes.get("module_role") or module.attributes.get("module_role") or "MODULE")
|
||||
modules.append(
|
||||
ModuleSourceResponse(
|
||||
name=module.name,
|
||||
qualified_name=module.qualified_name,
|
||||
module_role=module_role,
|
||||
owner_qualified_name=str(module.attributes.get("owner_qualified_name") or owner.qualified_name or "") or None,
|
||||
owner_kind=str(module.attributes.get("owner_kind") or owner.kind.value or "") or None,
|
||||
object_part=str(
|
||||
edge.attributes.get("object_part")
|
||||
or module.attributes.get("object_part")
|
||||
or module_object_part_for_response(module_role, str(module.attributes.get("form_name") or ""))
|
||||
),
|
||||
form_name=str(edge.attributes.get("form_name") or module.attributes.get("form_name") or "") or None,
|
||||
form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None,
|
||||
source_path=module.source_ref.source_path,
|
||||
source_text=str(module.attributes.get("source_text", "")),
|
||||
routines_count=len(routines),
|
||||
routines=routines,
|
||||
)
|
||||
)
|
||||
return sorted(modules, key=lambda item: (item.module_role, item.name))
|
||||
|
||||
|
||||
def module_source_response(
|
||||
snapshot: SirSnapshot,
|
||||
module,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
) -> list[ModuleSourceResponse]:
|
||||
owner_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.CONTAINS
|
||||
and edge.target_lineage == module.lineage_id
|
||||
and edge.attributes.get("link_type") == "METADATA_MODULE"
|
||||
),
|
||||
None,
|
||||
)
|
||||
routines = module_routines(snapshot, module.lineage_id, nodes_by_lineage)
|
||||
owner = nodes_by_lineage.get(owner_edge.source_lineage) if owner_edge else None
|
||||
module_role = str((owner_edge.attributes.get("module_role") if owner_edge else None) or module.attributes.get("module_role") or "MODULE")
|
||||
return [
|
||||
ModuleSourceResponse(
|
||||
name=module.name,
|
||||
qualified_name=module.qualified_name,
|
||||
module_role=module_role,
|
||||
owner_qualified_name=str(module.attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None,
|
||||
owner_kind=str(module.attributes.get("owner_kind") or getattr(getattr(owner, "kind", None), "value", "") or "") or None,
|
||||
object_part=str(
|
||||
(owner_edge.attributes.get("object_part") if owner_edge else None)
|
||||
or module.attributes.get("object_part")
|
||||
or module_object_part_for_response(module_role, str(module.attributes.get("form_name") or ""))
|
||||
),
|
||||
form_name=str((owner_edge.attributes.get("form_name") if owner_edge else None) or module.attributes.get("form_name") or "") or None,
|
||||
form_qualified_name=str(module.attributes.get("form_qualified_name") or "") or None,
|
||||
source_path=module.source_ref.source_path,
|
||||
source_text=str(module.attributes.get("source_text", "")),
|
||||
routines_count=len(routines),
|
||||
routines=routines,
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def module_metadata_qualified_name(
|
||||
snapshot: SirSnapshot,
|
||||
module,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
) -> str | None:
|
||||
owner_edge = next(
|
||||
(
|
||||
edge
|
||||
for edge in snapshot.edges
|
||||
if edge.kind == EdgeKind.CONTAINS
|
||||
and edge.target_lineage == module.lineage_id
|
||||
and edge.attributes.get("link_type") == "METADATA_MODULE"
|
||||
),
|
||||
None,
|
||||
)
|
||||
if owner_edge is None:
|
||||
return None
|
||||
owner = nodes_by_lineage.get(owner_edge.source_lineage)
|
||||
if owner is None:
|
||||
return None
|
||||
role = str(owner_edge.attributes.get("module_role", "MODULE"))
|
||||
form_name = str(owner_edge.attributes.get("form_name", ""))
|
||||
suffix = {
|
||||
"OBJECT_MODULE": "МодульОбъекта",
|
||||
"MANAGER_MODULE": "МодульМенеджера",
|
||||
"RECORD_SET_MODULE": "МодульНабораЗаписей",
|
||||
"FORM_MODULE": f"Форма.{form_name}.Модуль" if form_name else "МодульФормы",
|
||||
"MODULE": "Модуль",
|
||||
}.get(role, module.name)
|
||||
return f"{owner.qualified_name}.{suffix}"
|
||||
|
||||
|
||||
def module_routines(
|
||||
snapshot: SirSnapshot,
|
||||
module_lineage: str,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
) -> list[ModuleRoutineResponse]:
|
||||
routines: list[ModuleRoutineResponse] = []
|
||||
for edge in snapshot.edges:
|
||||
if edge.kind != EdgeKind.DECLARES or edge.source_lineage != module_lineage:
|
||||
continue
|
||||
routine = nodes_by_lineage.get(edge.target_lineage)
|
||||
if routine is None or getattr(routine, "kind", None) not in {NodeKind.PROCEDURE, NodeKind.FUNCTION}:
|
||||
continue
|
||||
calls = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.CALLS)
|
||||
queries = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.OWNS_QUERY)
|
||||
writes = _routine_relation_values(snapshot, nodes_by_lineage, routine.lineage_id, EdgeKind.WRITES)
|
||||
impact_level, impact_reasons = _routine_impact_markers(calls, queries, writes)
|
||||
routines.append(
|
||||
ModuleRoutineResponse(
|
||||
name=routine.name,
|
||||
kind=routine.kind.value,
|
||||
line_start=routine.source_ref.line_start,
|
||||
line_end=routine.source_ref.line_end,
|
||||
export=bool(routine.attributes.get("export", False)),
|
||||
calls_count=len(calls),
|
||||
queries_count=len(queries),
|
||||
writes_count=len(writes),
|
||||
calls=calls,
|
||||
queries=queries,
|
||||
writes=writes,
|
||||
impact_level=impact_level,
|
||||
impact_reasons=impact_reasons,
|
||||
)
|
||||
)
|
||||
return sorted(routines, key=lambda item: item.line_start or 0)
|
||||
|
||||
|
||||
def _routine_impact_markers(calls: list[str], queries: list[str], writes: list[str]) -> tuple[str, list[str]]:
|
||||
reasons: list[str] = []
|
||||
if writes:
|
||||
reasons.append("writes data")
|
||||
if queries:
|
||||
reasons.append("reads query tables")
|
||||
if len(calls) >= 3:
|
||||
reasons.append("fan-out calls")
|
||||
if writes and (queries or len(calls) >= 2):
|
||||
level = "HIGH"
|
||||
elif writes or queries or len(calls) >= 3:
|
||||
level = "MEDIUM"
|
||||
else:
|
||||
level = "LOW"
|
||||
return level, reasons
|
||||
|
||||
|
||||
def _routine_relation_values(
|
||||
snapshot: SirSnapshot,
|
||||
nodes_by_lineage: dict[str, object],
|
||||
routine_lineage: str,
|
||||
relation: EdgeKind,
|
||||
) -> list[str]:
|
||||
values: list[str] = []
|
||||
for edge in snapshot.edges:
|
||||
if edge.kind != relation or edge.source_lineage != routine_lineage:
|
||||
continue
|
||||
target = nodes_by_lineage.get(edge.target_lineage)
|
||||
if target is None:
|
||||
continue
|
||||
if relation == EdgeKind.OWNS_QUERY:
|
||||
query_text = str(target.attributes.get("query_text", "")).strip()
|
||||
values.append(query_text or target.name)
|
||||
else:
|
||||
values.append(target.qualified_name or target.name)
|
||||
return values
|
||||
Reference in New Issue
Block a user