Extract normalized object navigation service
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled

This commit is contained in:
2026-05-17 21:46:22 +03:00
parent ff8f9a6dd4
commit b67d734aa4
3 changed files with 251 additions and 216 deletions
+12 -216
View File
@@ -125,6 +125,18 @@ from api_server.normalized_project_service import (
normalized_all_groups as _normalized_all_groups,
normalized_project_summary as _normalized_project_summary,
)
from api_server.normalized_object_models import (
BslCompletionItemResponse,
ModuleRoutineResponse,
ModuleSourceResponse,
NormalizedObjectDetail,
)
from api_server.normalized_object_service import (
module_object_part_for_response as _module_object_part_for_response,
normalized_bsl_completion_items as _normalized_bsl_completion_items,
normalized_module_sources_for_object as _normalized_module_sources_for_object,
normalized_object_detail as _normalized_object_detail,
)
from impact_engine import object_impact, routine_impact
from incremental_indexer import rebuild_changed_file
from integration_topology import IntegrationKind, build_integration_topology
@@ -929,50 +941,6 @@ class ImportSyncPreview(BaseModel):
items: list[ImportSyncDiffItem] = Field(default_factory=list)
class NormalizedObjectDetail(BaseModel):
project_id: str | None = None
group_name: str
object: MetadataObject
class ModuleRoutineResponse(BaseModel):
name: str
kind: str
line_start: int | None = None
line_end: int | None = None
export: bool = False
calls_count: int = 0
queries_count: int = 0
writes_count: int = 0
calls: list[str] = Field(default_factory=list)
queries: list[str] = Field(default_factory=list)
writes: list[str] = Field(default_factory=list)
impact_level: str = "LOW"
impact_reasons: list[str] = Field(default_factory=list)
class ModuleSourceResponse(BaseModel):
name: str
qualified_name: str
module_role: str = "MODULE"
owner_qualified_name: str | None = None
owner_kind: str | None = None
object_part: str | None = None
form_name: str | None = None
form_qualified_name: str | None = None
source_path: str
source_text: str
routines_count: int = 0
routines: list[ModuleRoutineResponse] = Field(default_factory=list)
class BslCompletionItemResponse(BaseModel):
label: str
kind: str = "VALUE"
detail: str | None = None
insert_text: str | None = None
class ImportSummary(BaseModel):
source: ImportSourceKind
mode: ImportMode = ImportMode.FULL_REPLACE
@@ -7516,111 +7484,6 @@ def _import_quality_response(project_id: str) -> ImportQualityResponse:
)
def _normalized_object_detail(normalized: NormalizedProject, qualified_name: str) -> NormalizedObjectDetail | None:
for group in normalized.configuration.groups:
for item in group.objects:
if item.qualified_name == qualified_name:
return NormalizedObjectDetail(
project_id=normalized.project_id,
group_name=group.name,
object=item,
)
return None
def _normalized_module_sources_for_object(normalized: NormalizedProject, qualified_name: str) -> list[ModuleSourceResponse]:
normalized_query = qualified_name.strip().casefold()
if not normalized_query:
return []
selected_module = None
selected_owner = None
selected_object = None
for group in _normalized_all_groups(normalized):
for item in group.objects:
if item.qualified_name.casefold() == normalized_query or item.name.casefold() == normalized_query:
selected_object = item
break
for module in item.modules:
module_keys = {
str(module.qualified_name or "").casefold(),
str(module.name or "").casefold(),
str(module.source_path or "").casefold(),
}
if normalized_query in module_keys:
selected_module = module
selected_owner = item
break
if selected_object is not None or selected_module is not None:
break
if selected_object is not None or selected_module is not None:
break
if selected_module is not None:
return [_normalized_module_source_response(selected_module, selected_owner)]
if selected_object is None:
return []
return sorted(
[_normalized_module_source_response(module, selected_object) for module in selected_object.modules],
key=lambda item: (item.module_role, item.name),
)
def _normalized_bsl_completion_items(
normalized: NormalizedProject,
receiver: str | None,
qualified_name: str | None,
) -> list[BslCompletionItemResponse]:
receiver_key = (receiver or "").strip().casefold()
qualified_key = (qualified_name or "").strip().casefold()
items: list[BslCompletionItemResponse] = []
for group in _normalized_all_groups(normalized):
for metadata_object in group.objects:
object_names = {
metadata_object.name.casefold(),
metadata_object.qualified_name.casefold(),
}
if receiver_key and receiver_key in object_names:
items.extend(
BslCompletionItemResponse(
label=part.name,
kind=_completion_kind_for_part(part.kind),
detail=f"{metadata_object.qualified_name}: {part.kind}",
insert_text=part.name,
)
for part in [
*metadata_object.attributes,
*metadata_object.resources,
*metadata_object.dimensions,
*metadata_object.tabular_sections,
*metadata_object.commands,
]
)
for module in metadata_object.modules:
module_names = {
str(module.name or "").casefold(),
str(module.qualified_name or "").casefold(),
str(module.source_path or "").casefold(),
f"{metadata_object.name}.{module.name}".casefold(),
f"{metadata_object.qualified_name}.{module.name}".casefold(),
}
if receiver_key and receiver_key not in module_names and receiver_key not in object_names:
continue
if not receiver_key and qualified_key and qualified_key not in module_names and qualified_key not in object_names:
continue
source_text = str((module.attributes or {}).get("source_text", ""))
for routine in _normalized_module_routines(source_text):
if receiver_key and not routine.export:
continue
items.append(
BslCompletionItemResponse(
label=routine.name,
kind="FUNCTION" if routine.kind == "FUNCTION" else "PROCEDURE",
detail=f"{module.qualified_name or module.name}{' · Export' if routine.export else ''}",
insert_text=f"{routine.name}()",
)
)
return items
def _snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None) -> list[BslCompletionItemResponse]:
receiver_key = (receiver or "").strip().casefold()
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
@@ -7654,73 +7517,6 @@ def _snapshot_bsl_completion_items(snapshot: SirSnapshot, receiver: str | None)
return items
def _completion_kind_for_part(kind: str) -> str:
normalized = kind.upper()
if normalized in {"ATTRIBUTE", "RESOURCE", "DIMENSION", "FIELD"}:
return "PROPERTY"
if normalized in {"COMMAND", "METHOD", "OPERATION"}:
return "METHOD"
if normalized in {"TABULAR_SECTION", "TABLE"}:
return "COLLECTION"
return "VALUE"
def _normalized_module_source_response(module, owner) -> ModuleSourceResponse:
attributes = module.attributes or {}
source_text = str(attributes.get("source_text", ""))
routines = _normalized_module_routines(source_text)
module_role = str(module.module_kind or attributes.get("module_role") or "MODULE")
return ModuleSourceResponse(
name=module.name,
qualified_name=module.qualified_name or module.name,
module_role=module_role,
owner_qualified_name=str(attributes.get("owner_qualified_name") or getattr(owner, "qualified_name", "") or "") or None,
owner_kind=str(attributes.get("owner_kind") or getattr(owner, "object_kind", "") or "") or None,
object_part=str(attributes.get("object_part") or _module_object_part_for_response(module_role, str(attributes.get("form_name") or ""))),
form_name=str(attributes.get("form_name") or "") or None,
form_qualified_name=str(attributes.get("form_qualified_name") or "") or None,
source_path=module.source_path or "",
source_text=source_text,
routines_count=len(routines),
routines=routines,
)
def _module_object_part_for_response(module_role: str, form_name: str = "") -> str:
return {
"OBJECT_MODULE": "object.module",
"MANAGER_MODULE": "object.manager",
"RECORD_SET_MODULE": "object.record_set",
"FORM_MODULE": f"form.{form_name}.module" if form_name else "form.module",
"MODULE": "module",
}.get(module_role, "module")
def _normalized_module_routines(source_text: str) -> list[ModuleRoutineResponse]:
if not source_text:
return []
declarations: list[tuple[int, re.Match[str]]] = []
pattern = re.compile(r"^\s*(Процедура|Функция|Procedure|Function)\s+([A-Za-zА-Яа-яЁё_][\wА-Яа-яЁё]*)\s*\(([^)]*)\)\s*(.*)$", re.IGNORECASE | re.MULTILINE)
for match in pattern.finditer(source_text):
line_start = source_text.count("\n", 0, match.start()) + 1
declarations.append((line_start, match))
routines: list[ModuleRoutineResponse] = []
for index, (line_start, match) in enumerate(declarations):
line_end = declarations[index + 1][0] - 1 if index + 1 < len(declarations) else len(source_text.splitlines())
kind_label = match.group(1).casefold()
tail = match.group(4) or ""
routines.append(
ModuleRoutineResponse(
name=match.group(2),
kind="FUNCTION" if kind_label in {"функция", "function"} else "PROCEDURE",
line_start=line_start,
line_end=line_end,
export=bool(re.search(r"\b(Экспорт|Export)\b", tail, re.IGNORECASE)),
)
)
return routines
def _module_sources_for_object(snapshot: SirSnapshot, qualified_name: str) -> list[ModuleSourceResponse]:
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
selected_routine = next(