diff --git a/services/api-server/src/api_server/main.py b/services/api-server/src/api_server/main.py index 92120cd..006a096 100644 --- a/services/api-server/src/api_server/main.py +++ b/services/api-server/src/api_server/main.py @@ -100,6 +100,27 @@ from api_server.metadata_tree_controller import ( metadata_tree_path as _metadata_tree_path, metadata_tree_search as _metadata_tree_search, ) +from api_server.metadata_tree_builder import ( + _configuration_like_empty_root, + _metadata_child_count_index, + _metadata_node_for_search_result, + _metadata_search_rank, + _metadata_tree_children_for_node, + _metadata_tree_path_for_node, + _metadata_tree_path_steps, + _normalized_all_groups, + _normalized_metadata_tree_children_for_node, + _project_metadata_tree_response as _builder_project_metadata_tree_response, + _project_metadata_tree_response_from_normalized as _builder_project_metadata_tree_response_from_normalized, + _is_metadata_tree_search_node, +) +from api_server.metadata_tree_models import ( + MetadataTreeChildrenResponse, + MetadataTreeNodeResponse, + MetadataTreePathResponse, + MetadataTreeSearchResponse, + ProjectMetadataTreeResponse, +) from impact_engine import object_impact, routine_impact from incremental_indexer import rebuild_changed_file from integration_topology import IntegrationKind, build_integration_topology @@ -119,8 +140,6 @@ from operations_core import ( from one_c_normalizer import ( COMMON_BRANCH_CHILDREN, METADATA_CHILD_OBJECT_SPECS, - METADATA_TYPE_BY_BRANCH, - METADATA_TYPE_BY_CODE, METADATA_TYPE_CONTEXT_ACTIONS, METADATA_TYPE_DESCRIPTIONS, METADATA_TYPE_DOCUMENTATION_URLS, @@ -266,20 +285,6 @@ _MODULE_OWNER_NODE_KINDS = { if _EVENT_SUBSCRIPTION_KIND is not None: _MODULE_OWNER_NODE_KINDS.add(_EVENT_SUBSCRIPTION_KIND) -_METADATA_ICON_BY_NODE_KIND = { - NodeKind.EXCHANGE_PLAN: "exchange-plan", - NodeKind.SCHEDULED_JOB: "scheduled-job", - NodeKind.ATTRIBUTE: "attribute", - NodeKind.COMMAND: "command", - NodeKind.FORM: "form", - NodeKind.FORM_ELEMENT: "form", - NodeKind.ROLE: "role", - NodeKind.TABULAR_SECTION: "tabular", -} -if _EVENT_SUBSCRIPTION_KIND is not None: - _METADATA_ICON_BY_NODE_KIND[_EVENT_SUBSCRIPTION_KIND] = "event" - - class ProjectSetupStatus(str, Enum): NOT_CONFIGURED = "NOT_CONFIGURED" IMPORT_REQUIRED = "IMPORT_REQUIRED" @@ -1100,53 +1105,6 @@ class MetadataCatalogResponse(BaseModel): child_object_types: list[MetadataChildObjectSpecResponse] = Field(default_factory=list) -class MetadataTreeNodeResponse(BaseModel): - id: str - label: str - kind: str - icon: str - qualified_name: str | None = None - count: int = 0 - loaded_count: int = 0 - has_more: bool = False - children: list["MetadataTreeNodeResponse"] = Field(default_factory=list) - - -class ProjectMetadataTreeResponse(BaseModel): - project_id: str - root: MetadataTreeNodeResponse - - -class MetadataTreeChildrenResponse(BaseModel): - project_id: str - parent_id: str - offset: int = 0 - limit: int = 50 - total: int = 0 - has_more: bool = False - children: list[MetadataTreeNodeResponse] = Field(default_factory=list) - - -class MetadataTreeSearchResponse(BaseModel): - project_id: str - q: str - total: int = 0 - results: list[MetadataTreeNodeResponse] = Field(default_factory=list) - - -class MetadataTreePathStepResponse(BaseModel): - parent_id: str - child_id: str - offset: int = 0 - - -class MetadataTreePathResponse(BaseModel): - project_id: str - node_id: str - path: list[str] = Field(default_factory=list) - steps: list[MetadataTreePathStepResponse] = Field(default_factory=list) - - class FlowchartNodeResponse(BaseModel): id: str label: str @@ -4129,154 +4087,6 @@ def _persist_authoring_rollback( return stored, path.as_posix() -_METADATA_SPEC_PREFIXES = { - "COMMON_MODULE": "ОбщийМодуль.", - "CONSTANT": "Константа.", - "CATALOG": "Справочник.", - "DOCUMENT": "Документ.", - "DOCUMENT_JOURNAL": "ЖурналДокументов.", - "ENUM": "Перечисление.", - "REPORT": "Отчет.", - "DATA_PROCESSOR": "Обработка.", - "CHART_OF_CHARACTERISTIC_TYPES": "ПланВидовХарактеристик.", - "CHART_OF_ACCOUNTS": "ПланСчетов.", - "CHART_OF_CALCULATION_TYPES": "ПланВидовРасчета.", - "INFORMATION_REGISTER": "РегистрСведений.", - "ACCUMULATION_REGISTER": "РегистрНакопления.", - "ACCOUNTING_REGISTER": "РегистрБухгалтерии.", - "CALCULATION_REGISTER": "РегистрРасчета.", - "BUSINESS_PROCESS": "БизнесПроцесс.", - "TASK": "Задача.", - "EXTERNAL_DATA_SOURCE": "ВнешнийИсточникДанных.", - "EXCHANGE_PLAN": "ПланОбмена.", - "EVENT_SUBSCRIPTION": "ПодпискаНаСобытие.", - "SCHEDULED_JOB": "РегламентноеЗадание.", -} - - -_METADATA_SPEC_NODE_KINDS = { - "COMMON_MODULE": {NodeKind.COMMON_MODULE}, - "CATALOG": {NodeKind.CATALOG}, - "DOCUMENT": {NodeKind.DOCUMENT}, - "INFORMATION_REGISTER": {NodeKind.REGISTER}, - "ACCUMULATION_REGISTER": {NodeKind.REGISTER}, - "ACCOUNTING_REGISTER": {NodeKind.REGISTER}, - "CALCULATION_REGISTER": {NodeKind.REGISTER}, - "BUSINESS_PROCESS": {NodeKind.BUSINESS_PROCESS}, - "TASK": {NodeKind.TASK}, - "EXCHANGE_PLAN": {NodeKind.EXCHANGE_PLAN}, - "SCHEDULED_JOB": {NodeKind.SCHEDULED_JOB}, -} -if _EVENT_SUBSCRIPTION_KIND is not None: - _METADATA_SPEC_NODE_KINDS["EVENT_SUBSCRIPTION"] = {_EVENT_SUBSCRIPTION_KIND} - - -def _common_branch_spec_code(label: str) -> str | None: - spec = METADATA_TYPE_BY_BRANCH.get(label) - if spec is None: - return None - if spec.code in {"COMMON", "EXTENSION"}: - return None - return spec.code - - -def _metadata_spec_by_code(spec_code: str): - return METADATA_TYPE_BY_CODE.get(spec_code) - - -def _metadata_tree_node( - node_id: str, - label: str, - kind: str, - icon: str, - *, - qualified_name: str | None = None, - children: list[MetadataTreeNodeResponse] | None = None, - count: int | None = None, - has_more: bool = False, -) -> MetadataTreeNodeResponse: - child_nodes = children or [] - return MetadataTreeNodeResponse( - id=node_id, - label=label, - kind=kind, - icon=icon, - qualified_name=qualified_name, - count=count if count is not None else sum(child.count or 1 for child in child_nodes), - loaded_count=len(child_nodes), - has_more=has_more, - children=child_nodes, - ) - - -def _configuration_like_empty_root(root_id: str, label: str, kind: str) -> MetadataTreeNodeResponse: - common_children = [ - _metadata_tree_node(f"{root_id}.common.{label}", label, "COMMON_BRANCH", _metadata_icon_for_common_branch(label)) - for label in COMMON_BRANCH_CHILDREN - ] - spec_branches = [ - _metadata_tree_node(f"{root_id}.branch.{spec.code}", spec.tree_branch, spec.code, spec.icon, count=0) - for spec in METADATA_TYPE_SPECS - if spec.code not in {"COMMON", "EXTENSION"} and spec.tree_branch not in COMMON_BRANCH_CHILDREN - ] - return _metadata_tree_node( - root_id, - label, - kind, - "configuration", - children=[ - _metadata_tree_node(f"{root_id}.info", "Сведения", "CONFIGURATION_INFO", "configuration"), - _metadata_tree_node(f"{root_id}.common", "Общие", "COMMON", "common", children=common_children), - *spec_branches, - ], - ) - - -def _metadata_common_branch( - snapshot: SirSnapshot, - label: str, - object_limit_per_branch: int, -) -> MetadataTreeNodeResponse: - spec_code = _common_branch_spec_code(label) - if spec_code is None: - return _metadata_tree_node( - f"common.{label}", - label, - "COMMON_BRANCH", - _metadata_icon_for_common_branch(label), - count=0, - has_more=False, - ) - - spec = _metadata_spec_by_code(spec_code) - if spec is None: - return _metadata_tree_node( - f"common.{label}", - label, - "COMMON_BRANCH", - _metadata_icon_for_common_branch(label), - count=0, - has_more=False, - ) - - object_nodes = sorted( - [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], - key=lambda item: item.qualified_name, - ) - visible_nodes = object_nodes[:object_limit_per_branch] if object_limit_per_branch > 0 else [] - child_count_index = _metadata_child_count_index(snapshot, [node.lineage_id for node in visible_nodes]) - children = [_metadata_object_tree_node(snapshot, node, spec, child_count_index) for node in visible_nodes] - return _metadata_tree_node( - f"common.{label}", - label, - "COMMON_BRANCH", - _metadata_icon_for_common_branch(label), - children=children, - count=len(object_nodes), - has_more=len(visible_nodes) < len(object_nodes), - ) - - def _conditional_configuration_like_roots(project_id: str | None) -> list[MetadataTreeNodeResponse]: if not project_id: return [] @@ -4292,51 +4102,10 @@ def _conditional_configuration_like_roots(project_id: str | None) -> list[Metada def _project_metadata_tree_response(snapshot: SirSnapshot, object_limit_per_branch: int = 200) -> MetadataTreeNodeResponse: - common_children = [ - _metadata_common_branch(snapshot, label, object_limit_per_branch) - for label in COMMON_BRANCH_CHILDREN - ] - spec_branches = [ - _metadata_tree_branch_for_spec(snapshot, spec, object_limit_per_branch) - for spec in METADATA_TYPE_SPECS - if spec.code not in {"COMMON", "EXTENSION"} and spec.tree_branch not in COMMON_BRANCH_CHILDREN - ] - main_configuration = _metadata_tree_node( - "main-configuration", - "Основная конфигурация", - "MAIN_CONFIGURATION", - "configuration", - children=[ - _metadata_tree_node("main-configuration.info", "Сведения", "CONFIGURATION_INFO", "configuration"), - _metadata_tree_node("common", "Общие", "COMMON", "common", children=common_children), - *spec_branches, - ], - ) - sfera = _metadata_tree_node( - "sfera", - "SFERA", - "SFERA_ROOT", - "service", - children=[ - _metadata_tree_node("sfera-objects", "Наши объекты SFERA", "SFERA_OBJECTS", "service"), - *[ - _metadata_tree_node(f"sfera.{label}", label, "SFERA_SECTION", "service") - for label in ("Блок-схема", "Задачи разработки", "Проверки", "Версии", "Инциденты", "Знания", "Настройки") - ], - ], - ) - return _metadata_tree_node( - f"project.{snapshot.project_id}", - "Проект", - "PROJECT", - "project", - children=[ - main_configuration, - _metadata_tree_node("extension.placeholder", "Расширение: <Имя>", "EXTENSION", "extension"), - *_conditional_configuration_like_roots(snapshot.project_id), - sfera, - _metadata_tree_node("environments", "Среды", "ENVIRONMENTS", "service"), - ], + return _builder_project_metadata_tree_response( + snapshot, + object_limit_per_branch=object_limit_per_branch, + extra_roots=_conditional_configuration_like_roots(snapshot.project_id), ) @@ -4344,657 +4113,11 @@ def _project_metadata_tree_response_from_normalized( normalized: NormalizedProject, object_limit_per_branch: int = 200, ) -> MetadataTreeNodeResponse: - main_configuration = _normalized_configuration_tree_root( - "main-configuration", - "Основная конфигурация", - "MAIN_CONFIGURATION", - normalized.configuration.groups, - object_limit_per_branch, + return _builder_project_metadata_tree_response_from_normalized( + normalized, + object_limit_per_branch=object_limit_per_branch, + extra_roots=_conditional_configuration_like_roots(normalized.project_id), ) - visible_extensions = ( - normalized.configuration.extensions[:object_limit_per_branch] - if object_limit_per_branch > 0 - else normalized.configuration.extensions - ) - extension_roots = [ - _normalized_configuration_tree_root( - f"extension.{stable_hash(extension.name)}", - f"Расширение: {extension.name}", - "EXTENSION", - extension.groups, - object_limit_per_branch, - icon="extension", - info={ - "qualified_name": extension.qualified_name, - "version": extension.version, - **dict(getattr(extension, "attributes", {}) or {}), - }, - ) - for extension in visible_extensions - ] - if not extension_roots: - extension_roots = [ - _metadata_tree_node("extension.placeholder", "Расширение: <Имя>", "EXTENSION", "extension") - ] - sfera = _metadata_tree_node( - "sfera", - "SFERA", - "SFERA_ROOT", - "service", - children=[ - _metadata_tree_node("sfera-objects", "Наши объекты SFERA", "SFERA_OBJECTS", "service"), - *[ - _metadata_tree_node(f"sfera.{label}", label, "SFERA_SECTION", "service") - for label in ("Блок-схема", "Задачи разработки", "Проверки", "Версии", "Инциденты", "Знания", "Настройки") - ], - ], - ) - return _metadata_tree_node( - f"project.{normalized.project_id or 'normalized'}", - "Проект", - "PROJECT", - "project", - children=[ - main_configuration, - *extension_roots, - *_conditional_configuration_like_roots(normalized.project_id), - sfera, - _metadata_tree_node("environments", "Среды", "ENVIRONMENTS", "service"), - ], - ) - - -def _normalized_configuration_tree_root( - root_id: str, - label: str, - kind: str, - groups, - object_limit_per_branch: int, - *, - icon: str = "configuration", - info: dict | None = None, -) -> MetadataTreeNodeResponse: - common_children = [ - _normalized_common_branch(groups, common_label, object_limit_per_branch, root_id=root_id) - for common_label in COMMON_BRANCH_CHILDREN - ] - group_branches = [ - _normalized_group_tree_branch(group, object_limit_per_branch, root_id=root_id) - for group in groups - if group.name != "Расширения" and group.name not in COMMON_BRANCH_CHILDREN - ] - info_label = "Сведения" - if info: - version = info.get("version") or info.get("Version") - if version: - info_label = f"Сведения ({version})" - return _metadata_tree_node( - root_id, - label, - kind, - icon, - children=[ - _metadata_tree_node(f"{root_id}.info", info_label, "CONFIGURATION_INFO", icon), - _metadata_tree_node(f"{root_id}.common", "Общие", "COMMON", "common", children=common_children), - *group_branches, - ], - ) - - -def _normalized_group_tree_branch(group, object_limit_per_branch: int, *, root_id: str = "main-configuration") -> MetadataTreeNodeResponse: - visible_objects = group.objects[:object_limit_per_branch] if object_limit_per_branch > 0 else [] - node_id = _normalized_branch_node_id(root_id, group.name) - return _metadata_tree_node( - node_id, - group.name, - ",".join(group.object_kinds), - "metadata", - children=[_normalized_object_tree_node(item) for item in visible_objects], - count=len(group.objects), - has_more=len(visible_objects) < len(group.objects), - ) - - -def _normalized_common_branch(groups, label: str, object_limit_per_branch: int, *, root_id: str = "main-configuration") -> MetadataTreeNodeResponse: - node_id = _normalized_common_node_id(root_id, label) - group = next((item for item in groups if item.name == label), None) - if group is None: - return _metadata_tree_node( - node_id, - label, - "COMMON_BRANCH", - _metadata_icon_for_common_branch(label), - count=0, - has_more=False, - ) - - visible_objects = group.objects[:object_limit_per_branch] if object_limit_per_branch > 0 else [] - return _metadata_tree_node( - node_id, - label, - "COMMON_BRANCH", - _metadata_icon_for_common_branch(label), - children=[_normalized_object_tree_node(item) for item in visible_objects], - count=len(group.objects), - has_more=len(visible_objects) < len(group.objects), - ) - - -def _normalized_object_tree_node(item) -> MetadataTreeNodeResponse: - spec = METADATA_TYPE_BY_CODE.get(item.object_kind) - expected_groups = spec.child_groups if spec is not None else _NORMALIZED_CHILD_GROUPS_BY_LABEL.keys() - child_groups = [ - (label, len(getattr(item, field_name, [])), icon) - for label in expected_groups - for field_name, icon in [_NORMALIZED_CHILD_GROUPS_BY_LABEL.get(label, ("", "metadata"))] - if field_name and len(getattr(item, field_name, [])) > 0 - ] - children = [ - _metadata_tree_node( - f"normalized.{stable_hash(item.qualified_name)}.{label}", - label, - "METADATA_CHILD_GROUP", - icon, - count=count, - has_more=count > 0, - ) - for label, count, icon in child_groups - if count > 0 - ] - return _metadata_tree_node( - f"normalized.{stable_hash(item.qualified_name)}", - item.name, - item.object_kind, - _normalized_object_icon(item.object_kind), - qualified_name=item.qualified_name, - children=children, - count=len(children), - ) - - -def _normalized_metadata_tree_children_for_node( - normalized: NormalizedProject, - *, - node_id: str, - offset: int, - limit: int, -) -> tuple[list[MetadataTreeNodeResponse], int] | None: - if node_id.startswith("normalized.branch."): - group = _normalized_group_by_branch_id(normalized, node_id) - if group is None: - raise HTTPException(status_code=404, detail=f"Metadata branch not found: {node_id}") - return _slice_normalized_objects(group.objects, offset, limit) - - if node_id.startswith("common."): - group = _normalized_group_by_name(normalized, node_id.removeprefix("common.")) - if group is None: - return [], 0 - return _slice_normalized_objects(group.objects, offset, limit) - - extension_scope = _normalized_extension_scope_for_node(normalized, node_id) - if extension_scope is not None: - extension, scoped_id = extension_scope - if scoped_id.startswith("branch."): - group = _normalized_group_by_branch_id_for_groups(extension.groups, scoped_id) - if group is None: - raise HTTPException(status_code=404, detail=f"Extension metadata branch not found: {node_id}") - return _slice_normalized_objects(group.objects, offset, limit) - if scoped_id.startswith("common."): - group = _normalized_group_by_name_for_groups(extension.groups, scoped_id.removeprefix("common.")) - if group is None: - return [], 0 - return _slice_normalized_objects(group.objects, offset, limit) - - child_group = _normalized_child_group_for_node(normalized, node_id) - if child_group is not None: - parts, icon = child_group - page = parts[offset : offset + limit] - return [_normalized_part_tree_node(part, icon) for part in page], len(parts) - - if node_id.startswith("normalized."): - raise HTTPException(status_code=404, detail=f"Metadata tree node not found: {node_id}") - - return None - - -def _normalized_group_by_branch_id(normalized: NormalizedProject, node_id: str): - return _normalized_group_by_branch_id_for_groups(normalized.configuration.groups, node_id.removeprefix("normalized.")) - - -def _normalized_group_by_branch_id_for_groups(groups, node_id: str): - return next( - ( - group - for group in groups - if node_id == f"branch.{stable_hash(group.name)}" - ), - None, - ) - - -def _normalized_group_by_name(normalized: NormalizedProject, name: str): - return _normalized_group_by_name_for_groups(normalized.configuration.groups, name) - - -def _normalized_group_by_name_for_groups(groups, name: str): - return next((group for group in groups if group.name == name), None) - - -def _normalized_extension_scope_for_node(normalized: NormalizedProject, node_id: str): - prefix = "extension." - if not node_id.startswith(prefix): - return None - payload = node_id.removeprefix(prefix) - if "." not in payload: - return None - extension_hash, scoped_id = payload.split(".", 1) - extension = next( - (item for item in normalized.configuration.extensions if stable_hash(item.name) == extension_hash), - None, - ) - if extension is None: - return None - return extension, scoped_id - - -def _normalized_branch_node_id(root_id: str, group_name: str) -> str: - if root_id == "main-configuration": - return f"normalized.branch.{stable_hash(group_name)}" - return f"{root_id}.branch.{stable_hash(group_name)}" - - -def _normalized_common_node_id(root_id: str, label: str) -> str: - if root_id == "main-configuration": - return f"common.{label}" - return f"{root_id}.common.{label}" - - -def _slice_normalized_objects(objects, offset: int, limit: int) -> tuple[list[MetadataTreeNodeResponse], int]: - page = objects[offset : offset + limit] - return [_normalized_object_tree_node(item) for item in page], len(objects) - - -def _normalized_child_group_for_node(normalized: NormalizedProject, node_id: str): - prefix = "normalized." - if not node_id.startswith(prefix): - return None - payload = node_id.removeprefix(prefix) - if "." not in payload: - return None - object_hash, label = payload.split(".", 1) - item = next( - ( - candidate - for group in _normalized_all_groups(normalized) - for candidate in group.objects - if stable_hash(candidate.qualified_name) == object_hash - ), - None, - ) - if item is None: - return None - group = _NORMALIZED_CHILD_GROUPS_BY_LABEL.get(label) - if group is None: - return None - field_name, icon = group - return getattr(item, field_name, []), icon - - -def _normalized_all_groups(normalized: NormalizedProject): - groups = list(normalized.configuration.groups) - for extension in normalized.configuration.extensions: - groups.extend(extension.groups) - return groups - - -_NORMALIZED_CHILD_GROUPS_BY_LABEL = { - "Реквизиты": ("attributes", "attribute"), - "Измерения": ("dimensions", "attribute"), - "Ресурсы": ("resources", "attribute"), - "Табличные части": ("tabular_sections", "table"), - "Формы": ("forms", "form"), - "Команды": ("commands", "command"), - "Макеты": ("layouts", "layout"), - "Табличные документы": ("tabular_documents", "table"), - "СКД": ("data_composition_schemas", "layout"), - "Варианты отчета": ("report_variants", "report"), - "Настройки": ("report_settings", "settings"), - "Хранилище вариантов": ("report_storages", "settings"), - "Хранилище настроек": ("report_storages", "settings"), - "Справка": ("help_items", "common"), - "Модули": ("modules", "module"), - "Модуль": ("modules", "module"), - "Модуль формы": ("modules", "module"), - "Модуль объекта": ("modules", "module"), - "Модуль менеджера": ("modules", "module"), - "Модуль набора записей": ("modules", "module"), - "Права": ("rights", "role"), - "События": ("events", "event"), - "Движения": ("movements", "movement"), - "Значения": ("values", "enum"), - "Предопределенные данные": ("predefined", "metadata"), - "Шаблоны URL": ("url_templates", "http"), - "Методы": ("methods", "module"), - "Метод": ("methods", "module"), - "Операции": ("operations", "service"), - "Параметры": ("parameters", "attribute"), - "Каналы": ("channels", "service"), - "Сообщения": ("messages", "service"), - "Таблицы": ("tables", "table"), - "Кубы": ("cubes", "table"), - "Функции": ("methods", "module"), - "Графы": ("fields", "attribute"), - "Элементы": ("fields", "attribute"), -} - - -def _normalized_part_tree_node(part, fallback_icon: str) -> MetadataTreeNodeResponse: - qualified_name = part.qualified_name or part.name - children = [_normalized_part_tree_node(child, _normalized_part_icon(child, fallback_icon)) for child in getattr(part, "children", [])] - part_key = f"{qualified_name}.{part.kind}.{part.source_path or ''}" - return _metadata_tree_node( - f"normalized.part.{stable_hash(part_key)}", - part.name, - part.kind, - _normalized_part_icon(part, fallback_icon), - qualified_name=qualified_name, - children=children, - count=len(children), - ) - - -def _normalized_part_icon(part, fallback_icon: str) -> str: - kind = getattr(part, "kind", "").upper() - if kind == "FORM": - return "form" - if kind == "COMMAND": - return "command" - if kind == "MODULE": - return "module" - if kind in {"METHOD", "OPERATION"}: - return "module" - if kind in {"DATA_COMPOSITION_SCHEMA", "LAYOUT"}: - return "layout" - if kind in {"REPORT_VARIANT", "REPORT"}: - return "report" - if kind in {"REPORT_SETTING", "REPORT_STORAGE"}: - return "settings" - if kind == "HELP": - return "common" - if kind in {"URL_TEMPLATE", "HTTP_SERVICE"}: - return "http" - if kind in {"PARAMETER", "FIELD", "DIMENSION", "RESOURCE"}: - return "attribute" - if kind in {"ENUM_VALUE", "PREDEFINED"}: - return "enum" - if kind in {"CHANNEL", "MESSAGE"}: - return "service" - if kind in {"RIGHTS", "ROLE"}: - return "role" - if "TABULAR" in kind: - return "table" - if "EVENT" in kind: - return "event" - return fallback_icon - - -def _normalized_object_icon(object_kind: str) -> str: - kind = object_kind.upper() - if "EXCHANGE_PLAN" in kind or "ПЛАНОБМЕНА" in kind: - return "exchange-plan" - if "SCHEDULED_JOB" in kind or "РЕГЛАМЕНТНОЕЗАДАНИЕ" in kind: - return "scheduled-job" - if "EVENT_SUBSCRIPTION" in kind or "ПОДПИСКА" in kind: - return "event" - if "CATALOG" in kind: - return "catalog" - if "DOCUMENT" in kind: - return "document" - if "REGISTER" in kind: - return "register" - if "REPORT" in kind: - return "report" - if "PROCESSING" in kind: - return "processing" - if "FORM" in kind: - return "form" - if "COMMAND" in kind: - return "command" - if "TASK" in kind: - return "task" - if "ENUM" in kind: - return "enum" - if "COMMON MODULE" in kind or "COMMON_MODULE" in kind: - return "module" - if "EXTENSION" in kind: - return "extension" - return "metadata" - - -def _metadata_tree_branch_for_spec(snapshot: SirSnapshot, spec, object_limit_per_branch: int) -> MetadataTreeNodeResponse: - object_nodes = sorted( - [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], - key=lambda item: item.qualified_name, - ) - visible_nodes = object_nodes[:object_limit_per_branch] if object_limit_per_branch > 0 else [] - child_count_index = _metadata_child_count_index(snapshot, [node.lineage_id for node in visible_nodes]) - children = [_metadata_object_tree_node(snapshot, node, spec, child_count_index) for node in visible_nodes] - return _metadata_tree_node( - f"branch.{spec.code}", - spec.tree_branch, - spec.code, - spec.icon, - children=children, - count=len(object_nodes), - has_more=len(visible_nodes) < len(object_nodes), - ) - - -def _metadata_object_tree_node(snapshot: SirSnapshot, node, spec, child_count_index: dict[str, Counter[EdgeKind]] | None = None) -> MetadataTreeNodeResponse: - groups = (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") - return _metadata_tree_node( - node.lineage_id, - node.name, - node.kind.value, - spec.icon, - qualified_name=node.qualified_name, - children=[ - _metadata_tree_node( - f"{node.lineage_id}.{group}", - group, - "METADATA_CHILD_GROUP", - _metadata_icon_for_child_group(group), - count=_metadata_child_group_count_from_index(child_count_index, node.lineage_id, group), - has_more=_metadata_child_group_count_from_index(child_count_index, node.lineage_id, group) > 0, - ) - for group in groups - ], - count=len(groups), - ) - - -def _metadata_tree_children_for_node( - snapshot: SirSnapshot, - *, - node_id: str, - offset: int, - limit: int, -) -> tuple[list[MetadataTreeNodeResponse], int]: - if node_id.startswith("branch."): - spec_code = node_id.removeprefix("branch.") - spec = _metadata_spec_by_code(spec_code) - if spec is None: - raise HTTPException(status_code=404, detail=f"Metadata branch not found: {node_id}") - object_nodes = sorted( - [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], - key=lambda item: item.qualified_name, - ) - page = object_nodes[offset : offset + limit] - child_count_index = _metadata_child_count_index(snapshot, [node.lineage_id for node in page]) - return [_metadata_object_tree_node(snapshot, node, spec, child_count_index) for node in page], len(object_nodes) - - if node_id == "common": - children = [ - _metadata_common_branch(snapshot, label, 200) - for label in COMMON_BRANCH_CHILDREN - ] - return _slice_metadata_children(children, offset, limit) - - if node_id.startswith("common."): - return _common_branch_children(snapshot, node_id.removeprefix("common."), offset, limit) - - group_owner_id, group_name = _split_metadata_group_node_id(node_id) - if group_owner_id and group_name: - return _metadata_child_group_children(snapshot, group_owner_id, group_name, offset, limit) - - owner_node = _find_snapshot_node(snapshot, node_id) - if owner_node is not None: - spec = _metadata_spec_for_node(owner_node) - if spec is None: - return [], 0 - children = [ - _metadata_tree_node( - f"{owner_node.lineage_id}.{group}", - group, - "METADATA_CHILD_GROUP", - _metadata_icon_for_child_group(group), - count=_metadata_child_group_count(snapshot, owner_node.lineage_id, group), - has_more=_metadata_child_group_count(snapshot, owner_node.lineage_id, group) > 0, - ) - for group in (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") - ] - return _slice_metadata_children(children, offset, limit) - - raise HTTPException(status_code=404, detail=f"Metadata tree node not found: {node_id}") - - -def _slice_metadata_children( - children: list[MetadataTreeNodeResponse], - offset: int, - limit: int, -) -> tuple[list[MetadataTreeNodeResponse], int]: - return children[offset : offset + limit], len(children) - - -def _common_branch_children( - snapshot: SirSnapshot, - label: str, - offset: int, - limit: int, -) -> tuple[list[MetadataTreeNodeResponse], int]: - spec_code = _common_branch_spec_code(label) - if spec_code is None: - return [], 0 - spec = _metadata_spec_by_code(spec_code) - if spec is None: - return [], 0 - object_nodes = sorted( - [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], - key=lambda item: item.qualified_name, - ) - page = object_nodes[offset : offset + limit] - return [_metadata_object_tree_node(snapshot, node, spec) for node in page], len(object_nodes) - - -def _split_metadata_group_node_id(node_id: str) -> tuple[str | None, str | None]: - known_groups = { - group - for spec in METADATA_TYPE_SPECS - for group in (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") - } - for group in sorted(known_groups, key=len, reverse=True): - suffix = f".{group}" - if node_id.endswith(suffix): - return node_id[: -len(suffix)], group - return None, None - - -def _metadata_child_group_children( - snapshot: SirSnapshot, - owner_id: str, - group_name: str, - offset: int, - limit: int, -) -> tuple[list[MetadataTreeNodeResponse], int]: - edge_kinds = _edge_kinds_for_metadata_group(group_name) - if not edge_kinds: - return [], 0 - target_ids = [ - edge.target_lineage - for edge in snapshot.edges - if edge.source_lineage == owner_id - and edge.kind in edge_kinds - and (edge.kind != EdgeKind.CONTAINS or edge.attributes.get("link_type") in {"METADATA_MODULE", "FORM_MODULE"}) - ] - nodes_by_id = {node.lineage_id: node for node in snapshot.nodes} - child_nodes = sorted( - [nodes_by_id[target_id] for target_id in target_ids if target_id in nodes_by_id], - key=lambda item: item.qualified_name, - ) - if group_name == "Процедуры": - child_nodes = [node for node in child_nodes if node.kind == NodeKind.PROCEDURE] - elif group_name == "Функции": - child_nodes = [node for node in child_nodes if node.kind == NodeKind.FUNCTION] - page = child_nodes[offset : offset + limit] - return [_metadata_leaf_tree_node(snapshot, node) for node in page], len(child_nodes) - - -def _edge_kinds_for_metadata_group(group_name: str) -> set[EdgeKind]: - lowered = group_name.lower() - if any(token in lowered for token in ("реквизит", "измерен", "ресурс", "граф", "значен")): - return {EdgeKind.HAS_ATTRIBUTE} - if "таблич" in lowered: - return {EdgeKind.HAS_TABULAR_SECTION} - if "форм" in lowered: - return {EdgeKind.HAS_FORM} - if "команд" in lowered: - return {EdgeKind.HAS_COMMAND} - if "права" in lowered: - return {EdgeKind.HAS_ROLE} - if "модул" in lowered: - return {EdgeKind.CONTAINS} - if "процедур" in lowered or "функц" in lowered: - return {EdgeKind.DECLARES} - return set() - - -def _metadata_leaf_tree_node(snapshot: SirSnapshot, node) -> MetadataTreeNodeResponse: - child_groups = [] - if node.kind == NodeKind.TABULAR_SECTION: - child_groups.append("Реквизиты") - if node.kind == NodeKind.FORM: - child_groups.extend(["Элементы", "Команды", "События", "Модуль формы"]) - if node.kind == NodeKind.MODULE: - child_groups.extend(["Процедуры", "Функции"]) - return _metadata_tree_node( - node.lineage_id, - node.name, - node.kind.value, - _metadata_icon_for_node_kind(node.kind), - qualified_name=node.qualified_name, - children=[ - _metadata_tree_node( - f"{node.lineage_id}.{group}", - group, - "METADATA_CHILD_GROUP", - _metadata_icon_for_child_group(group), - has_more=True, - ) - for group in child_groups - ], - count=len(child_groups), - ) - - -def _metadata_node_for_search_result( - snapshot: SirSnapshot, - node, - child_count_index: dict[str, Counter[EdgeKind]] | None = None, -) -> MetadataTreeNodeResponse: - spec = _metadata_spec_for_node(node) - if spec: - return _metadata_object_tree_node(snapshot, node, spec, child_count_index) - return _metadata_leaf_tree_node(snapshot, node) _FLOWCHART_KIND_LABELS = { @@ -5289,293 +4412,6 @@ def _flowchart_logical_edge_label(edge_kind: EdgeKind, source, target, logical_s return base -def _metadata_tree_path_for_node(snapshot: SirSnapshot, node_id: str) -> list[str]: - node = _find_snapshot_node(snapshot, node_id) - if node is None: - group_owner_id, group_name = _split_metadata_group_node_id(node_id) - if group_owner_id and group_name: - owner_node = _find_snapshot_node(snapshot, group_owner_id) - if owner_node is None: - return [] - return [*_metadata_tree_path_for_node(snapshot, group_owner_id), node_id] - return [] - - spec = _metadata_spec_for_node(node) - if spec is not None: - common_branch = _common_branch_label_for_spec(spec.code) - if common_branch: - return ["main-configuration", "common", f"common.{common_branch}", node.lineage_id] - return ["main-configuration", f"branch.{spec.code}", node.lineage_id] - - parent_edges = [edge for edge in snapshot.edges if edge.target_lineage == node.lineage_id] - parent_edge = next((edge for edge in parent_edges if edge.attributes.get("link_type") == "FORM_MODULE"), None) - if parent_edge is None: - parent_edge = next(iter(parent_edges), None) - if parent_edge is None: - return [node.lineage_id] - owner_node = _find_snapshot_node(snapshot, parent_edge.source_lineage) - if owner_node is None: - return [node.lineage_id] - group_name = _metadata_group_name_for_edge(parent_edge) - return [*_metadata_tree_path_for_node(snapshot, owner_node.lineage_id), f"{owner_node.lineage_id}.{group_name}", node.lineage_id] - - -def _metadata_tree_path_steps(snapshot: SirSnapshot, path: list[str]) -> list[MetadataTreePathStepResponse]: - return [ - MetadataTreePathStepResponse( - parent_id=parent_id, - child_id=child_id, - offset=_metadata_tree_child_offset(snapshot, parent_id, child_id), - ) - for parent_id, child_id in zip(path, path[1:]) - ] - - -def _metadata_tree_child_offset(snapshot: SirSnapshot, parent_id: str, child_id: str) -> int: - children = _metadata_tree_child_ids(snapshot, parent_id) - try: - return children.index(child_id) - except ValueError: - return 0 - - -def _metadata_tree_child_ids(snapshot: SirSnapshot, parent_id: str) -> list[str]: - if parent_id == "main-configuration": - return [ - "main-configuration.info", - "common", - *[ - f"branch.{spec.code}" - for spec in METADATA_TYPE_SPECS - if spec.code not in {"COMMON", "EXTENSION"} and spec.tree_branch not in COMMON_BRANCH_CHILDREN - ], - ] - if parent_id == "common": - return [f"common.{label}" for label in COMMON_BRANCH_CHILDREN] - if parent_id.startswith("branch."): - spec_code = parent_id.removeprefix("branch.") - spec = _metadata_spec_by_code(spec_code) - if spec is None: - return [] - return [ - node.lineage_id - for node in sorted( - [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], - key=lambda item: item.qualified_name, - ) - ] - if parent_id.startswith("common."): - spec_code = _common_branch_spec_code(parent_id.removeprefix("common.")) - spec = _metadata_spec_by_code(spec_code) - if spec is None: - return [] - return [ - node.lineage_id - for node in sorted( - [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], - key=lambda item: item.qualified_name, - ) - ] - - group_owner_id, group_name = _split_metadata_group_node_id(parent_id) - if group_owner_id and group_name: - return _metadata_child_group_child_ids(snapshot, group_owner_id, group_name) - - owner_node = _find_snapshot_node(snapshot, parent_id) - if owner_node is None: - return [] - spec = _metadata_spec_for_node(owner_node) - if spec is None: - return [] - return [ - f"{owner_node.lineage_id}.{group}" - for group in (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") - ] - - -def _metadata_child_group_child_ids(snapshot: SirSnapshot, owner_id: str, group_name: str) -> list[str]: - edge_kinds = _edge_kinds_for_metadata_group(group_name) - if not edge_kinds: - return [] - target_ids = [ - edge.target_lineage - for edge in snapshot.edges - if edge.source_lineage == owner_id and edge.kind in edge_kinds - ] - nodes_by_id = {node.lineage_id: node for node in snapshot.nodes} - return [ - node.lineage_id - for node in sorted( - [nodes_by_id[target_id] for target_id in target_ids if target_id in nodes_by_id], - key=lambda item: item.qualified_name, - ) - ] - - -def _common_branch_label_for_spec(spec_code: str) -> str | None: - if spec_code in {"COMMON", "EXTENSION"}: - return None - spec = METADATA_TYPE_BY_CODE.get(spec_code) - if spec is not None and spec.tree_branch in COMMON_BRANCH_CHILDREN: - return spec.tree_branch - for label in COMMON_BRANCH_CHILDREN: - branch = METADATA_TYPE_BY_BRANCH.get(label) - if branch is not None and branch.code == spec_code and branch.code not in {"COMMON", "EXTENSION"}: - return label - return None - - -def _metadata_group_name_for_edge(edge) -> str: - edge_kind = getattr(edge, "kind", edge) - if edge_kind == EdgeKind.CONTAINS and getattr(edge, "attributes", {}).get("link_type") == "FORM_MODULE": - return "Модуль формы" - return { - EdgeKind.HAS_ATTRIBUTE: "Реквизиты", - EdgeKind.HAS_TABULAR_SECTION: "Табличные части", - EdgeKind.HAS_FORM: "Формы", - EdgeKind.HAS_COMMAND: "Команды", - EdgeKind.HAS_ROLE: "Права", - EdgeKind.CONTAINS: "Модуль", - EdgeKind.DECLARES: "Процедуры", - }.get(edge_kind, "Связи") - - -def _is_metadata_tree_search_node(node) -> bool: - return _is_top_level_metadata_node(node) or node.kind in { - NodeKind.ATTRIBUTE, - NodeKind.COMMAND, - NodeKind.FORM, - NodeKind.FORM_ELEMENT, - NodeKind.MODULE, - NodeKind.PROCEDURE, - NodeKind.FUNCTION, - NodeKind.ROLE, - NodeKind.TABULAR_SECTION, - } - - -def _metadata_search_rank(node, normalized_query: str) -> tuple[int, int, str]: - name = node.name.casefold() - qualified = node.qualified_name.casefold() - if name == normalized_query: - relevance = 0 - elif name.startswith(normalized_query): - relevance = 1 - elif normalized_query in name: - relevance = 2 - elif qualified.startswith(normalized_query): - relevance = 3 - else: - relevance = 4 - depth = 0 if _is_top_level_metadata_node(node) else 1 - return relevance, depth, node.qualified_name - - -def _metadata_child_group_count(snapshot: SirSnapshot, owner_id: str, group_name: str) -> int: - edge_kinds = _edge_kinds_for_metadata_group(group_name) - if not edge_kinds: - return 0 - return sum( - 1 - for edge in snapshot.edges - if edge.source_lineage == owner_id - and edge.kind in edge_kinds - and (edge.kind != EdgeKind.CONTAINS or edge.attributes.get("link_type") in {"METADATA_MODULE", "FORM_MODULE"}) - ) - - -def _metadata_child_group_count_from_index( - child_count_index: dict[str, Counter[EdgeKind]] | None, - owner_id: str, - group_name: str, -) -> int: - edge_kinds = _edge_kinds_for_metadata_group(group_name) - if not edge_kinds or child_count_index is None: - return 0 - counts = child_count_index.get(owner_id) - if counts is None: - return 0 - return sum(counts[kind] for kind in edge_kinds) - - -def _metadata_child_count_index(snapshot: SirSnapshot, owner_ids: list[str]) -> dict[str, Counter[EdgeKind]]: - owner_set = set(owner_ids) - result: dict[str, Counter[EdgeKind]] = {owner_id: Counter() for owner_id in owner_set} - for edge in snapshot.edges: - if edge.source_lineage in owner_set and ( - edge.kind != EdgeKind.CONTAINS or edge.attributes.get("link_type") in {"METADATA_MODULE", "FORM_MODULE"} - ): - result[edge.source_lineage][edge.kind] += 1 - return result - - -def _find_snapshot_node(snapshot: SirSnapshot, lineage_id: str): - return next((node for node in snapshot.nodes if node.lineage_id == lineage_id), None) - - -def _metadata_spec_for_node(node): - for spec in METADATA_TYPE_SPECS: - if _node_matches_metadata_spec(node, spec): - return spec - return None - - -def _metadata_icon_for_node_kind(kind: NodeKind) -> str: - return _METADATA_ICON_BY_NODE_KIND.get(kind, "folder") - - -def _node_matches_metadata_spec(node, spec) -> bool: - if not _is_top_level_metadata_node(node): - return False - allowed_kinds = _METADATA_SPEC_NODE_KINDS.get(spec.code) - if allowed_kinds is not None and node.kind not in allowed_kinds: - return False - prefix = _METADATA_SPEC_PREFIXES.get(spec.code) - return bool(prefix and node.qualified_name.casefold().startswith(prefix.casefold())) - - -def _is_top_level_metadata_node(node) -> bool: - return node.qualified_name.count(".") == 1 - - -def _metadata_icon_for_common_branch(label: str) -> str: - lowered = label.lower() - if "планы обмена" in lowered or "план обмена" in lowered: - return "exchange-plan" - if "подписки на события" in lowered or "подписка на событие" in lowered: - return "event" - if "регламентные задания" in lowered or "регламентное задание" in lowered: - return "scheduled-job" - if "модул" in label.lower(): - return "module" - if "форм" in label.lower(): - return "form" - if "команд" in label.lower(): - return "command" - if "web" in label.lower() or "http" in label.lower() or "сервис" in label.lower(): - return "web" - if "роль" in label.lower(): - return "role" - return "folder" - - -def _metadata_icon_for_child_group(label: str) -> str: - lowered = label.lower() - if "реквизит" in lowered or "измерен" in lowered or "ресурс" in lowered: - return "attribute" - if "таблич" in lowered: - return "table" - if "форм" in lowered: - return "form" - if "команд" in lowered: - return "command" - if "модуль" in lowered: - return "module" - if "права" in lowered: - return "role" - return "folder" - - _METADATA_OBJECT_KIND_PREFIXES = { "COMMON_MODULE": "ОбщийМодуль", "COMMON MODULE": "ОбщийМодуль", diff --git a/services/api-server/src/api_server/metadata_tree_builder.py b/services/api-server/src/api_server/metadata_tree_builder.py new file mode 100644 index 0000000..ca4140e --- /dev/null +++ b/services/api-server/src/api_server/metadata_tree_builder.py @@ -0,0 +1,1178 @@ +from __future__ import annotations + +from collections import Counter + +from fastapi import HTTPException + +from api_server.metadata_tree_models import MetadataTreeNodeResponse, MetadataTreePathStepResponse +from one_c_normalizer import ( + COMMON_BRANCH_CHILDREN, + METADATA_TYPE_BY_BRANCH, + METADATA_TYPE_BY_CODE, + METADATA_TYPE_SPECS, + NormalizedProject, +) +from sir import EdgeKind, NodeKind, SirSnapshot, stable_hash + +_EVENT_SUBSCRIPTION_KIND = getattr(NodeKind, "EVENT_SUBSCRIPTION", None) + +_METADATA_ICON_BY_NODE_KIND = { + NodeKind.EXCHANGE_PLAN: "exchange-plan", + NodeKind.SCHEDULED_JOB: "scheduled-job", + NodeKind.ATTRIBUTE: "attribute", + NodeKind.COMMAND: "command", + NodeKind.FORM: "form", + NodeKind.FORM_ELEMENT: "form", + NodeKind.ROLE: "role", + NodeKind.TABULAR_SECTION: "tabular", +} +if _EVENT_SUBSCRIPTION_KIND is not None: + _METADATA_ICON_BY_NODE_KIND[_EVENT_SUBSCRIPTION_KIND] = "event" + + +_METADATA_SPEC_PREFIXES = { + "COMMON_MODULE": "ОбщийМодуль.", + "CONSTANT": "Константа.", + "CATALOG": "Справочник.", + "DOCUMENT": "Документ.", + "DOCUMENT_JOURNAL": "ЖурналДокументов.", + "ENUM": "Перечисление.", + "REPORT": "Отчет.", + "DATA_PROCESSOR": "Обработка.", + "CHART_OF_CHARACTERISTIC_TYPES": "ПланВидовХарактеристик.", + "CHART_OF_ACCOUNTS": "ПланСчетов.", + "CHART_OF_CALCULATION_TYPES": "ПланВидовРасчета.", + "INFORMATION_REGISTER": "РегистрСведений.", + "ACCUMULATION_REGISTER": "РегистрНакопления.", + "ACCOUNTING_REGISTER": "РегистрБухгалтерии.", + "CALCULATION_REGISTER": "РегистрРасчета.", + "BUSINESS_PROCESS": "БизнесПроцесс.", + "TASK": "Задача.", + "EXTERNAL_DATA_SOURCE": "ВнешнийИсточникДанных.", + "EXCHANGE_PLAN": "ПланОбмена.", + "EVENT_SUBSCRIPTION": "ПодпискаНаСобытие.", + "SCHEDULED_JOB": "РегламентноеЗадание.", +} + + +_METADATA_SPEC_NODE_KINDS = { + "COMMON_MODULE": {NodeKind.COMMON_MODULE}, + "CATALOG": {NodeKind.CATALOG}, + "DOCUMENT": {NodeKind.DOCUMENT}, + "INFORMATION_REGISTER": {NodeKind.REGISTER}, + "ACCUMULATION_REGISTER": {NodeKind.REGISTER}, + "ACCOUNTING_REGISTER": {NodeKind.REGISTER}, + "CALCULATION_REGISTER": {NodeKind.REGISTER}, + "BUSINESS_PROCESS": {NodeKind.BUSINESS_PROCESS}, + "TASK": {NodeKind.TASK}, + "EXCHANGE_PLAN": {NodeKind.EXCHANGE_PLAN}, + "SCHEDULED_JOB": {NodeKind.SCHEDULED_JOB}, +} +if _EVENT_SUBSCRIPTION_KIND is not None: + _METADATA_SPEC_NODE_KINDS["EVENT_SUBSCRIPTION"] = {_EVENT_SUBSCRIPTION_KIND} + + +def _common_branch_spec_code(label: str) -> str | None: + spec = METADATA_TYPE_BY_BRANCH.get(label) + if spec is None: + return None + if spec.code in {"COMMON", "EXTENSION"}: + return None + return spec.code + + +def _metadata_spec_by_code(spec_code: str): + return METADATA_TYPE_BY_CODE.get(spec_code) + + +def _metadata_tree_node( + node_id: str, + label: str, + kind: str, + icon: str, + *, + qualified_name: str | None = None, + children: list[MetadataTreeNodeResponse] | None = None, + count: int | None = None, + has_more: bool = False, +) -> MetadataTreeNodeResponse: + child_nodes = children or [] + return MetadataTreeNodeResponse( + id=node_id, + label=label, + kind=kind, + icon=icon, + qualified_name=qualified_name, + count=count if count is not None else sum(child.count or 1 for child in child_nodes), + loaded_count=len(child_nodes), + has_more=has_more, + children=child_nodes, + ) + + +def _configuration_like_empty_root(root_id: str, label: str, kind: str) -> MetadataTreeNodeResponse: + common_children = [ + _metadata_tree_node(f"{root_id}.common.{label}", label, "COMMON_BRANCH", _metadata_icon_for_common_branch(label)) + for label in COMMON_BRANCH_CHILDREN + ] + spec_branches = [ + _metadata_tree_node(f"{root_id}.branch.{spec.code}", spec.tree_branch, spec.code, spec.icon, count=0) + for spec in METADATA_TYPE_SPECS + if spec.code not in {"COMMON", "EXTENSION"} and spec.tree_branch not in COMMON_BRANCH_CHILDREN + ] + return _metadata_tree_node( + root_id, + label, + kind, + "configuration", + children=[ + _metadata_tree_node(f"{root_id}.info", "Сведения", "CONFIGURATION_INFO", "configuration"), + _metadata_tree_node(f"{root_id}.common", "Общие", "COMMON", "common", children=common_children), + *spec_branches, + ], + ) + + +def _metadata_common_branch( + snapshot: SirSnapshot, + label: str, + object_limit_per_branch: int, +) -> MetadataTreeNodeResponse: + spec_code = _common_branch_spec_code(label) + if spec_code is None: + return _metadata_tree_node( + f"common.{label}", + label, + "COMMON_BRANCH", + _metadata_icon_for_common_branch(label), + count=0, + has_more=False, + ) + + spec = _metadata_spec_by_code(spec_code) + if spec is None: + return _metadata_tree_node( + f"common.{label}", + label, + "COMMON_BRANCH", + _metadata_icon_for_common_branch(label), + count=0, + has_more=False, + ) + + object_nodes = sorted( + [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], + key=lambda item: item.qualified_name, + ) + visible_nodes = object_nodes[:object_limit_per_branch] if object_limit_per_branch > 0 else [] + child_count_index = _metadata_child_count_index(snapshot, [node.lineage_id for node in visible_nodes]) + children = [_metadata_object_tree_node(snapshot, node, spec, child_count_index) for node in visible_nodes] + return _metadata_tree_node( + f"common.{label}", + label, + "COMMON_BRANCH", + _metadata_icon_for_common_branch(label), + children=children, + count=len(object_nodes), + has_more=len(visible_nodes) < len(object_nodes), + ) + + +def _project_metadata_tree_response( + snapshot: SirSnapshot, + object_limit_per_branch: int = 200, + *, + extra_roots: list[MetadataTreeNodeResponse] | None = None, +) -> MetadataTreeNodeResponse: + common_children = [ + _metadata_common_branch(snapshot, label, object_limit_per_branch) + for label in COMMON_BRANCH_CHILDREN + ] + spec_branches = [ + _metadata_tree_branch_for_spec(snapshot, spec, object_limit_per_branch) + for spec in METADATA_TYPE_SPECS + if spec.code not in {"COMMON", "EXTENSION"} and spec.tree_branch not in COMMON_BRANCH_CHILDREN + ] + main_configuration = _metadata_tree_node( + "main-configuration", + "Основная конфигурация", + "MAIN_CONFIGURATION", + "configuration", + children=[ + _metadata_tree_node("main-configuration.info", "Сведения", "CONFIGURATION_INFO", "configuration"), + _metadata_tree_node("common", "Общие", "COMMON", "common", children=common_children), + *spec_branches, + ], + ) + sfera = _metadata_tree_node( + "sfera", + "SFERA", + "SFERA_ROOT", + "service", + children=[ + _metadata_tree_node("sfera-objects", "Наши объекты SFERA", "SFERA_OBJECTS", "service"), + *[ + _metadata_tree_node(f"sfera.{label}", label, "SFERA_SECTION", "service") + for label in ("Блок-схема", "Задачи разработки", "Проверки", "Версии", "Инциденты", "Знания", "Настройки") + ], + ], + ) + return _metadata_tree_node( + f"project.{snapshot.project_id}", + "Проект", + "PROJECT", + "project", + children=[ + main_configuration, + _metadata_tree_node("extension.placeholder", "Расширение: <Имя>", "EXTENSION", "extension"), + *(extra_roots or []), + sfera, + _metadata_tree_node("environments", "Среды", "ENVIRONMENTS", "service"), + ], + ) + + +def _project_metadata_tree_response_from_normalized( + normalized: NormalizedProject, + object_limit_per_branch: int = 200, + *, + extra_roots: list[MetadataTreeNodeResponse] | None = None, +) -> MetadataTreeNodeResponse: + main_configuration = _normalized_configuration_tree_root( + "main-configuration", + "Основная конфигурация", + "MAIN_CONFIGURATION", + normalized.configuration.groups, + object_limit_per_branch, + ) + visible_extensions = ( + normalized.configuration.extensions[:object_limit_per_branch] + if object_limit_per_branch > 0 + else normalized.configuration.extensions + ) + extension_roots = [ + _normalized_configuration_tree_root( + f"extension.{stable_hash(extension.name)}", + f"Расширение: {extension.name}", + "EXTENSION", + extension.groups, + object_limit_per_branch, + icon="extension", + info={ + "qualified_name": extension.qualified_name, + "version": extension.version, + **dict(getattr(extension, "attributes", {}) or {}), + }, + ) + for extension in visible_extensions + ] + if not extension_roots: + extension_roots = [ + _metadata_tree_node("extension.placeholder", "Расширение: <Имя>", "EXTENSION", "extension") + ] + sfera = _metadata_tree_node( + "sfera", + "SFERA", + "SFERA_ROOT", + "service", + children=[ + _metadata_tree_node("sfera-objects", "Наши объекты SFERA", "SFERA_OBJECTS", "service"), + *[ + _metadata_tree_node(f"sfera.{label}", label, "SFERA_SECTION", "service") + for label in ("Блок-схема", "Задачи разработки", "Проверки", "Версии", "Инциденты", "Знания", "Настройки") + ], + ], + ) + return _metadata_tree_node( + f"project.{normalized.project_id or 'normalized'}", + "Проект", + "PROJECT", + "project", + children=[ + main_configuration, + *extension_roots, + *(extra_roots or []), + sfera, + _metadata_tree_node("environments", "Среды", "ENVIRONMENTS", "service"), + ], + ) + + +def _normalized_configuration_tree_root( + root_id: str, + label: str, + kind: str, + groups, + object_limit_per_branch: int, + *, + icon: str = "configuration", + info: dict | None = None, +) -> MetadataTreeNodeResponse: + common_children = [ + _normalized_common_branch(groups, common_label, object_limit_per_branch, root_id=root_id) + for common_label in COMMON_BRANCH_CHILDREN + ] + group_branches = [ + _normalized_group_tree_branch(group, object_limit_per_branch, root_id=root_id) + for group in groups + if group.name != "Расширения" and group.name not in COMMON_BRANCH_CHILDREN + ] + info_label = "Сведения" + if info: + version = info.get("version") or info.get("Version") + if version: + info_label = f"Сведения ({version})" + return _metadata_tree_node( + root_id, + label, + kind, + icon, + children=[ + _metadata_tree_node(f"{root_id}.info", info_label, "CONFIGURATION_INFO", icon), + _metadata_tree_node(f"{root_id}.common", "Общие", "COMMON", "common", children=common_children), + *group_branches, + ], + ) + + +def _normalized_group_tree_branch(group, object_limit_per_branch: int, *, root_id: str = "main-configuration") -> MetadataTreeNodeResponse: + visible_objects = group.objects[:object_limit_per_branch] if object_limit_per_branch > 0 else [] + node_id = _normalized_branch_node_id(root_id, group.name) + return _metadata_tree_node( + node_id, + group.name, + ",".join(group.object_kinds), + "metadata", + children=[_normalized_object_tree_node(item) for item in visible_objects], + count=len(group.objects), + has_more=len(visible_objects) < len(group.objects), + ) + + +def _normalized_common_branch(groups, label: str, object_limit_per_branch: int, *, root_id: str = "main-configuration") -> MetadataTreeNodeResponse: + node_id = _normalized_common_node_id(root_id, label) + group = next((item for item in groups if item.name == label), None) + if group is None: + return _metadata_tree_node( + node_id, + label, + "COMMON_BRANCH", + _metadata_icon_for_common_branch(label), + count=0, + has_more=False, + ) + + visible_objects = group.objects[:object_limit_per_branch] if object_limit_per_branch > 0 else [] + return _metadata_tree_node( + node_id, + label, + "COMMON_BRANCH", + _metadata_icon_for_common_branch(label), + children=[_normalized_object_tree_node(item) for item in visible_objects], + count=len(group.objects), + has_more=len(visible_objects) < len(group.objects), + ) + + +def _normalized_object_tree_node(item) -> MetadataTreeNodeResponse: + spec = METADATA_TYPE_BY_CODE.get(item.object_kind) + expected_groups = spec.child_groups if spec is not None else _NORMALIZED_CHILD_GROUPS_BY_LABEL.keys() + child_groups = [ + (label, len(getattr(item, field_name, [])), icon) + for label in expected_groups + for field_name, icon in [_NORMALIZED_CHILD_GROUPS_BY_LABEL.get(label, ("", "metadata"))] + if field_name and len(getattr(item, field_name, [])) > 0 + ] + children = [ + _metadata_tree_node( + f"normalized.{stable_hash(item.qualified_name)}.{label}", + label, + "METADATA_CHILD_GROUP", + icon, + count=count, + has_more=count > 0, + ) + for label, count, icon in child_groups + if count > 0 + ] + return _metadata_tree_node( + f"normalized.{stable_hash(item.qualified_name)}", + item.name, + item.object_kind, + _normalized_object_icon(item.object_kind), + qualified_name=item.qualified_name, + children=children, + count=len(children), + ) + + +def _normalized_metadata_tree_children_for_node( + normalized: NormalizedProject, + *, + node_id: str, + offset: int, + limit: int, +) -> tuple[list[MetadataTreeNodeResponse], int] | None: + if node_id.startswith("normalized.branch."): + group = _normalized_group_by_branch_id(normalized, node_id) + if group is None: + raise HTTPException(status_code=404, detail=f"Metadata branch not found: {node_id}") + return _slice_normalized_objects(group.objects, offset, limit) + + if node_id.startswith("common."): + group = _normalized_group_by_name(normalized, node_id.removeprefix("common.")) + if group is None: + return [], 0 + return _slice_normalized_objects(group.objects, offset, limit) + + extension_scope = _normalized_extension_scope_for_node(normalized, node_id) + if extension_scope is not None: + extension, scoped_id = extension_scope + if scoped_id.startswith("branch."): + group = _normalized_group_by_branch_id_for_groups(extension.groups, scoped_id) + if group is None: + raise HTTPException(status_code=404, detail=f"Extension metadata branch not found: {node_id}") + return _slice_normalized_objects(group.objects, offset, limit) + if scoped_id.startswith("common."): + group = _normalized_group_by_name_for_groups(extension.groups, scoped_id.removeprefix("common.")) + if group is None: + return [], 0 + return _slice_normalized_objects(group.objects, offset, limit) + + child_group = _normalized_child_group_for_node(normalized, node_id) + if child_group is not None: + parts, icon = child_group + page = parts[offset : offset + limit] + return [_normalized_part_tree_node(part, icon) for part in page], len(parts) + + if node_id.startswith("normalized."): + raise HTTPException(status_code=404, detail=f"Metadata tree node not found: {node_id}") + + return None + + +def _normalized_group_by_branch_id(normalized: NormalizedProject, node_id: str): + return _normalized_group_by_branch_id_for_groups(normalized.configuration.groups, node_id.removeprefix("normalized.")) + + +def _normalized_group_by_branch_id_for_groups(groups, node_id: str): + return next( + ( + group + for group in groups + if node_id == f"branch.{stable_hash(group.name)}" + ), + None, + ) + + +def _normalized_group_by_name(normalized: NormalizedProject, name: str): + return _normalized_group_by_name_for_groups(normalized.configuration.groups, name) + + +def _normalized_group_by_name_for_groups(groups, name: str): + return next((group for group in groups if group.name == name), None) + + +def _normalized_extension_scope_for_node(normalized: NormalizedProject, node_id: str): + prefix = "extension." + if not node_id.startswith(prefix): + return None + payload = node_id.removeprefix(prefix) + if "." not in payload: + return None + extension_hash, scoped_id = payload.split(".", 1) + extension = next( + (item for item in normalized.configuration.extensions if stable_hash(item.name) == extension_hash), + None, + ) + if extension is None: + return None + return extension, scoped_id + + +def _normalized_branch_node_id(root_id: str, group_name: str) -> str: + if root_id == "main-configuration": + return f"normalized.branch.{stable_hash(group_name)}" + return f"{root_id}.branch.{stable_hash(group_name)}" + + +def _normalized_common_node_id(root_id: str, label: str) -> str: + if root_id == "main-configuration": + return f"common.{label}" + return f"{root_id}.common.{label}" + + +def _slice_normalized_objects(objects, offset: int, limit: int) -> tuple[list[MetadataTreeNodeResponse], int]: + page = objects[offset : offset + limit] + return [_normalized_object_tree_node(item) for item in page], len(objects) + + +def _normalized_child_group_for_node(normalized: NormalizedProject, node_id: str): + prefix = "normalized." + if not node_id.startswith(prefix): + return None + payload = node_id.removeprefix(prefix) + if "." not in payload: + return None + object_hash, label = payload.split(".", 1) + item = next( + ( + candidate + for group in _normalized_all_groups(normalized) + for candidate in group.objects + if stable_hash(candidate.qualified_name) == object_hash + ), + None, + ) + if item is None: + return None + group = _NORMALIZED_CHILD_GROUPS_BY_LABEL.get(label) + if group is None: + return None + field_name, icon = group + return getattr(item, field_name, []), icon + + +def _normalized_all_groups(normalized: NormalizedProject): + groups = list(normalized.configuration.groups) + for extension in normalized.configuration.extensions: + groups.extend(extension.groups) + return groups + + +_NORMALIZED_CHILD_GROUPS_BY_LABEL = { + "Реквизиты": ("attributes", "attribute"), + "Измерения": ("dimensions", "attribute"), + "Ресурсы": ("resources", "attribute"), + "Табличные части": ("tabular_sections", "table"), + "Формы": ("forms", "form"), + "Команды": ("commands", "command"), + "Макеты": ("layouts", "layout"), + "Табличные документы": ("tabular_documents", "table"), + "СКД": ("data_composition_schemas", "layout"), + "Варианты отчета": ("report_variants", "report"), + "Настройки": ("report_settings", "settings"), + "Хранилище вариантов": ("report_storages", "settings"), + "Хранилище настроек": ("report_storages", "settings"), + "Справка": ("help_items", "common"), + "Модули": ("modules", "module"), + "Модуль": ("modules", "module"), + "Модуль формы": ("modules", "module"), + "Модуль объекта": ("modules", "module"), + "Модуль менеджера": ("modules", "module"), + "Модуль набора записей": ("modules", "module"), + "Права": ("rights", "role"), + "События": ("events", "event"), + "Движения": ("movements", "movement"), + "Значения": ("values", "enum"), + "Предопределенные данные": ("predefined", "metadata"), + "Шаблоны URL": ("url_templates", "http"), + "Методы": ("methods", "module"), + "Метод": ("methods", "module"), + "Операции": ("operations", "service"), + "Параметры": ("parameters", "attribute"), + "Каналы": ("channels", "service"), + "Сообщения": ("messages", "service"), + "Таблицы": ("tables", "table"), + "Кубы": ("cubes", "table"), + "Функции": ("methods", "module"), + "Графы": ("fields", "attribute"), + "Элементы": ("fields", "attribute"), +} + + +def _normalized_part_tree_node(part, fallback_icon: str) -> MetadataTreeNodeResponse: + qualified_name = part.qualified_name or part.name + children = [_normalized_part_tree_node(child, _normalized_part_icon(child, fallback_icon)) for child in getattr(part, "children", [])] + part_key = f"{qualified_name}.{part.kind}.{part.source_path or ''}" + return _metadata_tree_node( + f"normalized.part.{stable_hash(part_key)}", + part.name, + part.kind, + _normalized_part_icon(part, fallback_icon), + qualified_name=qualified_name, + children=children, + count=len(children), + ) + + +def _normalized_part_icon(part, fallback_icon: str) -> str: + kind = getattr(part, "kind", "").upper() + if kind == "FORM": + return "form" + if kind == "COMMAND": + return "command" + if kind == "MODULE": + return "module" + if kind in {"METHOD", "OPERATION"}: + return "module" + if kind in {"DATA_COMPOSITION_SCHEMA", "LAYOUT"}: + return "layout" + if kind in {"REPORT_VARIANT", "REPORT"}: + return "report" + if kind in {"REPORT_SETTING", "REPORT_STORAGE"}: + return "settings" + if kind == "HELP": + return "common" + if kind in {"URL_TEMPLATE", "HTTP_SERVICE"}: + return "http" + if kind in {"PARAMETER", "FIELD", "DIMENSION", "RESOURCE"}: + return "attribute" + if kind in {"ENUM_VALUE", "PREDEFINED"}: + return "enum" + if kind in {"CHANNEL", "MESSAGE"}: + return "service" + if kind in {"RIGHTS", "ROLE"}: + return "role" + if "TABULAR" in kind: + return "table" + if "EVENT" in kind: + return "event" + return fallback_icon + + +def _normalized_object_icon(object_kind: str) -> str: + kind = object_kind.upper() + if "EXCHANGE_PLAN" in kind or "ПЛАНОБМЕНА" in kind: + return "exchange-plan" + if "SCHEDULED_JOB" in kind or "РЕГЛАМЕНТНОЕЗАДАНИЕ" in kind: + return "scheduled-job" + if "EVENT_SUBSCRIPTION" in kind or "ПОДПИСКА" in kind: + return "event" + if "CATALOG" in kind: + return "catalog" + if "DOCUMENT" in kind: + return "document" + if "REGISTER" in kind: + return "register" + if "REPORT" in kind: + return "report" + if "PROCESSING" in kind: + return "processing" + if "FORM" in kind: + return "form" + if "COMMAND" in kind: + return "command" + if "TASK" in kind: + return "task" + if "ENUM" in kind: + return "enum" + if "COMMON MODULE" in kind or "COMMON_MODULE" in kind: + return "module" + if "EXTENSION" in kind: + return "extension" + return "metadata" + + +def _metadata_tree_branch_for_spec(snapshot: SirSnapshot, spec, object_limit_per_branch: int) -> MetadataTreeNodeResponse: + object_nodes = sorted( + [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], + key=lambda item: item.qualified_name, + ) + visible_nodes = object_nodes[:object_limit_per_branch] if object_limit_per_branch > 0 else [] + child_count_index = _metadata_child_count_index(snapshot, [node.lineage_id for node in visible_nodes]) + children = [_metadata_object_tree_node(snapshot, node, spec, child_count_index) for node in visible_nodes] + return _metadata_tree_node( + f"branch.{spec.code}", + spec.tree_branch, + spec.code, + spec.icon, + children=children, + count=len(object_nodes), + has_more=len(visible_nodes) < len(object_nodes), + ) + + +def _metadata_object_tree_node(snapshot: SirSnapshot, node, spec, child_count_index: dict[str, Counter[EdgeKind]] | None = None) -> MetadataTreeNodeResponse: + groups = (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") + return _metadata_tree_node( + node.lineage_id, + node.name, + node.kind.value, + spec.icon, + qualified_name=node.qualified_name, + children=[ + _metadata_tree_node( + f"{node.lineage_id}.{group}", + group, + "METADATA_CHILD_GROUP", + _metadata_icon_for_child_group(group), + count=_metadata_child_group_count_from_index(child_count_index, node.lineage_id, group), + has_more=_metadata_child_group_count_from_index(child_count_index, node.lineage_id, group) > 0, + ) + for group in groups + ], + count=len(groups), + ) + + +def _metadata_tree_children_for_node( + snapshot: SirSnapshot, + *, + node_id: str, + offset: int, + limit: int, +) -> tuple[list[MetadataTreeNodeResponse], int]: + if node_id.startswith("branch."): + spec_code = node_id.removeprefix("branch.") + spec = _metadata_spec_by_code(spec_code) + if spec is None: + raise HTTPException(status_code=404, detail=f"Metadata branch not found: {node_id}") + object_nodes = sorted( + [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], + key=lambda item: item.qualified_name, + ) + page = object_nodes[offset : offset + limit] + child_count_index = _metadata_child_count_index(snapshot, [node.lineage_id for node in page]) + return [_metadata_object_tree_node(snapshot, node, spec, child_count_index) for node in page], len(object_nodes) + + if node_id == "common": + children = [ + _metadata_common_branch(snapshot, label, 200) + for label in COMMON_BRANCH_CHILDREN + ] + return _slice_metadata_children(children, offset, limit) + + if node_id.startswith("common."): + return _common_branch_children(snapshot, node_id.removeprefix("common."), offset, limit) + + group_owner_id, group_name = _split_metadata_group_node_id(node_id) + if group_owner_id and group_name: + return _metadata_child_group_children(snapshot, group_owner_id, group_name, offset, limit) + + owner_node = _find_snapshot_node(snapshot, node_id) + if owner_node is not None: + spec = _metadata_spec_for_node(owner_node) + if spec is None: + return [], 0 + children = [ + _metadata_tree_node( + f"{owner_node.lineage_id}.{group}", + group, + "METADATA_CHILD_GROUP", + _metadata_icon_for_child_group(group), + count=_metadata_child_group_count(snapshot, owner_node.lineage_id, group), + has_more=_metadata_child_group_count(snapshot, owner_node.lineage_id, group) > 0, + ) + for group in (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") + ] + return _slice_metadata_children(children, offset, limit) + + raise HTTPException(status_code=404, detail=f"Metadata tree node not found: {node_id}") + + +def _slice_metadata_children( + children: list[MetadataTreeNodeResponse], + offset: int, + limit: int, +) -> tuple[list[MetadataTreeNodeResponse], int]: + return children[offset : offset + limit], len(children) + + +def _common_branch_children( + snapshot: SirSnapshot, + label: str, + offset: int, + limit: int, +) -> tuple[list[MetadataTreeNodeResponse], int]: + spec_code = _common_branch_spec_code(label) + if spec_code is None: + return [], 0 + spec = _metadata_spec_by_code(spec_code) + if spec is None: + return [], 0 + object_nodes = sorted( + [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], + key=lambda item: item.qualified_name, + ) + page = object_nodes[offset : offset + limit] + return [_metadata_object_tree_node(snapshot, node, spec) for node in page], len(object_nodes) + + +def _split_metadata_group_node_id(node_id: str) -> tuple[str | None, str | None]: + known_groups = { + group + for spec in METADATA_TYPE_SPECS + for group in (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") + } + for group in sorted(known_groups, key=len, reverse=True): + suffix = f".{group}" + if node_id.endswith(suffix): + return node_id[: -len(suffix)], group + return None, None + + +def _metadata_child_group_children( + snapshot: SirSnapshot, + owner_id: str, + group_name: str, + offset: int, + limit: int, +) -> tuple[list[MetadataTreeNodeResponse], int]: + edge_kinds = _edge_kinds_for_metadata_group(group_name) + if not edge_kinds: + return [], 0 + target_ids = [ + edge.target_lineage + for edge in snapshot.edges + if edge.source_lineage == owner_id + and edge.kind in edge_kinds + and (edge.kind != EdgeKind.CONTAINS or edge.attributes.get("link_type") in {"METADATA_MODULE", "FORM_MODULE"}) + ] + nodes_by_id = {node.lineage_id: node for node in snapshot.nodes} + child_nodes = sorted( + [nodes_by_id[target_id] for target_id in target_ids if target_id in nodes_by_id], + key=lambda item: item.qualified_name, + ) + if group_name == "Процедуры": + child_nodes = [node for node in child_nodes if node.kind == NodeKind.PROCEDURE] + elif group_name == "Функции": + child_nodes = [node for node in child_nodes if node.kind == NodeKind.FUNCTION] + page = child_nodes[offset : offset + limit] + return [_metadata_leaf_tree_node(snapshot, node) for node in page], len(child_nodes) + + +def _edge_kinds_for_metadata_group(group_name: str) -> set[EdgeKind]: + lowered = group_name.lower() + if any(token in lowered for token in ("реквизит", "измерен", "ресурс", "граф", "значен")): + return {EdgeKind.HAS_ATTRIBUTE} + if "таблич" in lowered: + return {EdgeKind.HAS_TABULAR_SECTION} + if "форм" in lowered: + return {EdgeKind.HAS_FORM} + if "команд" in lowered: + return {EdgeKind.HAS_COMMAND} + if "права" in lowered: + return {EdgeKind.HAS_ROLE} + if "модул" in lowered: + return {EdgeKind.CONTAINS} + if "процедур" in lowered or "функц" in lowered: + return {EdgeKind.DECLARES} + return set() + + +def _metadata_leaf_tree_node(snapshot: SirSnapshot, node) -> MetadataTreeNodeResponse: + child_groups = [] + if node.kind == NodeKind.TABULAR_SECTION: + child_groups.append("Реквизиты") + if node.kind == NodeKind.FORM: + child_groups.extend(["Элементы", "Команды", "События", "Модуль формы"]) + if node.kind == NodeKind.MODULE: + child_groups.extend(["Процедуры", "Функции"]) + return _metadata_tree_node( + node.lineage_id, + node.name, + node.kind.value, + _metadata_icon_for_node_kind(node.kind), + qualified_name=node.qualified_name, + children=[ + _metadata_tree_node( + f"{node.lineage_id}.{group}", + group, + "METADATA_CHILD_GROUP", + _metadata_icon_for_child_group(group), + has_more=True, + ) + for group in child_groups + ], + count=len(child_groups), + ) + + +def _metadata_node_for_search_result( + snapshot: SirSnapshot, + node, + child_count_index: dict[str, Counter[EdgeKind]] | None = None, +) -> MetadataTreeNodeResponse: + spec = _metadata_spec_for_node(node) + if spec: + return _metadata_object_tree_node(snapshot, node, spec, child_count_index) + return _metadata_leaf_tree_node(snapshot, node) + + +def _metadata_tree_path_for_node(snapshot: SirSnapshot, node_id: str) -> list[str]: + node = _find_snapshot_node(snapshot, node_id) + if node is None: + group_owner_id, group_name = _split_metadata_group_node_id(node_id) + if group_owner_id and group_name: + owner_node = _find_snapshot_node(snapshot, group_owner_id) + if owner_node is None: + return [] + return [*_metadata_tree_path_for_node(snapshot, group_owner_id), node_id] + return [] + + spec = _metadata_spec_for_node(node) + if spec is not None: + common_branch = _common_branch_label_for_spec(spec.code) + if common_branch: + return ["main-configuration", "common", f"common.{common_branch}", node.lineage_id] + return ["main-configuration", f"branch.{spec.code}", node.lineage_id] + + parent_edges = [edge for edge in snapshot.edges if edge.target_lineage == node.lineage_id] + parent_edge = next((edge for edge in parent_edges if edge.attributes.get("link_type") == "FORM_MODULE"), None) + if parent_edge is None: + parent_edge = next(iter(parent_edges), None) + if parent_edge is None: + return [node.lineage_id] + owner_node = _find_snapshot_node(snapshot, parent_edge.source_lineage) + if owner_node is None: + return [node.lineage_id] + group_name = _metadata_group_name_for_edge(parent_edge) + return [*_metadata_tree_path_for_node(snapshot, owner_node.lineage_id), f"{owner_node.lineage_id}.{group_name}", node.lineage_id] + + +def _metadata_tree_path_steps(snapshot: SirSnapshot, path: list[str]) -> list[MetadataTreePathStepResponse]: + return [ + MetadataTreePathStepResponse( + parent_id=parent_id, + child_id=child_id, + offset=_metadata_tree_child_offset(snapshot, parent_id, child_id), + ) + for parent_id, child_id in zip(path, path[1:]) + ] + + +def _metadata_tree_child_offset(snapshot: SirSnapshot, parent_id: str, child_id: str) -> int: + children = _metadata_tree_child_ids(snapshot, parent_id) + try: + return children.index(child_id) + except ValueError: + return 0 + + +def _metadata_tree_child_ids(snapshot: SirSnapshot, parent_id: str) -> list[str]: + if parent_id == "main-configuration": + return [ + "main-configuration.info", + "common", + *[ + f"branch.{spec.code}" + for spec in METADATA_TYPE_SPECS + if spec.code not in {"COMMON", "EXTENSION"} and spec.tree_branch not in COMMON_BRANCH_CHILDREN + ], + ] + if parent_id == "common": + return [f"common.{label}" for label in COMMON_BRANCH_CHILDREN] + if parent_id.startswith("branch."): + spec_code = parent_id.removeprefix("branch.") + spec = _metadata_spec_by_code(spec_code) + if spec is None: + return [] + return [ + node.lineage_id + for node in sorted( + [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], + key=lambda item: item.qualified_name, + ) + ] + if parent_id.startswith("common."): + spec_code = _common_branch_spec_code(parent_id.removeprefix("common.")) + spec = _metadata_spec_by_code(spec_code) + if spec is None: + return [] + return [ + node.lineage_id + for node in sorted( + [node for node in snapshot.nodes if _node_matches_metadata_spec(node, spec)], + key=lambda item: item.qualified_name, + ) + ] + + group_owner_id, group_name = _split_metadata_group_node_id(parent_id) + if group_owner_id and group_name: + return _metadata_child_group_child_ids(snapshot, group_owner_id, group_name) + + owner_node = _find_snapshot_node(snapshot, parent_id) + if owner_node is None: + return [] + spec = _metadata_spec_for_node(owner_node) + if spec is None: + return [] + return [ + f"{owner_node.lineage_id}.{group}" + for group in (*spec.child_groups, *spec.module_kinds, "Версии", "Проверки", "Инциденты", "Знания") + ] + + +def _metadata_child_group_child_ids(snapshot: SirSnapshot, owner_id: str, group_name: str) -> list[str]: + edge_kinds = _edge_kinds_for_metadata_group(group_name) + if not edge_kinds: + return [] + target_ids = [ + edge.target_lineage + for edge in snapshot.edges + if edge.source_lineage == owner_id and edge.kind in edge_kinds + ] + nodes_by_id = {node.lineage_id: node for node in snapshot.nodes} + return [ + node.lineage_id + for node in sorted( + [nodes_by_id[target_id] for target_id in target_ids if target_id in nodes_by_id], + key=lambda item: item.qualified_name, + ) + ] + + +def _common_branch_label_for_spec(spec_code: str) -> str | None: + if spec_code in {"COMMON", "EXTENSION"}: + return None + spec = METADATA_TYPE_BY_CODE.get(spec_code) + if spec is not None and spec.tree_branch in COMMON_BRANCH_CHILDREN: + return spec.tree_branch + for label in COMMON_BRANCH_CHILDREN: + branch = METADATA_TYPE_BY_BRANCH.get(label) + if branch is not None and branch.code == spec_code and branch.code not in {"COMMON", "EXTENSION"}: + return label + return None + + +def _metadata_group_name_for_edge(edge) -> str: + edge_kind = getattr(edge, "kind", edge) + if edge_kind == EdgeKind.CONTAINS and getattr(edge, "attributes", {}).get("link_type") == "FORM_MODULE": + return "Модуль формы" + return { + EdgeKind.HAS_ATTRIBUTE: "Реквизиты", + EdgeKind.HAS_TABULAR_SECTION: "Табличные части", + EdgeKind.HAS_FORM: "Формы", + EdgeKind.HAS_COMMAND: "Команды", + EdgeKind.HAS_ROLE: "Права", + EdgeKind.CONTAINS: "Модуль", + EdgeKind.DECLARES: "Процедуры", + }.get(edge_kind, "Связи") + + +def _is_metadata_tree_search_node(node) -> bool: + return _is_top_level_metadata_node(node) or node.kind in { + NodeKind.ATTRIBUTE, + NodeKind.COMMAND, + NodeKind.FORM, + NodeKind.FORM_ELEMENT, + NodeKind.MODULE, + NodeKind.PROCEDURE, + NodeKind.FUNCTION, + NodeKind.ROLE, + NodeKind.TABULAR_SECTION, + } + + +def _metadata_search_rank(node, normalized_query: str) -> tuple[int, int, str]: + name = node.name.casefold() + qualified = node.qualified_name.casefold() + if name == normalized_query: + relevance = 0 + elif name.startswith(normalized_query): + relevance = 1 + elif normalized_query in name: + relevance = 2 + elif qualified.startswith(normalized_query): + relevance = 3 + else: + relevance = 4 + depth = 0 if _is_top_level_metadata_node(node) else 1 + return relevance, depth, node.qualified_name + + +def _metadata_child_group_count(snapshot: SirSnapshot, owner_id: str, group_name: str) -> int: + edge_kinds = _edge_kinds_for_metadata_group(group_name) + if not edge_kinds: + return 0 + return sum( + 1 + for edge in snapshot.edges + if edge.source_lineage == owner_id + and edge.kind in edge_kinds + and (edge.kind != EdgeKind.CONTAINS or edge.attributes.get("link_type") in {"METADATA_MODULE", "FORM_MODULE"}) + ) + + +def _metadata_child_group_count_from_index( + child_count_index: dict[str, Counter[EdgeKind]] | None, + owner_id: str, + group_name: str, +) -> int: + edge_kinds = _edge_kinds_for_metadata_group(group_name) + if not edge_kinds or child_count_index is None: + return 0 + counts = child_count_index.get(owner_id) + if counts is None: + return 0 + return sum(counts[kind] for kind in edge_kinds) + + +def _metadata_child_count_index(snapshot: SirSnapshot, owner_ids: list[str]) -> dict[str, Counter[EdgeKind]]: + owner_set = set(owner_ids) + result: dict[str, Counter[EdgeKind]] = {owner_id: Counter() for owner_id in owner_set} + for edge in snapshot.edges: + if edge.source_lineage in owner_set and ( + edge.kind != EdgeKind.CONTAINS or edge.attributes.get("link_type") in {"METADATA_MODULE", "FORM_MODULE"} + ): + result[edge.source_lineage][edge.kind] += 1 + return result + + +def _find_snapshot_node(snapshot: SirSnapshot, lineage_id: str): + return next((node for node in snapshot.nodes if node.lineage_id == lineage_id), None) + + +def _metadata_spec_for_node(node): + for spec in METADATA_TYPE_SPECS: + if _node_matches_metadata_spec(node, spec): + return spec + return None + + +def _metadata_icon_for_node_kind(kind: NodeKind) -> str: + return _METADATA_ICON_BY_NODE_KIND.get(kind, "folder") + + +def _node_matches_metadata_spec(node, spec) -> bool: + if not _is_top_level_metadata_node(node): + return False + allowed_kinds = _METADATA_SPEC_NODE_KINDS.get(spec.code) + if allowed_kinds is not None and node.kind not in allowed_kinds: + return False + prefix = _METADATA_SPEC_PREFIXES.get(spec.code) + return bool(prefix and node.qualified_name.casefold().startswith(prefix.casefold())) + + +def _is_top_level_metadata_node(node) -> bool: + return node.qualified_name.count(".") == 1 + + +def _metadata_icon_for_common_branch(label: str) -> str: + lowered = label.lower() + if "планы обмена" in lowered or "план обмена" in lowered: + return "exchange-plan" + if "подписки на события" in lowered or "подписка на событие" in lowered: + return "event" + if "регламентные задания" in lowered or "регламентное задание" in lowered: + return "scheduled-job" + if "модул" in label.lower(): + return "module" + if "форм" in label.lower(): + return "form" + if "команд" in label.lower(): + return "command" + if "web" in label.lower() or "http" in label.lower() or "сервис" in label.lower(): + return "web" + if "роль" in label.lower(): + return "role" + return "folder" + + +def _metadata_icon_for_child_group(label: str) -> str: + lowered = label.lower() + if "реквизит" in lowered or "измерен" in lowered or "ресурс" in lowered: + return "attribute" + if "таблич" in lowered: + return "table" + if "форм" in lowered: + return "form" + if "команд" in lowered: + return "command" + if "модуль" in lowered: + return "module" + if "права" in lowered: + return "role" + return "folder" diff --git a/services/api-server/src/api_server/metadata_tree_models.py b/services/api-server/src/api_server/metadata_tree_models.py new file mode 100644 index 0000000..dd4ca6a --- /dev/null +++ b/services/api-server/src/api_server/metadata_tree_models.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class MetadataTreeNodeResponse(BaseModel): + id: str + label: str + kind: str + icon: str + qualified_name: str | None = None + count: int = 0 + loaded_count: int = 0 + has_more: bool = False + children: list["MetadataTreeNodeResponse"] = Field(default_factory=list) + + +class ProjectMetadataTreeResponse(BaseModel): + project_id: str + root: MetadataTreeNodeResponse + + +class MetadataTreeChildrenResponse(BaseModel): + project_id: str + parent_id: str + offset: int = 0 + limit: int = 50 + total: int = 0 + has_more: bool = False + children: list[MetadataTreeNodeResponse] = Field(default_factory=list) + + +class MetadataTreeSearchResponse(BaseModel): + project_id: str + q: str + total: int = 0 + results: list[MetadataTreeNodeResponse] = Field(default_factory=list) + + +class MetadataTreePathStepResponse(BaseModel): + parent_id: str + child_id: str + offset: int = 0 + + +class MetadataTreePathResponse(BaseModel): + project_id: str + node_id: str + path: list[str] = Field(default_factory=list) + steps: list[MetadataTreePathStepResponse] = Field(default_factory=list)