"""Fragment renderers (``render_html5_*``) for the project flowchart, reports, review and object context."""

from __future__ import annotations

from collections import Counter
from html import escape
from typing import Iterable
from urllib.parse import quote

from api_server.html5 import _enum_text, _metric
from sir import NodeKind

# Node kinds for which a "Context" link is emitted next to a flowchart entry.
_HTML5_OBJECT_CONTEXT_KINDS = {
    NodeKind.CATALOG.value,
    NodeKind.DOCUMENT.value,
    NodeKind.REGISTER.value,
    NodeKind.COMMON_MODULE.value,
    NodeKind.CONSTANT.value,
    NodeKind.DOCUMENT_JOURNAL.value,
    NodeKind.ENUM.value,
    NodeKind.REPORT.value,
    NodeKind.DATA_PROCESSOR.value,
    NodeKind.FORM.value,
    NodeKind.CHART_OF_CHARACTERISTIC_TYPES.value,
    NodeKind.CHART_OF_ACCOUNTS.value,
    NodeKind.CHART_OF_CALCULATION_TYPES.value,
    NodeKind.EXCHANGE_PLAN.value,
    NodeKind.EXTERNAL_DATA_SOURCE.value,
    NodeKind.SCHEDULED_JOB.value,
    NodeKind.BUSINESS_PROCESS.value,
    NodeKind.TASK.value,
}
# EVENT_SUBSCRIPTION is only added when the installed NodeKind defines it.
if hasattr(NodeKind, "EVENT_SUBSCRIPTION"):
    _HTML5_OBJECT_CONTEXT_KINDS.add(NodeKind.EVENT_SUBSCRIPTION.value)


def render_html5_flowchart(
    project_id: str,
    flowchart: object | None,
    *,
    focus: str | None = None,
    depth: int = 1,
    oob: bool = False,
) -> str:
    """Render the project flowchart panel.

    ``depth`` is clamped to the range 1..3. When ``flowchart`` is None a
    placeholder is returned. Otherwise up to ten edges are listed; if that
    yields nothing, up to ten nodes; if that yields nothing, an empty notice.
    """
    normalized_depth = min(max(depth, 1), 3)
    # NOTE(review): hx_url, oob_attr and live_attr are not referenced by the
    # template text visible here; kept so the attribute wiring is not lost --
    # confirm against the template markup.
    hx_url = _flowchart_url(project_id, focus, normalized_depth)
    oob_attr = ' hx-swap-oob="outerHTML"' if oob else ""
    live_attr = ' sse-swap="project-flowchart"' if focus is None else ""
    if flowchart is None:
        return f"""
Карта связей

Сервер собирает граф проекта.

"""
    nodes = getattr(flowchart, "nodes", []) or []
    edges = getattr(flowchart, "edges", []) or []
    mode = str(getattr(flowchart, "mode", "overview"))
    body = "".join(
        _flowchart_edge_item(project_id, item, nodes, normalized_depth)
        for item in edges[:10]
    )
    if not body:
        body = "".join(
            _flowchart_node_item(project_id, item, normalized_depth)
            for item in nodes[:10]
        )
    if not body:
        body = "\n\nСвязи проекта не найдены\n\n"
    return f"""
Карта связей · {escape(mode)}
{_flowchart_depth_actions(project_id, focus, normalized_depth)}
{_metric("Nodes", len(nodes))} {_metric("Edges", len(edges))} {_metric("Total nodes", getattr(flowchart, "total_nodes", 0))} {_metric("Total edges", getattr(flowchart, "total_edges", 0))}
{body}
"""


def render_html5_project_report(project_id: str, report: dict | None) -> str:
    """Render the project report panel, or a placeholder when ``report`` is None."""
    if report is None:
        return f"""
Отчет проекта

Сервер готовит сводку проекта.

"""
    metrics = [
        ("Objects", report.get("node_count", 0)),
        ("Edges", report.get("edge_count", 0)),
        ("Procedures", report.get("procedure_count", 0)),
        ("Queries", report.get("query_count", 0)),
        ("Writes", report.get("write_count", 0)),
        ("Roles", report.get("role_count", 0)),
        ("Unowned", report.get("unowned_object_count", 0)),
        ("Sensitive", report.get("unclassified_sensitive_count", 0)),
    ]
    return f"""
Отчет проекта
{_project_summary(report)}
{''.join(_metric(label, value) for label, value in metrics)}
"""


def _project_summary(report: dict) -> str:
    """Build the one-line project summary; missing or falsy counters count as 0."""
    objects = int(report.get("node_count", 0) or 0)
    procedures = int(report.get("procedure_count", 0) or 0)
    queries = int(report.get("query_count", 0) or 0)
    writes = int(report.get("write_count", 0) or 0)
    unowned = int(report.get("unowned_object_count", 0) or 0)
    sensitive = int(report.get("unclassified_sensitive_count", 0) or 0)
    risk_total = unowned + sensitive
    bits = [
        f"{objects} objects",
        f"{procedures} procedures",
        f"{queries} queries",
        f"{writes} writes",
        f"{risk_total} risk signals",
    ]
    if unowned:
        bits.append(f"{unowned} unowned")
    if sensitive:
        bits.append(f"{sensitive} sensitive")
    return f"""

{escape(" · ".join(bits))}

"""


def render_html5_object_report(
    project_id: str,
    impact: object,
    *,
    access: object | None = None,
    privacy: object | None = None,
    runtime: Iterable[object] | None = None,
    integrations: Iterable[object] | None = None,
    oob: bool = False,
) -> str:
    """Render the per-object report: a summary line followed by count metrics.

    The role count prefers ``access.grants`` and falls back to ``impact.roles``
    when there are no grants.
    """
    obj = getattr(impact, "object", None)
    name = (
        getattr(obj, "qualified_name", None)
        or getattr(obj, "name", None)
        or getattr(impact, "object_name", "object")
    )
    # ``or []`` mirrors the other lookups so a None attribute cannot reach len().
    grants = getattr(access, "grants", None) or []
    markers = getattr(privacy, "markers", None) or []
    runtime_items = list(runtime or [])
    integration_items = list(integrations or [])

    # Each count is computed once and shared by the summary and the metrics.
    routine_count = len(getattr(impact, "routines", []) or [])
    command_count = len(getattr(impact, "commands", []) or [])
    read_count = len(getattr(impact, "query_tables", []) or [])
    write_count = len(getattr(impact, "writes", []) or [])
    role_count = len(grants) or len(getattr(impact, "roles", []) or [])
    runtime_count = len(runtime_items)
    privacy_count = len(markers)
    integration_count = len(integration_items)

    metrics = [
        ("Routines", routine_count),
        ("Commands", command_count),
        ("Reads", read_count),
        ("Writes", write_count),
        ("Roles", role_count),
        ("Runtime", runtime_count),
        ("Privacy", privacy_count),
        ("Integrations", integration_count),
    ]
    summary = _object_report_summary(
        routine_count,
        command_count,
        read_count,
        write_count,
        role_count,
        runtime_count,
        privacy_count,
        integration_count,
    )
    # NOTE(review): oob_attr is not referenced by the template text visible
    # here -- confirm against the template markup.
    oob_attr = ' hx-swap-oob="outerHTML"' if oob else ""
    return f"""
Отчет объекта
{escape(str(name))} server focused summary
{summary}
{''.join(_metric(label, value) for label, value in metrics)}
"""


def _object_report_summary(
    routines: int,
    commands: int,
    reads: int,
    writes: int,
    roles: int,
    runtime: int,
    privacy: int,
    integrations: int,
) -> str:
    """Build the object report summary line; optional parts appear only when non-zero."""
    impact_links = reads + writes
    bits = [
        f"{routines} routines",
        f"{commands} commands",
        f"{impact_links} data links",
        f"{roles} roles",
    ]
    if runtime:
        bits.append(f"{runtime} runtime signals")
    if privacy:
        bits.append(f"{privacy} privacy markers")
    if integrations:
        bits.append(f"{integrations} integrations")
    return f"""

{escape(" · ".join(bits))}

"""


def render_html5_review(
    project_id: str,
    findings: list[dict] | None,
    *,
    title: str = "Review",
    oob: bool = False,
) -> str:
    """Render the review panel: placeholder for None, otherwise up to 12 findings."""
    # NOTE(review): oob_attr and live_attr are not referenced by the template
    # text visible here -- confirm against the template markup.
    oob_attr = ' hx-swap-oob="outerHTML"' if oob else ""
    live_attr = ' sse-swap="project-review"' if title == "Review" and not oob else ""
    if findings is None:
        return f"""
{escape(title)}

Сервер готовит findings.

"""
    if not findings:
        body = "\n\nFindings не найдены\n\n"
    else:
        body = "".join(_review_item(project_id, finding) for finding in findings[:12])
    return f"""
{escape(title)} · {len(findings)}
{_review_summary(findings)}
{body}
"""


def render_html5_object_context(
    project_id: str,
    schema: object | None,
    impact: object | None,
    access: object | None = None,
    ui: object | None = None,
    runtime: Iterable[object] | None = None,
    knowledge: Iterable[object] | None = None,
    privacy: object | None = None,
    integrations: Iterable[object] | None = None,
    flowchart: object | None = None,
    mode: str = "overview",
) -> str:
    """Render the object context panel for one metadata object.

    Returns a placeholder when ``schema`` or ``impact`` is None. ``mode`` is
    one of "overview", "schema", "impact", "privacy"; any other value is
    treated as "overview". The mode selects which item lists make up the body
    and how many entries of each are shown.
    """
    if schema is None or impact is None:
        return f"""
Object context

Выберите объект метаданных, чтобы сервер собрал schema/impact контекст проекта {escape(project_id)}.

"""
    obj = getattr(schema, "object", None) or getattr(impact, "object", None)
    name = getattr(obj, "qualified_name", None) or getattr(obj, "name", None) or "object"
    attributes = getattr(schema, "attributes", []) or []
    sections = getattr(schema, "tabular_sections", []) or []
    modules = getattr(impact, "modules", []) or []
    routines = getattr(impact, "routines", []) or []
    forms = getattr(impact, "forms", []) or []
    commands = getattr(impact, "commands", []) or []
    roles = getattr(impact, "roles", []) or []
    jobs = getattr(impact, "jobs", []) or []
    callees = getattr(impact, "callees", []) or []
    query_tables = getattr(impact, "query_tables", []) or []
    writes = getattr(impact, "writes", []) or []
    # Optional collaborators: a missing object or a None attribute both give [].
    grants = getattr(access, "grants", None) or []
    ui_forms = getattr(ui, "forms", None) or []
    runtime_items = list(runtime or [])
    knowledge_items = list(knowledge or [])
    privacy_markers = getattr(privacy, "markers", None) or []
    integration_items = list(integrations or [])
    flow_nodes = getattr(flowchart, "nodes", None) or []
    flow_edges = getattr(flowchart, "edges", None) or []
    normalized_mode = mode if mode in {"overview", "schema", "impact", "privacy"} else "overview"

    if normalized_mode == "schema":
        compact_body = (
            "".join(_named_node_item("attr", item) for item in attributes[:12])
            or "\n\nРеквизиты не найдены\n\n"
        )
        compact_body += "".join(_tabular_section_item(item) for item in sections[:8])
        compact_body += "".join(_ui_form_item(project_id, item) for item in ui_forms[:8])
    elif normalized_mode == "impact":
        compact_body = "".join(_integration_endpoint_item(item) for item in integration_items[:8])
        compact_body += "".join(_named_node_item("command", item) for item in commands[:8])
        compact_body += "".join(_named_node_item("read", item) for item in query_tables[:8])
        compact_body += "".join(_named_node_item("write", item) for item in writes[:8])
        compact_body += "".join(_named_node_item("call", item) for item in callees[:8])
        compact_body += "".join(
            _flowchart_edge_item(project_id, item, flow_nodes, 1) for item in flow_edges[:8]
        )
        compact_body += "".join(_named_node_item("routine", item) for item in routines[:8])
        compact_body += "".join(_named_node_item("job", item) for item in jobs[:6])
        compact_body = compact_body or "\n\nImpact-связи не найдены\n\n"
    elif normalized_mode == "privacy":
        compact_body = "".join(_role_access_item(item) for item in grants[:12])
        compact_body += "".join(_privacy_marker_item(item) for item in privacy_markers[:12])
        compact_body = compact_body or "\n\nДоступы и privacy-маркеры не найдены\n\n"
    else:
        compact_body = (
            "".join(_named_node_item("attr", item) for item in attributes[:6])
            or "\n\nРеквизиты не найдены\n\n"
        )
        compact_body += "".join(_tabular_section_item(item) for item in sections[:4])
        compact_body += "".join(_ui_form_item(project_id, item) for item in ui_forms[:4])
        compact_body += "".join(_role_access_item(item) for item in grants[:6])
        compact_body += "".join(_integration_endpoint_item(item) for item in integration_items[:4])
        compact_body += "".join(_named_node_item("command", item) for item in commands[:6])
        compact_body += "".join(_named_node_item("read", item) for item in query_tables[:4])
        compact_body += "".join(_named_node_item("write", item) for item in writes[:4])
        compact_body += "".join(_named_node_item("call", item) for item in callees[:6])
        compact_body += "".join(
            _flowchart_edge_item(project_id, item, flow_nodes, 1) for item in flow_edges[:8]
        )
        compact_body += "".join(_runtime_summary_item(item) for item in runtime_items[:6])
        compact_body += "".join(_knowledge_record_item(item) for item in knowledge_items[:6])
        compact_body += "".join(_privacy_marker_item(item) for item in privacy_markers[:6])
        compact_body += "".join(_named_node_item("routine", item) for item in routines[:6])
        compact_body += "".join(_named_node_item("job", item) for item in jobs[:4])

    return f"""
Object context · {escape(normalized_mode)}
{_object_breadcrumb(str(name))}
{escape(str(name))} {escape(str(getattr(obj, "kind", "object")))}
{_object_action_links(project_id, str(name), getattr(obj, "lineage_id", ""), modules, normalized_mode)} {_object_summary(len(attributes), len(sections), len(commands), len(query_tables), len(writes), len(callees), len(integration_items), len(grants) or len(roles), len(runtime_items), len(privacy_markers))}
{_metric("Attrs", len(attributes))} {_metric("Tables", len(sections))} {_metric("Modules", len(modules))} {_metric("Routines", len(routines))} {_metric("Forms", len(ui_forms) or len(forms))} {_metric("Commands", len(commands))} {_metric("Roles", len(grants) or len(roles))} {_metric("Reads", len(query_tables))} {_metric("Writes", len(writes))} {_metric("Calls", len(callees))} {_metric("Integrations", len(integration_items))} {_metric("Graph nodes", len(flow_nodes))} {_metric("Graph edges", len(flow_edges))} {_metric("Runtime", len(runtime_items))} {_metric("Knowledge", len(knowledge_items))} {_metric("Privacy", len(privacy_markers))}
{compact_body}
"""


def _review_summary(findings: list[dict]) -> str:
    """Summarise findings: total, counts per severity, and the first four titles alphabetically."""
    severities = Counter(
        str(item.get("severity") or item.get("level") or "INFO") for item in findings
    )
    titles = Counter(
        str(item.get("title") or item.get("code") or "Finding") for item in findings
    )
    severity_text = (
        ", ".join(f"{name}: {count}" for name, count in sorted(severities.items()))
        or "no severities"
    )
    title_text = (
        ", ".join(f"{name}: {count}" for name, count in sorted(titles.items())[:4])
        or "no findings"
    )
    return f"""

{escape(str(len(findings)))} findings · {escape(severity_text)} · {escape(title_text)}

"""


def _review_item(project_id: str, finding: dict) -> str:
    """Render one finding: title, severity, then message, location or "no details"."""
    title = str(finding.get("title") or finding.get("code") or "Finding")
    severity = str(finding.get("severity") or finding.get("level") or "INFO")
    message = str(finding.get("message") or finding.get("description") or "")
    source_path = str(finding.get("source_path") or finding.get("path") or "")
    line = finding.get("line_start") or finding.get("line")
    location = f"{source_path}:{line}" if source_path and line else source_path
    source_link = f""" Source """ if source_path else ""
    return f"""
{escape(title)} {escape(severity)} {escape(message or location or "no details")} {source_link}
"""


def _named_node_item(label: str, node: object) -> str:
    """Render a node as "name kind"; ``label`` stands in when the node has no ``kind``."""
    name = getattr(node, "qualified_name", None) or getattr(node, "name", "")
    kind = getattr(node, "kind", label)
    return f"""
{escape(str(name))} {escape(str(kind))}
"""


def _object_breadcrumb(object_name: str) -> str:
    """Render the breadcrumb for a dotted object name; empty when it has no parts."""
    parts = [part for part in object_name.split(".") if part]
    if not parts:
        return ""
    # NOTE(review): items is not referenced by the template text visible
    # here -- confirm against the template markup.
    items = "".join(f"{escape(part)}" for part in parts)
    return f''


def _object_summary(
    attributes: int,
    sections: int,
    commands: int,
    reads: int,
    writes: int,
    calls: int,
    integrations: int,
    access_rules: int,
    runtime_signals: int,
    privacy_markers: int,
) -> str:
    """Build the object context summary line; optional parts appear only when non-zero."""
    impact_total = reads + writes + calls
    status_bits = [
        f"{attributes} attrs",
        f"{sections} tables",
        f"{commands} commands",
        f"{impact_total} impact links",
        f"{access_rules} access rules",
    ]
    if integrations:
        status_bits.append(f"{integrations} integrations")
    if runtime_signals:
        status_bits.append(f"{runtime_signals} runtime signals")
    if privacy_markers:
        status_bits.append(f"{privacy_markers} privacy markers")
    return f"""

{escape(" · ".join(status_bits))}

"""


def _object_action_links(
    project_id: str,
    object_name: str,
    lineage_id: object,
    modules: Iterable[object],
    active_mode: str,
) -> str:
    """Render the action links for an object (view modes, modules, symbol).

    NOTE(review): the locals built below are not referenced by the template
    text visible here -- confirm against the template markup.
    """
    quoted_project = quote(project_id)
    quoted_object = quote(object_name, safe="")
    lineage = str(lineage_id or "")
    module_links = "".join(
        _module_action_link(quoted_project, module)
        for module in _sorted_object_modules(modules)
    )
    symbol_link = (
        f'Symbol'.format(
            project=quoted_project,
            lineage=quote(lineage, safe=""),
        )
        if lineage
        else ""
    )

    def active_attrs(mode: str) -> str:
        return ' aria-current="page" data-html5-object-action-active="true"' if mode == active_mode else ""

    return f""" """


def _sorted_object_modules(modules: Iterable[object]) -> list[object]:
    """Sort modules by role priority, then form name, then qualified name or name."""
    priority = {
        "OBJECT_MODULE": 0,
        "MANAGER_MODULE": 1,
        "RECORD_SET_MODULE": 2,
        "FORM_MODULE": 3,
        "MODULE": 9,
    }
    return sorted(
        modules,
        key=lambda module: (
            # Roles missing from the table sort after the known ones but before "MODULE".
            priority.get(_module_role(module), 8),
            str((getattr(module, "attributes", {}) or {}).get("form_name") or ""),
            str(getattr(module, "qualified_name", "") or getattr(module, "name", "")),
        ),
    )


def _module_action_link(quoted_project: str, module: object) -> str:
    """Render the link for one module; empty when the module has no lineage id."""
    lineage = str(getattr(module, "lineage_id", "") or "")
    if not lineage:
        return ""
    # NOTE(review): quoted_lineage is not referenced by the template text
    # visible here -- confirm against the template markup.
    quoted_lineage = quote(lineage, safe="")
    label = _module_action_label(module)
    return f""" {escape(label)} """


def _module_action_label(module: object) -> str:
    """Return the display label for a module based on its role."""
    attributes = getattr(module, "attributes", {}) or {}
    role = _module_role(module)
    if role == "OBJECT_MODULE":
        return "Модуль объекта"
    if role == "MANAGER_MODULE":
        return "Модуль менеджера"
    if role == "RECORD_SET_MODULE":
        return "Модуль набора"
    if role == "FORM_MODULE":
        form_name = str(attributes.get("form_name") or "")
        return f"Модуль формы {form_name}" if form_name else "Модуль формы"
    return str(getattr(module, "name", None) or "Модуль")


def _module_role(module: object) -> str:
    """Return the module role from its attributes or ``module_role``, defaulting to "MODULE"."""
    attributes = getattr(module, "attributes", {}) or {}
    return str(
        attributes.get("module_role")
        or attributes.get("role")
        or getattr(module, "module_role", "")
        or "MODULE"
    )


def _tabular_section_item(section: object) -> str:
    """Render a tabular section as its name plus its column count."""
    tabular_section = getattr(section, "tabular_section", None)
    columns = getattr(section, "columns", []) or []
    name = getattr(tabular_section, "qualified_name", None) or getattr(tabular_section, "name", "")
    return f"""
{escape(str(name))} {escape(str(len(columns)))} columns
"""


def _role_access_item(grant: object) -> str:
    """Render a role grant as the role name plus its enabled permissions."""
    role = getattr(grant, "role", None)
    permissions = getattr(grant, "permissions", {}) or {}
    role_name = getattr(role, "qualified_name", None) or getattr(role, "name", "role")
    # A permission counts as enabled when its value stringifies to one of these.
    enabled = [
        str(key)
        for key, value in sorted(permissions.items())
        if str(value).lower() in {"true", "1", "yes", "да"}
    ]
    permission_text = ", ".join(enabled) if enabled else "permissions unavailable"
    return f"""
{escape(str(role_name))} {escape(permission_text)}
"""


def _ui_form_item(project_id: str, form_semantics: object) -> str:
    """Render a form with up to three command names, three handler names and the element count."""
    form = getattr(form_semantics, "form", None)
    commands = getattr(form_semantics, "commands", []) or []
    elements = getattr(form_semantics, "elements", []) or []
    handlers = getattr(form_semantics, "command_handlers", {}) or {}
    form_name = getattr(form, "qualified_name", None) or getattr(form, "name", "form")
    # NOTE(review): form_lineage is not referenced by the template text
    # visible here -- confirm against the template markup.
    form_lineage = str(getattr(form, "lineage_id", "") or "")
    command_names = [
        str(getattr(command, "name", getattr(command, "qualified_name", "")))
        for command in commands[:3]
    ]
    handler_names = [
        str(getattr(handler, "name", getattr(handler, "qualified_name", "")))
        for handler in list(handlers.values())[:3]
    ]
    details = []
    if command_names:
        details.append("cmd: " + ", ".join(command_names))
    if handler_names:
        details.append("handler: " + ", ".join(handler_names))
    if elements:
        details.append(f"{len(elements)} elements")
    return f"""
{escape(str(form_name))} {escape(" · ".join(details) or "UI metadata")} Редактор формы
"""


def _runtime_summary_item(item: object) -> str:
    """Render a runtime summary: node name, signal and error counts, optional max duration."""
    node = getattr(item, "node", None)
    name = getattr(node, "qualified_name", None) or getattr(node, "name", "runtime")
    signal_count = getattr(item, "signal_count", 0)
    error_count = getattr(item, "error_count", 0)
    max_duration = getattr(item, "max_duration_ms", None)
    duration_text = f" · max {max_duration} ms" if max_duration is not None else ""
    return f"""
{escape(str(name))} {escape(str(signal_count))} signals · {escape(str(error_count))} errors{escape(duration_text)}
"""


def _knowledge_record_item(record: object) -> str:
    """Render a knowledge record: title, scope, id and the first 120 characters of the body."""
    title = str(getattr(record, "title", "knowledge"))
    scope = _enum_text(getattr(record, "scope", ""))
    body = str(getattr(record, "body", "") or "")
    record_id = str(getattr(record, "record_id", ""))
    return f"""
{escape(title)} {escape(scope)} · {escape(record_id)} · {escape(body[:120])}
"""


def _privacy_marker_item(marker: object) -> str:
    """Render a privacy marker: classification, then the reason or else the target id."""
    classification = _enum_text(getattr(marker, "classification", ""))
    reason = str(getattr(marker, "reason", "") or "")
    target_id = str(getattr(marker, "target_id", "") or "target unavailable")
    return f"""
{escape(classification or "privacy")} {escape(reason or target_id)}
"""


def _integration_endpoint_item(endpoint: object) -> str:
    """Render an integration endpoint: name, kind, direction and owner."""
    name = str(getattr(endpoint, "name", "") or "integration")
    kind = _enum_text(getattr(endpoint, "kind", "UNKNOWN"))
    direction = str(getattr(endpoint, "direction", "UNKNOWN") or "UNKNOWN")
    owner = str(getattr(endpoint, "owner", "") or "owner unavailable")
    return f"""
{escape(name)} {escape(kind)} · {escape(direction)} · {escape(owner)}
"""


def _object_context_url(project_id: str, name: str) -> str:
    """Return the object context URL; ``name`` is quoted with no safe characters."""
    return f"/html5/projects/{quote(project_id)}/objects/context/{quote(name, safe='')}"


def _flowchart_focus_link(project_id: str, name: str, depth: int) -> str:
    """Render the link that refocuses the flowchart on ``name``."""
    # NOTE(review): url is not referenced by the template text visible
    # here -- confirm against the template markup.
    url = _flowchart_url(project_id, name, depth)
    return f""" {escape(name)} """


def _flowchart_context_link(project_id: str, name: str, kind: str) -> str:
    """Render a "Context" link, or "" when ``kind`` is not in ``_HTML5_OBJECT_CONTEXT_KINDS``."""
    if kind not in _HTML5_OBJECT_CONTEXT_KINDS:
        return ""
    # NOTE(review): url is not referenced by the template text visible
    # here -- confirm against the template markup.
    url = _object_context_url(project_id, name)
    return f""" Context """


def _flowchart_edge_item(project_id: str, edge: object, nodes: Iterable[object], depth: int) -> str:
    """Render one edge as "label source -> target · kind" plus context links.

    Endpoint ids are resolved to display names through ``nodes``; an id with no
    matching node is shown as-is and gets no context link.
    """
    # Both lookups are filled in one pass so a one-shot iterable of nodes is
    # not exhausted before the kind lookup is built.
    node_names: dict[str, str] = {}
    node_kinds: dict[str, str] = {}
    for node in nodes:
        node_id = str(getattr(node, "id", ""))
        node_names[node_id] = str(getattr(node, "qualified_name", "") or getattr(node, "label", ""))
        node_kinds[node_id] = str(getattr(node, "kind", "") or "NODE")

    source_id = str(getattr(edge, "source", ""))
    target_id = str(getattr(edge, "target", ""))
    source = node_names.get(source_id, source_id)
    target = node_names.get(target_id, target_id)
    source_kind = node_kinds.get(source_id, "")
    target_kind = node_kinds.get(target_id, "")
    label = str(getattr(edge, "label", "") or getattr(edge, "kind", "") or "link")
    kind = str(getattr(edge, "kind", "") or "FLOW")
    return f"""
{escape(label)} {_flowchart_focus_link(project_id, source, depth)} -> {_flowchart_focus_link(project_id, target, depth)} · {escape(kind)} {_flowchart_context_link(project_id, source, source_kind)} {_flowchart_context_link(project_id, target, target_kind)}
"""


def _flowchart_url(project_id: str, focus: str | None, depth: int) -> str:
    """Return the flowchart URL with ``depth`` and, when ``focus`` is set, a ``focus`` parameter."""
    params = []
    if focus:
        params.append(f"focus={quote(focus, safe='')}")
    params.append(f"depth={depth}")
    return f"/html5/projects/{quote(project_id)}/flowchart?{'&'.join(params)}"


def _flowchart_depth_actions(project_id: str, focus: str | None, active_depth: int) -> str:
    """Render the depth selector for depths 1, 2 and 3.

    NOTE(review): buttons, active and url are not referenced by the template
    text visible here -- confirm against the template markup.
    """
    buttons = []
    for depth in [1, 2, 3]:
        active = ' aria-current="page" data-html5-object-action-active="true"' if depth == active_depth else ""
        url = _flowchart_url(project_id, focus, depth)
        buttons.append(f""" Depth {depth} """)
    return f''


def _flowchart_node_item(project_id: str, node: object, depth: int) -> str:
    """Render one node: name, kind, level, count and an optional context link."""
    name = str(getattr(node, "qualified_name", "") or getattr(node, "label", "") or "node")
    kind = str(getattr(node, "kind", "") or "NODE")
    level = getattr(node, "level", 0)
    count = getattr(node, "count", 1)
    # NOTE(review): url is not referenced by the template text visible
    # here -- confirm against the template markup.
    url = _flowchart_url(project_id, name, depth)
    return f"""
{escape(name)} {escape(kind)} · level {escape(str(level))} · count {escape(str(count))} {_flowchart_context_link(project_id, name, kind)}
"""