Initial SFERA platform baseline

This commit is contained in:
2026-05-16 19:03:49 +03:00
commit 3b845c8fce
282 changed files with 55045 additions and 0 deletions
@@ -0,0 +1,314 @@
from __future__ import annotations
from pydantic import BaseModel
from integration_topology import IntegrationKind, build_integration_topology
from query_intelligence import tables_with_read_write_conflicts
from sir import DiagnosticSeverity, EdgeKind, NodeKind, SirSnapshot
class ReviewFinding(BaseModel):
finding_id: str
title: str
severity: DiagnosticSeverity
message: str
source_path: str | None = None
line_start: int | None = None
def review_snapshot(snapshot: SirSnapshot) -> list[ReviewFinding]:
findings: list[ReviewFinding] = []
nodes_by_lineage = {node.lineage_id: node for node in snapshot.nodes}
for diagnostic in snapshot.diagnostics:
findings.append(
ReviewFinding(
finding_id=f"finding.{diagnostic.diagnostic_id}",
title=diagnostic.code,
severity=diagnostic.severity,
message=diagnostic.message,
source_path=diagnostic.source_ref.source_path if diagnostic.source_ref else None,
line_start=diagnostic.source_ref.line_start if diagnostic.source_ref else None,
)
)
for reference in snapshot.unresolved_references:
findings.append(
ReviewFinding(
finding_id=f"finding.{reference.reference_id}",
title=f"Unresolved {reference.kind.value.lower()}",
severity=DiagnosticSeverity.WARNING,
message=f"Unresolved reference to {reference.target_name}",
source_path=reference.source_ref.source_path,
line_start=reference.source_ref.line_start,
)
)
secured_lineages = {
edge.target_lineage
for edge in snapshot.edges
if edge.kind == EdgeKind.GRANTS_ACCESS
}
access_target_kinds = {
NodeKind.CATALOG,
NodeKind.DOCUMENT,
NodeKind.REGISTER,
NodeKind.COMMON_MODULE,
NodeKind.EXCHANGE_PLAN,
NodeKind.SCHEDULED_JOB,
NodeKind.BUSINESS_PROCESS,
NodeKind.TASK,
}
for node in snapshot.nodes:
if node.kind in access_target_kinds and node.lineage_id not in secured_lineages:
findings.append(
ReviewFinding(
finding_id=f"finding.security.unsecured.{node.lineage_id}",
title="Missing 1C role access",
severity=DiagnosticSeverity.WARNING,
message=f"Object {node.qualified_name} has no role access grants",
source_path=node.source_ref.source_path,
line_start=node.source_ref.line_start,
)
)
handled_command_lineages = {
edge.source_lineage
for edge in snapshot.edges
if edge.kind == EdgeKind.HANDLES
}
for node in snapshot.nodes:
if node.kind != NodeKind.COMMAND or node.lineage_id in handled_command_lineages:
continue
handler_name = _command_handler_name(node.attributes)
if not handler_name:
continue
findings.append(
ReviewFinding(
finding_id=f"finding.ui.unresolved_handler.{node.lineage_id}",
title="Unresolved 1C command handler",
severity=DiagnosticSeverity.WARNING,
message=f"Command {node.qualified_name} references missing handler {handler_name}",
source_path=node.source_ref.source_path,
line_start=node.source_ref.line_start,
)
)
handled_form_handlers = {
(edge.source_lineage, str(edge.attributes.get("handler_name", "")).casefold())
for edge in snapshot.edges
if edge.kind == EdgeKind.HANDLES
}
for node in snapshot.nodes:
if node.kind != NodeKind.FORM:
continue
for _, handler_name in _form_handler_names(node.attributes):
if (node.lineage_id, handler_name.casefold()) in handled_form_handlers:
continue
findings.append(
ReviewFinding(
finding_id=f"finding.ui.unresolved_form_handler.{node.lineage_id}.{handler_name}",
title="Unresolved 1C form event handler",
severity=DiagnosticSeverity.WARNING,
message=f"Form {node.qualified_name} references missing event handler {handler_name}",
source_path=node.source_ref.source_path,
line_start=node.source_ref.line_start,
)
)
findings.extend(_document_posting_write_review(snapshot, nodes_by_lineage))
tabular_sections_with_columns = {
edge.source_lineage
for edge in snapshot.edges
if edge.kind == EdgeKind.HAS_ATTRIBUTE
}
tabular_section_columns: dict[str, list] = {}
for edge in snapshot.edges:
if edge.kind != EdgeKind.HAS_ATTRIBUTE:
continue
source = nodes_by_lineage.get(edge.source_lineage)
target = nodes_by_lineage.get(edge.target_lineage)
if source is None or target is None:
continue
if source.kind == NodeKind.TABULAR_SECTION and target.kind == NodeKind.ATTRIBUTE:
tabular_section_columns.setdefault(source.lineage_id, []).append(target)
for node in snapshot.nodes:
if node.kind != NodeKind.TABULAR_SECTION or node.lineage_id in tabular_sections_with_columns:
continue
findings.append(
ReviewFinding(
finding_id=f"finding.schema.empty_tabular_section.{node.lineage_id}",
title="Empty 1C tabular section",
severity=DiagnosticSeverity.WARNING,
message=f"Tabular section {node.qualified_name} has no columns",
source_path=node.source_ref.source_path,
line_start=node.source_ref.line_start,
)
)
for node in snapshot.nodes:
if node.kind != NodeKind.TABULAR_SECTION:
continue
columns = tabular_section_columns.get(node.lineage_id, [])
if not columns or _has_subject_column(columns):
continue
findings.append(
ReviewFinding(
finding_id=f"finding.schema.no_subject_column.{node.lineage_id}",
title="No subject column in 1C tabular section",
severity=DiagnosticSeverity.INFO,
message=(
f"Tabular section {node.qualified_name} has columns, "
"but no common subject column such as Номенклатура, Товар, Услуга, Объект, or Ссылка"
),
source_path=node.source_ref.source_path,
line_start=node.source_ref.line_start,
)
)
for endpoint in build_integration_topology(snapshot).endpoints:
if endpoint.direction == "OUTBOUND" and endpoint.kind in {
IntegrationKind.HTTP_SERVICE,
IntegrationKind.WEB_SERVICE,
IntegrationKind.COM_CONNECTOR,
}:
findings.append(
ReviewFinding(
finding_id=f"finding.integration.{endpoint.endpoint_id}",
title="External integration endpoint",
severity=DiagnosticSeverity.INFO,
message=f"{endpoint.kind.value} integration {endpoint.name} is used by {endpoint.owner}",
source_path=None,
line_start=None,
)
)
for usage in tables_with_read_write_conflicts(snapshot):
findings.append(
ReviewFinding(
finding_id=f"finding.query.read_write_conflict.{usage.table.lineage_id}",
title="Register read/write dependency",
severity=DiagnosticSeverity.INFO,
message=(
f"{usage.table.qualified_name} is read by "
f"{', '.join(reader.name for reader in usage.readers)} and written by "
f"{', '.join(writer.name for writer in usage.writers)}"
),
source_path=usage.table.source_ref.source_path,
line_start=usage.table.source_ref.line_start,
)
)
return findings
def _document_posting_write_review(
snapshot: SirSnapshot,
nodes_by_lineage: dict[str, object],
) -> list[ReviewFinding]:
module_lineages_by_document = {
edge.source_lineage: edge.target_lineage
for edge in snapshot.edges
if edge.kind == EdgeKind.CONTAINS
and nodes_by_lineage.get(edge.source_lineage) is not None
and getattr(nodes_by_lineage[edge.source_lineage], "kind", None) == NodeKind.DOCUMENT
and nodes_by_lineage.get(edge.target_lineage) is not None
and getattr(nodes_by_lineage[edge.target_lineage], "kind", None) == NodeKind.MODULE
}
routines_by_module: dict[str, list] = {}
routines_with_writes = {
edge.source_lineage
for edge in snapshot.edges
if edge.kind == EdgeKind.WRITES
}
for edge in snapshot.edges:
if edge.kind != EdgeKind.DECLARES:
continue
routine = nodes_by_lineage.get(edge.target_lineage)
if routine is None or getattr(routine, "kind", None) not in {NodeKind.PROCEDURE, NodeKind.FUNCTION}:
continue
routines_by_module.setdefault(edge.source_lineage, []).append(routine)
findings: list[ReviewFinding] = []
for document_lineage, module_lineage in module_lineages_by_document.items():
document = nodes_by_lineage[document_lineage]
posting_routines = [
routine
for routine in routines_by_module.get(module_lineage, [])
if routine.name.casefold() in {"проведение", "posting"}
]
for routine in posting_routines:
if routine.lineage_id in routines_with_writes:
continue
findings.append(
ReviewFinding(
finding_id=f"finding.document.posting_no_writes.{routine.lineage_id}",
title="Document posting has no register writes",
severity=DiagnosticSeverity.INFO,
message=(
f"Document {document.qualified_name} posting routine "
f"{routine.name} does not write registers"
),
source_path=routine.source_ref.source_path,
line_start=routine.source_ref.line_start,
)
)
return findings
def _command_handler_name(attributes: dict) -> str:
for key in (
"action",
"Action",
"handler",
"Handler",
"method",
"Method",
"methodName",
"MethodName",
"Действие",
"Обработчик",
"Метод",
"ИмяМетода",
):
value = attributes.get(key)
if value:
return str(value).split(".")[-1]
return ""
def _form_handler_names(attributes: dict) -> list[tuple[str, str]]:
handler_keys = {
"oncreate",
"onopen",
"onclose",
"beforeclose",
"beforewrite",
"afterwrite",
"onread",
"onchange",
"event",
"handler",
"method",
"methodname",
"присозданиинсервере",
"присозданиинаклиенте",
"приоткрытии",
"передзакрытием",
"призакрытии",
"передзаписью",
"призаписи",
"причтении",
"приизменении",
"событие",
"обработчик",
"метод",
"имяметода",
}
handlers: list[tuple[str, str]] = []
for key, value in attributes.items():
if value and str(key).casefold() in handler_keys:
handlers.append((str(key), str(value).split(".")[-1]))
return handlers
def _has_subject_column(columns: list) -> bool:
subject_names = {"номенклатура", "товар", "товары", "услуга", "услуги", "объект", "ссылка"}
for column in columns:
name = column.name.casefold()
if name in subject_names:
return True
return False
__all__ = ["ReviewFinding", "review_snapshot"]