Files
sfera/packages/semantic-kernel/src/semantic_kernel/__init__.py
T
m af900e4e34
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled
Extract managed form elements from XML
2026-05-21 06:10:05 +03:00

1504 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import hashlib
import json
import os
import re
import shutil
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
import xml.etree.ElementTree as ET
from one_c_normalizer import parse_one_c_xml_file
from sir import (
Diagnostic,
DiagnosticSeverity,
EdgeKind,
NodeKind,
SemanticEdge,
SemanticNode,
SirSnapshot,
SnapshotMetadata,
SourceRef,
ReferenceKind,
UnresolvedReference,
compute_snapshot_hash,
make_lineage_id,
make_semantic_id,
validate_snapshot,
)
_ROUTINE_START_RE = re.compile(
r"^\s*(?P<kind>Процедура|Procedure|Функция|Function)\s+"
r"(?P<name>[A-Za-zА-Яа-я_][\wА-Яа-я]*)",
re.IGNORECASE,
)
_ROUTINE_END_RE = re.compile(r"^\s*(КонецПроцедуры|EndProcedure|КонецФункции|EndFunction)\b", re.IGNORECASE)
_CALL_RE = re.compile(r"^\s*(?P<name>[A-Za-zА-Яа-я_][\wА-Яа-я]*)\s*\([^)]*\)\s*;")
_ASSIGNMENT_CALL_RE = re.compile(
r"=\s*(?P<name>[A-Za-zА-Яа-я_][\wА-Яа-я]*)\s*\([^)]*\)\s*;",
re.IGNORECASE,
)
_CONDITION_CALL_RE = re.compile(
r"^\s*(Если|If)\b.*?(?<!\.)(?P<name>[A-Za-zА-Яа-я_][\wА-Яа-я]*)\s*\([^)]*\).*?(Тогда|Then)\b",
re.IGNORECASE,
)
_OBJECT_CREATE_RE = re.compile(
r"^\s*(?P<var>[A-Za-zА-Яа-я_][\wА-Яа-я]*)\s*=\s*"
r"(?P<factory>Справочники|Catalogs|Документы|Documents|"
r"РегистрыСведений|InformationRegisters|РегистрыНакопления|AccumulationRegisters|"
r"РегистрыБухгалтерии|AccountingRegisters|РегистрыРасчета|CalculationRegisters)\."
r"(?P<name>[A-Za-zА-Яа-я_][\wА-Яа-я]*)\."
r"(?P<method>СоздатьЭлемент|CreateItem|СоздатьДокумент|CreateDocument|"
r"СоздатьНаборЗаписей|CreateRecordSet)\s*\(",
re.IGNORECASE,
)
_QUERY_TEXT_RE = re.compile(r"\.(Текст|Text)\b", re.IGNORECASE)
_INLINE_NEW_QUERY_RE = re.compile(r"(Новый\s+Запрос|New\s+Query)\s*\(\s*\"(?P<query>.*?)\"\s*\)", re.IGNORECASE)
_FROM_RE = re.compile(r"^\s*(ИЗ|FROM)\s*$", re.IGNORECASE)
_URL_RE = re.compile(r"https?://[^\"'\s;]+", re.IGNORECASE)
_RUST_PARSER_ENV = "SFERA_BSL_PARSER"
# Node kinds collected into ``metadata_nodes`` by ``index_project`` and later
# handed to ``_link_metadata_to_modules`` as candidate module owners.
_METADATA_OWNER_KINDS = {
    NodeKind.CATALOG,
    NodeKind.DOCUMENT,
    NodeKind.REGISTER,
    NodeKind.COMMON_MODULE,
    NodeKind.CONSTANT,
    NodeKind.DOCUMENT_JOURNAL,
    NodeKind.ENUM,
    NodeKind.REPORT,
    NodeKind.DATA_PROCESSOR,
    NodeKind.CHART_OF_CHARACTERISTIC_TYPES,
    NodeKind.CHART_OF_ACCOUNTS,
    NodeKind.CHART_OF_CALCULATION_TYPES,
    NodeKind.EXCHANGE_PLAN,
    NodeKind.EXTERNAL_DATA_SOURCE,
    NodeKind.SCHEDULED_JOB,
    NodeKind.BUSINESS_PROCESS,
    NodeKind.TASK,
}
# Lower-cased plural name (English and Russian spelling of each entry) mapped to
# the singular Russian metadata prefix and the node kind it denotes.
# NOTE(review): the keys look like export directory names matched against
# module paths; the consumer is outside this chunk, so confirm there.
_PATH_METADATA_ALIASES = {
    "catalogs": ("Справочник", NodeKind.CATALOG),
    "справочники": ("Справочник", NodeKind.CATALOG),
    "documents": ("Документ", NodeKind.DOCUMENT),
    "документы": ("Документ", NodeKind.DOCUMENT),
    "constants": ("Константа", NodeKind.CONSTANT),
    "константы": ("Константа", NodeKind.CONSTANT),
    "documentjournals": ("ЖурналДокументов", NodeKind.DOCUMENT_JOURNAL),
    "журналыдокументов": ("ЖурналДокументов", NodeKind.DOCUMENT_JOURNAL),
    "enums": ("Перечисление", NodeKind.ENUM),
    "перечисления": ("Перечисление", NodeKind.ENUM),
    "reports": ("Отчет", NodeKind.REPORT),
    "отчеты": ("Отчет", NodeKind.REPORT),
    "dataprocessors": ("Обработка", NodeKind.DATA_PROCESSOR),
    "обработки": ("Обработка", NodeKind.DATA_PROCESSOR),
    "chartsofcharacteristictypes": ("ПланВидовХарактеристик", NodeKind.CHART_OF_CHARACTERISTIC_TYPES),
    "планывидовхарактеристик": ("ПланВидовХарактеристик", NodeKind.CHART_OF_CHARACTERISTIC_TYPES),
    "chartsofaccounts": ("ПланСчетов", NodeKind.CHART_OF_ACCOUNTS),
    "планысчетов": ("ПланСчетов", NodeKind.CHART_OF_ACCOUNTS),
    "chartsofcalculationtypes": ("ПланВидовРасчета", NodeKind.CHART_OF_CALCULATION_TYPES),
    "планывидоврасчета": ("ПланВидовРасчета", NodeKind.CHART_OF_CALCULATION_TYPES),
    # All four register flavours share the single REGISTER node kind; only the
    # Russian prefix tells them apart.
    "accumulationregisters": ("РегистрНакопления", NodeKind.REGISTER),
    "регистрынакопления": ("РегистрНакопления", NodeKind.REGISTER),
    "informationregisters": ("РегистрСведений", NodeKind.REGISTER),
    "регистрысведений": ("РегистрСведений", NodeKind.REGISTER),
    "accountingregisters": ("РегистрБухгалтерии", NodeKind.REGISTER),
    "регистрыбухгалтерии": ("РегистрБухгалтерии", NodeKind.REGISTER),
    "calculationregisters": ("РегистрРасчета", NodeKind.REGISTER),
    "регистрырасчета": ("РегистрРасчета", NodeKind.REGISTER),
    "commonmodules": ("ОбщийМодуль", NodeKind.COMMON_MODULE),
    "общиемодули": ("ОбщийМодуль", NodeKind.COMMON_MODULE),
    "exchangeplans": ("ПланОбмена", NodeKind.EXCHANGE_PLAN),
    "планыобмена": ("ПланОбмена", NodeKind.EXCHANGE_PLAN),
    "externaldatasources": ("ВнешнийИсточникДанных", NodeKind.EXTERNAL_DATA_SOURCE),
    "внешниеисточникиданных": ("ВнешнийИсточникДанных", NodeKind.EXTERNAL_DATA_SOURCE),
    "scheduledjobs": ("РегламентноеЗадание", NodeKind.SCHEDULED_JOB),
    "регламентныезадания": ("РегламентноеЗадание", NodeKind.SCHEDULED_JOB),
    "businessprocesses": ("БизнесПроцесс", NodeKind.BUSINESS_PROCESS),
    "бизнеспроцессы": ("БизнесПроцесс", NodeKind.BUSINESS_PROCESS),
    "tasks": ("Задача", NodeKind.TASK),
    "задачи": ("Задача", NodeKind.TASK),
}
@dataclass(frozen=True)
class ParsedRoutine:
    """Immutable description of one procedure or function found in a BSL module."""

    # Routine name as written in the source.
    name: str
    # True for a function, False for a procedure.
    is_function: bool
    # True when the routine header carries the export keyword.
    export: bool
    # Line of the routine header and the last line attributed to the routine.
    line_start: int
    line_end: int
    # (callee name, line number) pairs for calls detected inside the routine.
    calls: tuple[tuple[str, int], ...] = ()
    # Queries detected inside the routine.
    queries: tuple["ParsedQuery", ...] = ()
    # Write operations detected inside the routine.
    writes: tuple["ParsedWrite", ...] = ()
@dataclass(frozen=True)
class ParsedQuery:
    """Immutable description of one query found inside a routine."""

    # Query text (cleaned and newline-joined when built by ``_append_query``).
    text: str
    # Table names referenced by the query.
    tables: tuple[str, ...]
    # First and last source line of the query.
    line_start: int
    line_end: int
@dataclass(frozen=True)
class ParsedWrite:
    """Immutable description of one write operation found inside a routine."""

    # Name of the written object, or "unknown" when it could not be determined.
    target: str
    # Write category; this file produces "REGISTER_WRITE" and "OBJECT_WRITE".
    write_type: str
    # Source line of the write statement.
    line: int
@dataclass
class _RoutineBuilder:
    """Mutable accumulator for a routine while its body is still being parsed."""

    name: str
    is_function: bool
    export: bool
    line_start: int
    line_end: int
    calls: list[tuple[str, int]] = field(default_factory=list)
    queries: list[ParsedQuery] = field(default_factory=list)
    writes: list[ParsedWrite] = field(default_factory=list)

    def freeze(self) -> ParsedRoutine:
        """Return an immutable ``ParsedRoutine`` holding the current state."""
        # The list fields are copied into tuples so later mutation of the
        # builder cannot leak into the frozen result.
        frozen_fields = {
            "name": self.name,
            "is_function": self.is_function,
            "export": self.export,
            "line_start": self.line_start,
            "line_end": self.line_end,
            "calls": tuple(self.calls),
            "queries": tuple(self.queries),
            "writes": tuple(self.writes),
        }
        return ParsedRoutine(**frozen_fields)
def index_project(path: str | Path, *, project_id: str | None = None, structure_only: bool = False) -> SirSnapshot:
    """Build a validated ``SirSnapshot`` for the project found at *path*.

    BSL modules are parsed into module, routine, query, table and write-target
    nodes; metadata files (``.xml``, ``.mdo``, ``.form``) are parsed into
    metadata nodes.  Call edges are resolved once every routine is known, and
    calls that cannot be resolved are recorded as unresolved references.

    Args:
        path: A project directory, or a single ``.bsl`` / metadata file.
        project_id: Identifier stored on the snapshot; defaults to the stem of
            *path*, then to ``"sfera"``.
        structure_only: When true, BSL sources are skipped and metadata files
            are searched under ``<path>/src`` if that directory exists.

    Returns:
        The snapshot, after its hash was computed and ``validate_snapshot`` ran.
    """
    root = Path(path)
    source_files = [] if structure_only else (
        [root] if root.is_file() and root.suffix.lower() == ".bsl" else sorted(root.rglob("*.bsl"))
    )
    xml_suffixes = {".xml", ".mdo", ".form"}
    xml_root = root / "src" if structure_only and (root / "src").exists() else root
    xml_files = [root] if root.is_file() and root.suffix.lower() in xml_suffixes else sorted(
        file for file in xml_root.rglob("*") if file.is_file() and file.suffix.lower() in xml_suffixes
    )
    project = project_id or root.stem or "sfera"
    nodes: list[SemanticNode] = []
    edges: list[SemanticEdge] = []
    unresolved_references: list[UnresolvedReference] = []
    routine_by_name: dict[str, SemanticNode] = {}
    routine_by_source_and_name: dict[tuple[str, str], SemanticNode] = {}
    module_nodes: dict[str, SemanticNode] = {}
    routines_by_source: dict[str, list[ParsedRoutine]] = {}
    diagnostics: list[Diagnostic] = []
    metadata_nodes: list[SemanticNode] = []
    scheduled_job_nodes: list[SemanticNode] = []
    command_nodes: list[SemanticNode] = []
    form_nodes: list[SemanticNode] = []
    role_rights: list[dict] = []

    # Lineage ids of every node added while indexing BSL sources.  Kept as a
    # running set so the "already known?" checks for tables and write targets
    # do not rebuild a set from the whole node list each time.
    known_lineages: set[str] = set()

    def add_node(node: SemanticNode) -> None:
        nodes.append(node)
        known_lineages.add(node.lineage_id)

    # Pass 1: BSL modules -> module / routine / query / table / write nodes.
    for source_file in source_files:
        text = _read_text_file(source_file)
        source_path = source_file.as_posix()
        source_hash = _source_hash(text)
        routines = parse_bsl_module_file(source_file, text)
        diagnostics.extend(_bsl_parse_diagnostics(source_path, text, source_hash))
        routines_by_source[source_path] = routines
        module_name = source_file.stem
        module_key = f"{project}:{source_path}"
        module = _node(
            NodeKind.MODULE,
            module_name,
            module_name,
            module_key,
            SourceRef(source_path=source_path, source_hash=source_hash),
            {"source_text": text},
        )
        module_nodes[source_path] = module
        add_node(module)
        integration_nodes, integration_edges = _module_integration_graph(module, text, source_path, source_hash)
        for integration_node in integration_nodes:
            add_node(integration_node)
        edges.extend(integration_edges)
        for routine in routines:
            routine_kind = NodeKind.FUNCTION if routine.is_function else NodeKind.PROCEDURE
            qualified_name = f"{module_name}.{routine.name}"
            routine_node = _node(
                routine_kind,
                routine.name,
                qualified_name,
                f"{module_key}:{routine.name}",
                SourceRef(
                    source_path=source_path,
                    line_start=routine.line_start,
                    line_end=routine.line_end,
                    column_start=1,
                    source_hash=source_hash,
                ),
                {"export": routine.export},
            )
            add_node(routine_node)
            routine_key = routine.name.casefold()
            # Global lookup keeps the first routine with a given name; the
            # per-source lookup is exact.
            routine_by_name.setdefault(routine_key, routine_node)
            routine_by_source_and_name[(source_path, routine_key)] = routine_node
            edges.append(_edge(EdgeKind.DECLARES, module, routine_node, source_path, routine.line_start))
            for index, query in enumerate(routine.queries, start=1):
                query_node = _node(
                    NodeKind.QUERY,
                    f"{routine.name}.query{index}",
                    f"{qualified_name}.query{index}",
                    f"{module_key}:{routine.name}:query:{index}",
                    SourceRef(
                        source_path=source_path,
                        line_start=query.line_start,
                        line_end=query.line_end,
                        source_hash=source_hash,
                    ),
                    {"query_text": query.text},
                )
                add_node(query_node)
                edges.append(_edge(EdgeKind.OWNS_QUERY, routine_node, query_node, source_path, query.line_start))
                for table in query.tables:
                    table_node = _table_node(table, source_path, source_hash)
                    if table_node.lineage_id not in known_lineages:
                        add_node(table_node)
                    edges.append(_edge(EdgeKind.READS_TABLE, query_node, table_node, source_path, query.line_start))
            for write in routine.writes:
                target_node = _write_target_node(write, source_path, source_hash)
                if target_node.lineage_id not in known_lineages:
                    add_node(target_node)
                edges.append(
                    _edge(
                        EdgeKind.WRITES,
                        routine_node,
                        target_node,
                        source_path,
                        write.line,
                        {"write_type": write.write_type},
                    )
                )

    # Kinds that may act as the parent of later objects from the same file:
    # every metadata owner kind plus roles, forms and tabular sections.
    parent_kinds = _METADATA_OWNER_KINDS | {NodeKind.ROLE, NodeKind.FORM, NodeKind.TABULAR_SECTION}

    # Pass 2: metadata files -> metadata nodes and containment edges.
    for source_file in xml_files:
        source_path = source_file.as_posix()
        text = _read_text_file(source_file)
        source_hash = _source_hash(text)
        parent_by_prefix: dict[str, SemanticNode] = {}
        try:
            xml_objects = parse_one_c_xml_file(source_file)
        except (OSError, UnicodeDecodeError, ET.ParseError) as error:
            # An unreadable file becomes a diagnostic instead of aborting the run.
            diagnostics.append(
                _diagnostic(
                    "XML_PARSE_ERROR",
                    DiagnosticSeverity.ERROR,
                    f"Cannot parse 1C metadata XML: {error}",
                    source_path,
                    1,
                    source_hash,
                )
            )
            continue
        for xml_object in xml_objects:
            if xml_object.object_kind == "RIGHT":
                # Rights do not become nodes; they are linked to roles later.
                role_rights.append(xml_object.attributes)
                continue
            kind = _xml_node_kind(xml_object.object_kind)
            if kind is None:
                continue
            node = _node(
                kind,
                xml_object.name,
                xml_object.qualified_name,
                f"{project}:{source_path}:{xml_object.object_kind}:{xml_object.qualified_name}",
                SourceRef(source_path=source_path, source_hash=source_hash),
                xml_object.attributes,
            )
            nodes.append(node)
            if kind in _METADATA_OWNER_KINDS:
                metadata_nodes.append(node)
            if kind == NodeKind.SCHEDULED_JOB:
                scheduled_job_nodes.append(node)
            if kind == NodeKind.COMMAND:
                command_nodes.append(node)
            if kind == NodeKind.FORM:
                form_nodes.append(node)
            # The parent is looked up before this node is registered, so a node
            # never becomes its own parent.
            parent = _find_xml_parent(parent_by_prefix, node.qualified_name)
            if parent is not None:
                edges.append(_edge(_xml_edge_kind(kind), parent, node, source_path, 1))
            if kind in parent_kinds:
                parent_by_prefix[node.qualified_name] = node

    # Pass 3: cross links that need the complete node set.
    edges.extend(_link_metadata_to_modules(root, module_nodes, metadata_nodes, form_nodes))
    edges.extend(_link_role_rights(nodes, role_rights))
    edges.extend(_link_scheduled_jobs_to_routines(scheduled_job_nodes, routine_by_name))
    edges.extend(_link_commands_to_handlers(command_nodes, routine_by_name))
    edges.extend(_link_forms_to_handlers(form_nodes, routine_by_name))

    # Pass 4: call edges.
    for source_file in source_files:
        source_path = source_file.as_posix()
        for routine in routines_by_source[source_path]:
            # Resolve the caller by (file, name).  Matching on
            # "<file stem>.<routine>" alone picks the wrong node whenever two
            # modules share a file stem.
            caller = routine_by_source_and_name.get((source_path, routine.name.casefold()))
            if caller is None:
                continue
            for callee_name, line in routine.calls:
                callee_key = callee_name.casefold()
                # Prefer a routine from the same module, then any routine with that name.
                callee = routine_by_source_and_name.get((source_path, callee_key)) or routine_by_name.get(callee_key)
                if callee is not None:
                    edges.append(_edge(EdgeKind.CALLS, caller, callee, source_path, line))
                else:
                    unresolved_references.append(
                        UnresolvedReference(
                            reference_id=f"ref.{_stable_hash(f'{caller.lineage_id}:{callee_name}:{line}')}",
                            kind=ReferenceKind.CALL,
                            source_lineage=caller.lineage_id,
                            target_name=callee_name,
                            source_ref=SourceRef(
                                source_path=source_path,
                                line_start=line,
                                line_end=line,
                            ),
                        )
                    )

    deduped_nodes, lineage_aliases = _dedupe_nodes_with_aliases(nodes)
    deduped_edges, edge_diagnostics = _remap_and_dedupe_edges(edges, deduped_nodes, lineage_aliases)
    diagnostics.extend(edge_diagnostics)
    snapshot = SirSnapshot(
        snapshot_id=f"snapshot.{_stable_hash(project + ':' + str(root))}",
        project_id=project,
        metadata=SnapshotMetadata(source_root=root.as_posix()),
        nodes=deduped_nodes,
        edges=deduped_edges,
        diagnostics=diagnostics,
        unresolved_references=unresolved_references,
    )
    snapshot.snapshot_hash = compute_snapshot_hash(snapshot)
    validate_snapshot(snapshot)
    return snapshot
def parse_bsl_module_file(source_file: str | Path, source: str | None = None) -> list[ParsedRoutine]:
    """Parse one BSL module, using the Rust parser when one can be located.

    Args:
        source_file: Path of the module.
        source: Module text, if already loaded; otherwise the file is read.

    Returns:
        The routines of the module.  With a Rust parser available its result is
        merged with the regex parser's result via ``_merge_inline_query_fallback``.
    """
    parser_path = resolve_rust_bsl_parser(source_file)
    if source is None:
        source_text = _read_text_file(Path(source_file))
    else:
        source_text = source
    if not parser_path:
        return parse_bsl_module(source_text)
    raw_json = _run_rust_bsl_parser(parser_path, Path(source_file))
    rust_routines = parse_bsl_module_from_rust_json(raw_json)
    return _merge_inline_query_fallback(rust_routines, parse_bsl_module(source_text))
def _merge_inline_query_fallback(
    primary: list[ParsedRoutine],
    fallback: list[ParsedRoutine],
) -> list[ParsedRoutine]:
    """Combine two parse results for the same module.

    Routines from *primary* are kept, but take the query list of the same-named
    *fallback* routine (names compared case-insensitively) when that list is
    longer.  Routines present only in *fallback* are appended at the end.
    """
    fallback_index: dict[str, ParsedRoutine] = {}
    for candidate in fallback:
        fallback_index[candidate.name.casefold()] = candidate

    merged: list[ParsedRoutine] = []
    primary_keys: set[str] = set()
    for routine in primary:
        key = routine.name.casefold()
        primary_keys.add(key)
        chosen_queries = routine.queries
        twin = fallback_index.get(key)
        if twin is not None and len(twin.queries) > len(chosen_queries):
            chosen_queries = twin.queries
        merged.append(
            ParsedRoutine(
                name=routine.name,
                is_function=routine.is_function,
                export=routine.export,
                line_start=routine.line_start,
                line_end=routine.line_end,
                calls=routine.calls,
                queries=chosen_queries,
                writes=routine.writes,
            )
        )

    for candidate in fallback:
        if candidate.name.casefold() not in primary_keys:
            merged.append(candidate)
    return merged
def resolve_rust_bsl_parser(source_file: str | Path | None = None) -> str | None:
    """Return the path of the Rust BSL parser, or ``None`` if none is found.

    A non-empty value of the environment variable named by ``_RUST_PARSER_ENV``
    takes precedence; otherwise auto-discovery is tried.
    """
    override = os.getenv(_RUST_PARSER_ENV)
    if not override:
        hint = None if source_file is None else Path(source_file)
        return _auto_discovered_rust_bsl_parser(hint)
    return override
def _auto_discovered_rust_bsl_parser(source_file: Path | None = None) -> str | None:
binary_name = "bsl-parser.exe" if os.name == "nt" else "bsl-parser"
candidates: list[Path] = []
if source_file is not None:
for parent in [source_file.resolve().parent, *source_file.resolve().parents]:
candidates.append(parent / "rust" / "target" / "debug" / binary_name)
package_file = Path(__file__).resolve()
for parent in [package_file.parent, *package_file.parents]:
candidates.append(parent / "rust" / "target" / "debug" / binary_name)
for candidate in candidates:
if candidate.is_file():
return str(candidate)
return shutil.which("bsl-parser")
def parse_bsl_module_from_rust_json(payload: str | dict) -> list[ParsedRoutine]:
    """Convert the Rust parser's JSON output into ``ParsedRoutine`` objects.

    Args:
        payload: Either the JSON text or the already decoded mapping.  The keys
            read are ``procedures``, ``calls``, ``queries`` and ``writes``.

    Returns:
        One routine per distinct lower-cased procedure name, in first-seen
        order.  Calls, queries and writes whose owner is not a known procedure
        are dropped.
    """
    document = json.loads(payload) if isinstance(payload, str) else payload
    builders: dict[str, _RoutineBuilder] = {}

    def first_line(span: dict) -> int:
        return int(span.get("line_start", 0))

    def last_line(span: dict) -> int:
        # A missing end line falls back to the start line.
        return int(span.get("line_end", span.get("line_start", 0)))

    def owner_of(entry: dict, key: str) -> _RoutineBuilder | None:
        return builders.get(str(entry.get(key, "")).lower())

    for procedure in document.get("procedures", []):
        span = procedure.get("source_range", {})
        routine_name = str(procedure["name"])
        builders[routine_name.lower()] = _RoutineBuilder(
            name=routine_name,
            is_function=bool(procedure.get("is_function")),
            export=bool(procedure.get("export")),
            line_start=first_line(span),
            line_end=last_line(span),
        )

    for call in document.get("calls", []):
        builder = owner_of(call, "caller")
        if builder is None:
            continue
        span = call.get("source_range", {})
        builder.calls.append((str(call["callee"]), first_line(span)))

    for query in document.get("queries", []):
        builder = owner_of(query, "owner_procedure")
        if builder is None:
            continue
        span = query.get("source_range", {})
        builder.queries.append(
            ParsedQuery(
                text=str(query.get("query_text", "")),
                tables=tuple(str(table) for table in query.get("tables", [])),
                line_start=first_line(span),
                line_end=last_line(span),
            )
        )

    for write in document.get("writes", []):
        builder = owner_of(write, "owner_procedure")
        if builder is None:
            continue
        span = write.get("source_range", {})
        builder.writes.append(
            ParsedWrite(
                target=str(write.get("target", "unknown")),
                write_type=str(write.get("write_type", "OBJECT_WRITE")),
                line=first_line(span),
            )
        )

    return [builder.freeze() for builder in builders.values()]
def _run_rust_bsl_parser(parser_path: str, source_file: Path) -> str:
command = [parser_path, str(source_file)]
completed = subprocess.run(command, check=False, capture_output=True, text=True, encoding="utf-8")
if completed.returncode != 0:
message = completed.stderr.strip() or completed.stdout.strip() or f"exit code {completed.returncode}"
raise RuntimeError(f"Rust BSL parser failed for {source_file}: {message}")
return completed.stdout
def parse_bsl_module(source: str) -> list[ParsedRoutine]:
    """Extract routines from BSL source text with a line-by-line regex scan.

    For every routine that has both a header and a closing keyword, the calls,
    queries and writes found between them are collected.  Lines outside any
    routine are ignored, and a routine that is still open at the end of the
    text is not returned.

    Args:
        source: Full text of one BSL module.

    Returns:
        The closed routines in source order; line numbers are 1-based.
    """
    routines: list[ParsedRoutine] = []
    current: _RoutineBuilder | None = None
    # State of a query text assignment that spans several lines.
    collecting_query = False
    query_start = 0
    query_lines: list[str] = []
    # Lower-cased variable name -> metadata object it was created from; reset
    # for each routine.
    object_targets: dict[str, str] = {}
    for line_no, line in enumerate(source.splitlines(), start=1):
        if match := _ROUTINE_START_RE.match(line):
            # A new header replaces whatever routine was open without closing it.
            current = _RoutineBuilder(
                name=match.group("name"),
                is_function=match.group("kind").lower() in {"функция", "function"},
                export=_routine_has_export(line),
                line_start=line_no,
                line_end=line_no,
            )
            object_targets = {}
            continue
        if current is None:
            continue
        current.line_end = line_no
        stripped = line.strip()
        if _ROUTINE_END_RE.match(line):
            if collecting_query:
                # Flush a query that was never terminated with ";".
                _append_query(current, query_lines, query_start, line_no)
                collecting_query = False
                query_lines = []
            routines.append(current.freeze())
            current = None
            continue
        if collecting_query:
            # Inside a multi-line query nothing else is analysed on the line.
            query_lines.append(_clean_query_line(stripped))
            if stripped.endswith(";"):
                _append_query(current, query_lines, query_start, line_no)
                collecting_query = False
                query_lines = []
            continue
        if object_target := _extract_object_create_target(stripped):
            variable, target = object_target
            object_targets[variable.lower()] = target
        if inline_query := _inline_new_query_text(stripped):
            _append_query(current, [_clean_query_line(inline_query)], line_no, line_no)
            continue
        if _QUERY_TEXT_RE.search(line):
            # Start of a query text assignment; it may end on this same line.
            collecting_query = True
            query_start = line_no
            inline_query = _query_text_after_assignment(stripped)
            if inline_query:
                query_lines.append(_clean_query_line(inline_query))
            if stripped.endswith(";"):
                _append_query(current, query_lines, query_start, line_no)
                collecting_query = False
                query_lines = []
            continue
        # At most one call is recorded per line; the first pattern that matches wins.
        if call_match := _CALL_RE.match(line):
            callee = call_match.group("name")
            # Any "." on the line is taken to mean a method call rather than a
            # routine call.
            if "." not in stripped and callee.lower() not in {"if", "если"}:
                current.calls.append((callee, line_no))
        elif call_match := _ASSIGNMENT_CALL_RE.search(line):
            callee = call_match.group("name")
            if "." not in stripped:
                current.calls.append((callee, line_no))
        elif call_match := _CONDITION_CALL_RE.search(line):
            current.calls.append((call_match.group("name"), line_no))
        # A write through a tracked variable takes precedence over the generic
        # write detection.
        if write := _extract_object_write(stripped, object_targets):
            current.writes.append(ParsedWrite(write[0], write[1], line_no))
        elif write := _extract_write(stripped):
            current.writes.append(ParsedWrite(write[0], write[1], line_no))
    return routines
def _bsl_parse_diagnostics(source_path: str, source: str, source_hash: str) -> list[Diagnostic]:
    """Report structural problems in BSL source text.

    Emits ``BSL_NESTED_ROUTINE`` when a routine header appears while another
    routine is open, ``BSL_UNCLOSED_QUERY`` when a query text assignment is
    still open at a routine end or at the end of the text, and
    ``BSL_UNCLOSED_ROUTINE`` for every routine left open at the end.
    """
    found: list[Diagnostic] = []
    open_routines: list[tuple[str, int]] = []
    in_query = False
    query_line = 0

    def report(code: str, message: str, line: int, attributes: dict | None = None) -> None:
        found.append(
            _diagnostic(code, DiagnosticSeverity.ERROR, message, source_path, line, source_hash, attributes)
        )

    for line_no, line in enumerate(source.splitlines(), start=1):
        stripped = line.strip()
        header = _ROUTINE_START_RE.match(line)
        if header:
            if open_routines:
                name, start = open_routines[-1]
                report(
                    "BSL_NESTED_ROUTINE",
                    f"Routine starts before previous routine is closed: {name}",
                    line_no,
                    {"open_routine_line": start},
                )
            open_routines.append((header.group("name"), line_no))
            continue

        ends_statement = stripped.endswith(";")
        if in_query and ends_statement:
            in_query = False

        if open_routines and _ROUTINE_END_RE.match(line):
            if in_query:
                report(
                    "BSL_UNCLOSED_QUERY",
                    "Query text assignment is not closed before routine end",
                    query_line,
                )
                in_query = False
            open_routines.pop()
            continue

        if open_routines and not in_query and _QUERY_TEXT_RE.search(line):
            query_line = line_no
            # The assignment stays open unless it is terminated on this line.
            in_query = not ends_statement

    if in_query:
        report(
            "BSL_UNCLOSED_QUERY",
            "Query text assignment is not closed before end of file",
            query_line,
        )
    for routine_name, line_start in open_routines:
        report("BSL_UNCLOSED_ROUTINE", f"Routine is not closed: {routine_name}", line_start)
    return found
def _diagnostic(
    code: str,
    severity: DiagnosticSeverity,
    message: str,
    source_path: str,
    line: int,
    source_hash: str,
    attributes: dict | None = None,
) -> Diagnostic:
    """Build a ``Diagnostic`` anchored at column 1 of a single source line.

    The id is derived from the code, path, line and message.
    """
    location = SourceRef(
        source_path=source_path,
        line_start=line,
        line_end=line,
        column_start=1,
        source_hash=source_hash,
    )
    fingerprint = _stable_hash(f"{code}:{source_path}:{line}:{message}")
    return Diagnostic(
        diagnostic_id=f"diag.{fingerprint}",
        code=code,
        severity=severity,
        message=message,
        source_ref=location,
        attributes=attributes or {},
    )
def _append_query(current: _RoutineBuilder, lines: list[str], start: int, end: int) -> None:
    """Append a query built from *lines* (empty lines dropped) to *current*."""
    non_empty = [line for line in lines if line]
    text = "\n".join(non_empty)
    tables = tuple(_extract_tables(text))
    current.queries.append(ParsedQuery(text=text, tables=tables, line_start=start, line_end=end))
def _clean_query_line(line: str) -> str:
value = line.strip().rstrip(";").strip().strip('"').strip()
if value.startswith("|"):
value = value[1:].strip()
return value.strip('"').strip()
def _query_text_after_assignment(line: str) -> str:
if "=" not in line:
return ""
return line.split("=", 1)[1].strip()
def _inline_new_query_text(line: str) -> str:
    """Return the query text of an inline ``New Query("...")`` on *line*, or ``""``."""
    found = _INLINE_NEW_QUERY_RE.search(line)
    if found is None:
        return ""
    return found.group("query")
def _extract_tables(query_text: str) -> list[str]:
    """Collect the table names referenced by a query, in order of appearance.

    Each line contributes the token following FROM and the token following
    JOIN, if present.  When a line holds only the FROM keyword and carries no
    JOIN, the first token of the next line is taken instead.  Duplicates are
    kept.
    """
    lines = query_text.splitlines()
    tables: list[str] = []
    for index, line in enumerate(lines):
        inline_table = _table_after_from(line)
        if inline_table:
            tables.append(inline_table)
        join_table = _table_after_join(line)
        if join_table:
            tables.append(join_table)
        elif _FROM_RE.match(line) and index + 1 < len(lines):
            # The following line may be blank; indexing the split result
            # directly would raise IndexError in that case.
            tokens = lines[index + 1].split()
            if tokens:
                table = tokens[0].rstrip(",")
                if table:
                    tables.append(table)
    return tables
def _table_after_from(line: str) -> str:
match = re.search(r"\b(ИЗ|FROM)\s+(?P<table>[^\s,;]+)", line, re.IGNORECASE)
return match.group("table").rstrip(",") if match else ""
def _table_after_join(line: str) -> str:
match = re.search(r"\b(СОЕДИНЕНИЕ|JOIN)\s+(?P<table>[^\s,;]+)", line, re.IGNORECASE)
return match.group("table").rstrip(",") if match else ""
def _extract_write(line: str) -> tuple[str, str] | None:
lowered = line.lower()
if "движения." in lowered and ".записать" in lowered:
match = re.search(r"Движения\.([A-Za-zА-Яа-я0-9_]+)\.", line, re.IGNORECASE)
return ((match.group(1) if match else "unknown"), "REGISTER_WRITE")
if "movements." in lowered and ".write" in lowered:
match = re.search(r"Movements\.([A-Za-zА-Яа-я0-9_]+)\.", line, re.IGNORECASE)
return ((match.group(1) if match else "unknown"), "REGISTER_WRITE")
if ".записать()" in lowered or ".write()" in lowered:
return ("unknown", "OBJECT_WRITE")
return None
def _extract_object_create_target(line: str) -> tuple[str, str] | None:
    """Recognise ``Var = <Manager>.<Name>.<Create...>(`` on *line*.

    Returns:
        ``(variable, "<SingularPrefix>.<Name>")`` where the prefix is the
        singular Russian metadata name of the manager, or ``None`` when the
        line is not such an assignment.
    """
    found = _OBJECT_CREATE_RE.match(line)
    if found is None:
        return None
    singular_by_manager = {
        "справочники": "Справочник",
        "catalogs": "Справочник",
        "документы": "Документ",
        "documents": "Документ",
        "регистрысведений": "РегистрСведений",
        "informationregisters": "РегистрСведений",
        "регистрынакопления": "РегистрНакопления",
        "accumulationregisters": "РегистрНакопления",
        "регистрыбухгалтерии": "РегистрБухгалтерии",
        "accountingregisters": "РегистрБухгалтерии",
        "регистрырасчета": "РегистрРасчета",
        "calculationregisters": "РегистрРасчета",
    }
    manager = found.group("factory").lower()
    object_name = found.group("name")
    singular = singular_by_manager[manager]
    return found.group("var"), f"{singular}.{object_name}"
def _extract_object_write(line: str, object_targets: dict[str, str]) -> tuple[str, str] | None:
match = re.match(
r"^\s*(?P<var>[A-Za-zА-Яа-я_][\wА-Яа-я]*)\.(Записать|Write)\s*\(",
line,
re.IGNORECASE,
)
if not match:
return None
target = object_targets.get(match.group("var").lower())
if not target:
return None
write_type = "REGISTER_WRITE" if target.lower().startswith("регистр") else "OBJECT_WRITE"
return target, write_type
def _routine_has_export(line: str) -> bool:
return bool(re.search(r"\b(Экспорт|Export)\b", line, re.IGNORECASE))
def _node(
    kind: NodeKind,
    name: str,
    qualified_name: str,
    stable_key: str,
    source_ref: SourceRef,
    attributes: dict | None = None,
) -> SemanticNode:
    """Create a ``SemanticNode``.

    The semantic id is derived from the kind and qualified name, the lineage id
    from the kind and *stable_key*.  Missing or empty *attributes* become a new
    empty dict.
    """
    kind_value = kind.value
    semantic_id = make_semantic_id(kind_value, qualified_name)
    lineage_id = make_lineage_id(kind_value, stable_key)
    return SemanticNode(
        semantic_id=semantic_id,
        lineage_id=lineage_id,
        kind=kind,
        name=name,
        qualified_name=qualified_name,
        source_ref=source_ref,
        attributes=attributes or {},
    )
def _table_node(table: str, source_path: str, source_hash: str) -> SemanticNode:
    """Create the node for a table read by a query.

    Tables whose name starts with "регистр" (case-insensitive) are typed as
    registers, all others as plain tables.  The lineage key is the lower-cased
    table name.
    """
    lowered = table.lower()
    if lowered.startswith("регистр"):
        kind = NodeKind.REGISTER
    else:
        kind = NodeKind.TABLE
    short_name = table.split(".")[-1]
    location = SourceRef(source_path=source_path, source_hash=source_hash)
    return _node(kind, short_name, table, lowered, location)
def _register_node(register: str, source_path: str, source_hash: str) -> SemanticNode:
    """Create a register node for a write target.

    A name that does not already start with "регистр" (case-insensitive) is
    qualified with the ``РегистрНакопления.`` prefix.
    """
    if register.lower().startswith("регистр"):
        qualified_name = register
    else:
        qualified_name = f"РегистрНакопления.{register}"
    short_name = register.split(".")[-1]
    location = SourceRef(source_path=source_path, source_hash=source_hash)
    return _node(NodeKind.REGISTER, short_name, qualified_name, qualified_name.lower(), location)
def _write_target_node(write: ParsedWrite, source_path: str, source_hash: str) -> SemanticNode:
    """Create the node that a write edge points to.

    Object writes to a catalog or document produce a node of that kind; every
    other write, including object writes to any other target, produces a
    register node.
    """
    if write.write_type == "OBJECT_WRITE":
        lowered = write.target.lower()
        kind = None
        if lowered.startswith(("справочник.", "catalog.")):
            kind = NodeKind.CATALOG
        elif lowered.startswith(("документ.", "document.")):
            kind = NodeKind.DOCUMENT
        if kind is not None:
            return _metadata_reference_node(kind, write.target, source_path, source_hash)
    return _register_node(write.target, source_path, source_hash)
def _metadata_reference_node(
    kind: NodeKind,
    qualified_name: str,
    source_path: str,
    source_hash: str,
) -> SemanticNode:
    """Create a node for a metadata object referenced from code.

    The node name is the last dotted segment of *qualified_name*; the lineage
    key is the lower-cased qualified name.
    """
    short_name = qualified_name.split(".")[-1]
    location = SourceRef(source_path=source_path, source_hash=source_hash)
    return _node(kind, short_name, qualified_name, qualified_name.lower(), location)
def _module_integration_graph(
    module: SemanticNode,
    text: str,
    source_path: str,
    source_hash: str,
) -> tuple[list[SemanticNode], list[SemanticEdge]]:
    """Detect outbound integration points mentioned in a module's text.

    Every HTTP(S) URL yields one endpoint, and so does the presence of any of
    the HTTP / web-service / FTP / COM marker identifiers.  Endpoints are
    de-duplicated by ``(name, kind)``.

    Returns:
        The endpoint nodes and one ``USES_INTEGRATION`` edge per endpoint, from
        the module at line 1.
    """
    # (marker substrings, endpoint name, integration kind), in detection order.
    marker_rules = (
        (("HTTPСоединение", "HTTPConnection"), "HTTPConnection", "HTTP_SERVICE"),
        (("WSПрокси", "WSProxy", "WSСсылка"), "WSProxy", "WEB_SERVICE"),
        (("FTPСоединение", "FTPConnection"), "FTPConnection", "FILE_EXCHANGE"),
        (("COMОбъект", "COMObject"), "COMObject", "COM_CONNECTOR"),
    )
    endpoints: list[tuple[str, str, dict]] = [
        (url, "HTTP_SERVICE", {"url": url, "direction": "OUTBOUND"}) for url in _URL_RE.findall(text)
    ]
    for markers, endpoint_name, endpoint_kind in marker_rules:
        if any(marker in text for marker in markers):
            endpoints.append((endpoint_name, endpoint_kind, {"direction": "OUTBOUND"}))

    nodes: list[SemanticNode] = []
    edges: list[SemanticEdge] = []
    emitted: set[tuple[str, str]] = set()
    for name, kind, attributes in endpoints:
        if (name, kind) in emitted:
            continue
        emitted.add((name, kind))
        # Node and edge each get their own attribute dict.
        node_attributes = {"integration_kind": kind, **attributes}
        edge_attributes = {"integration_kind": kind, **attributes}
        endpoint = _node(
            NodeKind.INTEGRATION_ENDPOINT,
            name,
            f"{module.qualified_name}.{kind}.{name}",
            f"{module.lineage_id}:integration:{kind}:{name}",
            SourceRef(source_path=source_path, source_hash=source_hash),
            node_attributes,
        )
        nodes.append(endpoint)
        edges.append(_edge(EdgeKind.USES_INTEGRATION, module, endpoint, source_path, 1, edge_attributes))
    return nodes, edges
def _xml_node_kind(object_kind: str) -> NodeKind | None:
    """Map an object kind reported by the XML normalizer to a ``NodeKind``.

    Returns ``None`` for kinds that have no node representation here.
    """
    kind_by_tag = {
        "CATALOG": NodeKind.CATALOG,
        "DOCUMENT": NodeKind.DOCUMENT,
        "CONSTANT": NodeKind.CONSTANT,
        "DOCUMENT_JOURNAL": NodeKind.DOCUMENT_JOURNAL,
        "ENUM": NodeKind.ENUM,
        "REPORT": NodeKind.REPORT,
        "DATA_PROCESSOR": NodeKind.DATA_PROCESSOR,
        "CHART_OF_CHARACTERISTIC_TYPES": NodeKind.CHART_OF_CHARACTERISTIC_TYPES,
        "CHART_OF_ACCOUNTS": NodeKind.CHART_OF_ACCOUNTS,
        "CHART_OF_CALCULATION_TYPES": NodeKind.CHART_OF_CALCULATION_TYPES,
        # Every register flavour collapses into the single REGISTER kind.
        "REGISTER": NodeKind.REGISTER,
        "INFORMATION_REGISTER": NodeKind.REGISTER,
        "ACCUMULATION_REGISTER": NodeKind.REGISTER,
        "ACCOUNTING_REGISTER": NodeKind.REGISTER,
        "CALCULATION_REGISTER": NodeKind.REGISTER,
        "COMMON_MODULE": NodeKind.COMMON_MODULE,
        "EXCHANGE_PLAN": NodeKind.EXCHANGE_PLAN,
        "EXTERNAL_DATA_SOURCE": NodeKind.EXTERNAL_DATA_SOURCE,
        "SCHEDULED_JOB": NodeKind.SCHEDULED_JOB,
        "BUSINESS_PROCESS": NodeKind.BUSINESS_PROCESS,
        "TASK": NodeKind.TASK,
        "SUBSYSTEM": NodeKind.SUBSYSTEM,
        "HTTP_SERVICE": NodeKind.HTTP_SERVICE,
        "XDTO_PACKAGE": NodeKind.XDTO_PACKAGE,
        "EXTENSION": NodeKind.EXTENSION,
        "LAYOUT": NodeKind.LAYOUT,
        "MOVEMENT": NodeKind.MOVEMENT,
        "ROLE": NodeKind.ROLE,
        "FORM": NodeKind.FORM,
        "COMMAND": NodeKind.COMMAND,
        "ATTRIBUTE": NodeKind.ATTRIBUTE,
        "TABULAR_SECTION": NodeKind.TABULAR_SECTION,
        "ELEMENT": NodeKind.FORM_ELEMENT,
    }
    return kind_by_tag.get(object_kind)
def _xml_edge_kind(kind: NodeKind) -> EdgeKind:
    """Choose the edge kind that links a parent to a child node of *kind*.

    Forms, commands, attributes, tabular sections, roles and form elements have
    dedicated edge kinds; everything else uses ``CONTAINS``.
    """
    dedicated = {
        NodeKind.FORM: EdgeKind.HAS_FORM,
        NodeKind.COMMAND: EdgeKind.HAS_COMMAND,
        NodeKind.ATTRIBUTE: EdgeKind.HAS_ATTRIBUTE,
        NodeKind.TABULAR_SECTION: EdgeKind.HAS_TABULAR_SECTION,
        NodeKind.ROLE: EdgeKind.HAS_ROLE,
        NodeKind.FORM_ELEMENT: EdgeKind.HAS_ELEMENT,
    }
    return dedicated.get(kind, EdgeKind.CONTAINS)
def _find_xml_parent(parents: dict[str, SemanticNode], qualified_name: str) -> SemanticNode | None:
candidates = [
parent
for prefix, parent in parents.items()
if qualified_name == prefix or qualified_name.startswith(f"{prefix}.")
]
if not candidates:
return None
return max(candidates, key=lambda node: len(node.qualified_name))
def _edge(
    kind: EdgeKind,
    source: SemanticNode,
    target: SemanticNode,
    source_path: str,
    line: int,
    attributes: dict | None = None,
) -> SemanticEdge:
    """Create a ``SemanticEdge`` between two nodes, anchored at one source line.

    The edge id is a hash of the kind, both lineage ids, the path and the line,
    so an identical edge always receives the same id.
    """
    identity = ":".join(
        (kind.value, str(source.lineage_id), str(target.lineage_id), str(source_path), str(line))
    )
    location = SourceRef(source_path=source_path, line_start=line, line_end=line)
    return SemanticEdge(
        edge_id=f"edge.{_stable_hash(identity)}",
        kind=kind,
        source_lineage=source.lineage_id,
        target_lineage=target.lineage_id,
        source_ref=location,
        attributes=attributes or {},
    )
def _find_declared_routine(nodes: list[SemanticNode], module_name: str, routine_name: str) -> SemanticNode | None:
qualified = f"{module_name}.{routine_name}"
return next((node for node in nodes if node.qualified_name == qualified), None)
def _link_metadata_to_modules(
    root: Path,
    module_nodes: dict[str, SemanticNode],
    metadata_nodes: list[SemanticNode],
    form_nodes: list[SemanticNode],
) -> list[SemanticEdge]:
    """Link module nodes to the metadata objects (and forms) that own them.

    For every module whose owner can be resolved from its path, the module's
    ``attributes`` are updated in place with owner details and a ``CONTAINS``
    edge owner -> module is emitted. When the module is a form module and the
    matching form node is found, form details are recorded as well and a
    second ``CONTAINS`` edge form -> module is emitted.

    Args:
        root: Project root used to compute module paths relative to it.
        module_nodes: Module nodes keyed by their source path.
        metadata_nodes: Candidate owner nodes.
        form_nodes: Form nodes used to resolve form modules.

    Returns:
        The edges created; empty when ``metadata_nodes`` is empty.
    """
    if not metadata_nodes:
        return []

    # Lookup tables; later nodes overwrite earlier ones on key collisions.
    by_qualified: dict[str, SemanticNode] = {}
    by_kind_and_name: dict[tuple[NodeKind, str], SemanticNode] = {}
    for meta in metadata_nodes:
        by_qualified[_normalize_lookup_key(meta.qualified_name)] = meta
        by_kind_and_name[(meta.kind, _normalize_lookup_key(meta.name))] = meta
    forms_by_qualified: dict[str, SemanticNode] = {}
    for form in form_nodes:
        forms_by_qualified[_normalize_lookup_key(form.qualified_name)] = form

    links: list[SemanticEdge] = []
    for source_path, module in module_nodes.items():
        module_file = Path(source_path)
        owner = _find_metadata_module_owner(root, module_file, by_qualified, by_kind_and_name)
        if owner is None:
            continue

        line = module.source_ref.line_start or 1
        module_role = _module_role(module_file)
        form_name = _form_name_for_module(root, module_file)
        object_part = _module_object_part(module_role, form_name)

        module.attributes.update(
            owner_lineage_id=owner.lineage_id,
            owner_qualified_name=owner.qualified_name,
            owner_kind=owner.kind.value,
            object_part=object_part,
            module_role=module_role,
        )
        if form_name:
            module.attributes["form_name"] = form_name

        edge_attributes = dict(
            link_type="METADATA_MODULE",
            module_role=module_role,
            object_part=object_part,
            form_name=form_name,
        )
        links.append(_edge(EdgeKind.CONTAINS, owner, module, source_path, line, edge_attributes))

        # Only form modules with a known form name get the extra form link.
        if module_role != "FORM_MODULE" or not form_name:
            continue
        form_node = _find_form_node_for_module(owner, form_name, forms_by_qualified)
        if form_node is None:
            continue
        module.attributes["form_lineage_id"] = form_node.lineage_id
        module.attributes["form_qualified_name"] = form_node.qualified_name
        form_link_attributes = dict(edge_attributes, link_type="FORM_MODULE")
        links.append(_edge(EdgeKind.CONTAINS, form_node, module, source_path, line, form_link_attributes))
    return links
def _find_form_node_for_module(
    owner: SemanticNode,
    form_name: str,
    forms_by_qualified: dict[str, SemanticNode],
) -> SemanticNode | None:
    """Resolve the form node that a form module belongs to.

    Lookup order:

    1. ``<owner>.<form_name>`` and ``<owner>.Форма.<form_name>`` as exact
       (normalized) keys of ``forms_by_qualified``.
    2. The first key that belongs to the owner and ends with ``.<form_name>``.

    Args:
        owner: Metadata node that owns the module.
        form_name: Form name taken from the module path.
        forms_by_qualified: Form nodes keyed by ``_normalize_lookup_key`` of
            their qualified name.

    Returns:
        The matching form node, or ``None`` when nothing matches.
    """
    direct_candidates = (
        f"{owner.qualified_name}.{form_name}",
        f"{owner.qualified_name}.Форма.{form_name}",
    )
    for candidate in direct_candidates:
        form = forms_by_qualified.get(_normalize_lookup_key(candidate))
        if form is not None:
            return form

    # Normalize both ends with the same function that produced the dict keys;
    # mixing casefold() with lower() can make equal names compare unequal.
    owner_key = _normalize_lookup_key(owner.qualified_name)
    owner_prefix = f"{owner_key}."
    suffix = f".{_normalize_lookup_key(form_name)}"
    for key, form in forms_by_qualified.items():
        # Require a dot boundary after the owner name so that an owner such as
        # "catalog.items" does not capture forms of "catalog.itemsextra".
        belongs_to_owner = key == owner_key or key.startswith(owner_prefix)
        if belongs_to_owner and key.endswith(suffix):
            return form
    return None
def _module_object_part(module_role: str, form_name: str = "") -> str:
return {
"OBJECT_MODULE": "object.module",
"MANAGER_MODULE": "object.manager",
"RECORD_SET_MODULE": "object.record_set",
"FORM_MODULE": f"form.{form_name}.module" if form_name else "form.module",
"MODULE": "module",
}.get(module_role, "module")
def _link_role_rights(nodes: list[SemanticNode], role_rights: list[dict]) -> list[SemanticEdge]:
    """Create ``GRANTS_ACCESS`` edges from role nodes to the objects they cover.

    Each entry of ``role_rights`` names a role (``"role"``) and a target
    (``"target"``). The role is looked up by qualified name first and by bare
    role name second; the target only by qualified name. Entries whose role
    or target cannot be resolved are skipped. All remaining keys of the entry,
    except the role/target/object/metadata identifiers, are copied onto the
    edge as attributes.
    """
    if not role_rights:
        return []

    by_qualified: dict[str, SemanticNode] = {}
    by_role_name: dict[str, SemanticNode] = {}
    for node in nodes:
        by_qualified[_normalize_lookup_key(node.qualified_name)] = node
        if node.kind == NodeKind.ROLE:
            by_role_name[_normalize_lookup_key(node.name)] = node

    identifier_keys = {"role", "target", "object", "Object", "metadata", "Metadata"}
    grants: list[SemanticEdge] = []
    for right in role_rights:
        role_key = _normalize_lookup_key(str(right.get("role", "")))
        target_key = _normalize_lookup_key(str(right.get("target", "")))
        role = by_qualified.get(role_key) or by_role_name.get(role_key)
        target = by_qualified.get(target_key)
        if role is None or target is None:
            continue
        attributes = {}
        for key, value in right.items():
            if key not in identifier_keys:
                attributes[key] = value
        grants.append(
            _edge(
                EdgeKind.GRANTS_ACCESS,
                role,
                target,
                role.source_ref.source_path,
                role.source_ref.line_start or 1,
                attributes,
            )
        )
    return grants
def _link_scheduled_jobs_to_routines(
    scheduled_jobs: list[SemanticNode],
    routine_by_name: dict[str, SemanticNode],
) -> list[SemanticEdge]:
    """Create ``RUNS`` edges from scheduled jobs to the routines they invoke.

    The routine name is extracted from the job attributes and looked up in
    ``routine_by_name`` by its case-folded form. Jobs without a routine name
    or without a matching routine produce no edge.
    """
    runs: list[SemanticEdge] = []
    for job in scheduled_jobs:
        routine_name = _scheduled_job_routine_name(job.attributes)
        routine = routine_by_name.get(routine_name.casefold()) if routine_name else None
        if routine is None:
            continue
        location = job.source_ref
        runs.append(
            _edge(
                EdgeKind.RUNS,
                job,
                routine,
                location.source_path,
                location.line_start or 1,
                {"routine_name": routine_name},
            )
        )
    return runs
def _scheduled_job_routine_name(attributes: dict) -> str:
for key in (
"method",
"Method",
"methodName",
"MethodName",
"routine",
"Routine",
"routineName",
"RoutineName",
"handler",
"Handler",
"ИмяМетода",
"Метод",
"Процедура",
):
value = attributes.get(key)
if value:
return str(value).split(".")[-1]
return ""
def _link_commands_to_handlers(
    commands: list[SemanticNode],
    routine_by_name: dict[str, SemanticNode],
) -> list[SemanticEdge]:
    """Create ``HANDLES`` edges from command nodes to their handler routines.

    The handler name is taken from the command attributes and resolved in
    ``routine_by_name`` by its case-folded form. Commands with no handler
    name or no matching routine are skipped.
    """
    handled: list[SemanticEdge] = []
    for command in commands:
        handler_name = _command_handler_name(command.attributes)
        handler = routine_by_name.get(handler_name.casefold()) if handler_name else None
        if handler is None:
            continue
        location = command.source_ref
        handled.append(
            _edge(
                EdgeKind.HANDLES,
                command,
                handler,
                location.source_path,
                location.line_start or 1,
                {"handler_name": handler_name},
            )
        )
    return handled
def _command_handler_name(attributes: dict) -> str:
for key in (
"action",
"Action",
"handler",
"Handler",
"method",
"Method",
"methodName",
"MethodName",
"Действие",
"Обработчик",
"Метод",
"ИмяМетода",
):
value = attributes.get(key)
if value:
return str(value).split(".")[-1]
return ""
def _link_forms_to_handlers(
    forms: list[SemanticNode],
    routine_by_name: dict[str, SemanticNode],
) -> list[SemanticEdge]:
    """Create ``HANDLES`` edges from form nodes to their event handler routines.

    Every (attribute key, handler name) pair found in a form's attributes is
    resolved in ``routine_by_name`` by the case-folded handler name; pairs
    without a matching routine are ignored. Each edge records the handler
    name, the attribute key it came from and the ``FORM_EVENT`` link type.
    """
    handled: list[SemanticEdge] = []
    for form in forms:
        for source_key, handler_name in _form_handler_names(form.attributes):
            handler = routine_by_name.get(handler_name.casefold())
            if handler is not None:
                location = form.source_ref
                details = {
                    "handler_name": handler_name,
                    "handler_source": source_key,
                    "link_type": "FORM_EVENT",
                }
                handled.append(
                    _edge(
                        EdgeKind.HANDLES,
                        form,
                        handler,
                        location.source_path,
                        location.line_start or 1,
                        details,
                    )
                )
    return handled
def _form_handler_names(attributes: dict) -> list[tuple[str, str]]:
handler_keys = {
"oncreate",
"onopen",
"onclose",
"beforeclose",
"beforewrite",
"afterwrite",
"onread",
"onchange",
"event",
"handler",
"method",
"methodname",
"присозданиинсервере",
"присозданиинаклиенте",
"приоткрытии",
"передзакрытием",
"призакрытии",
"передзаписью",
"призаписи",
"причтении",
"приизменении",
"событие",
"обработчик",
"метод",
"имяметода",
}
handlers: list[tuple[str, str]] = []
for key, value in attributes.items():
if value and str(key).casefold() in handler_keys:
handlers.append((str(key), str(value).split(".")[-1]))
return handlers
def _find_metadata_module_owner(
    root: Path,
    source_file: Path,
    by_qualified: dict[str, SemanticNode],
    by_kind_and_name: dict[tuple[NodeKind, str], SemanticNode],
) -> SemanticNode | None:
    """Find the metadata node that owns the module stored in ``source_file``.

    Owner candidates derived from the file path are tried in order. For each
    one the qualified-name index is consulted first and the (kind, name)
    index second. Returns the first hit, or ``None`` when no candidate
    resolves.
    """
    for qualified_name, kind, name in _metadata_owner_candidates(root, source_file):
        owner = by_qualified.get(_normalize_lookup_key(qualified_name))
        if owner is None:
            owner = by_kind_and_name.get((kind, _normalize_lookup_key(name)))
        if owner is not None:
            return owner
    return None
def _metadata_owner_candidates(root: Path, source_file: Path) -> list[tuple[str, NodeKind, str]]:
    """Derive possible owners of a module from its path relative to ``root``.

    Each path component that is a known metadata folder alias contributes a
    ``(qualified name, kind, name)`` candidate built from the component that
    follows it, unless that following component is a structural folder
    (ext/forms/templates and their Russian equivalents). Paths that contain a
    common-modules folder additionally contribute two common-module
    candidates: one named after the file stem and one named after the file's
    parent folder.
    """
    parts = list(_relative_path(source_file, root).parts)
    normalized_parts = [_normalize_path_part(part) for part in parts]
    structural_folders = {"ext", "forms", "формы", "templates", "макеты"}

    candidates: list[tuple[str, NodeKind, str]] = []
    # Pair every component with its successor; the last one has none.
    for folder_key, name_key, name in zip(normalized_parts, normalized_parts[1:], parts[1:]):
        alias = _PATH_METADATA_ALIASES.get(folder_key)
        if alias is None:
            continue
        prefix, kind = alias
        if name_key in structural_folders:
            continue
        candidates.append((f"{prefix}.{name}", kind, name))

    if len(parts) >= 2:
        module_name = source_file.stem
        parent_name = parts[-2]
        for marker in ("commonmodules", "общиемодули"):
            if marker not in normalized_parts:
                continue
            candidates.extend(
                [
                    (f"ОбщийМодуль.{module_name}", NodeKind.COMMON_MODULE, module_name),
                    (parent_name, NodeKind.COMMON_MODULE, parent_name),
                ]
            )
    return candidates
def _relative_path(source_file: Path, root: Path) -> Path:
try:
base = root if root.is_dir() else root.parent
return source_file.resolve().relative_to(base.resolve())
except (OSError, ValueError):
return source_file
def _module_role(source_file: Path) -> str:
    """Classify a module file by its location and file name.

    Any path component that normalizes to ``forms``/``формы`` makes the file
    a ``FORM_MODULE``. Otherwise the lower-cased file stem selects the role,
    with ``MODULE`` as the fallback for unknown stems.
    """
    form_folders = {"forms", "формы"}
    for part in source_file.parts:
        if _normalize_path_part(part) in form_folders:
            return "FORM_MODULE"

    roles_by_stem = {
        "objectmodule": "OBJECT_MODULE",
        "модульобъекта": "OBJECT_MODULE",
        "managermodule": "MANAGER_MODULE",
        "модульменеджера": "MANAGER_MODULE",
        "recordsetmodule": "RECORD_SET_MODULE",
        "модульнабора": "RECORD_SET_MODULE",
        "module": "MODULE",
        "модуль": "MODULE",
    }
    return roles_by_stem.get(source_file.stem.lower(), "MODULE")
def _form_name_for_module(root: Path, source_file: Path) -> str:
    """Return the path component that follows the forms folder, if any.

    The path is taken relative to ``root``. The English marker ``forms`` is
    checked before the Russian ``формы``; for each, only its first occurrence
    is considered. Returns an empty string when no marker is followed by
    another component.
    """
    parts = list(_relative_path(source_file, root).parts)
    normalized_parts = [_normalize_path_part(part) for part in parts]
    for marker in ("forms", "формы"):
        try:
            position = normalized_parts.index(marker)
        except ValueError:
            continue
        following = parts[position + 1 : position + 2]
        if following:
            return following[0]
    return ""
def _normalize_path_part(value: str) -> str:
return re.sub(r"[\s_.-]+", "", value).lower()
def _normalize_lookup_key(value: str) -> str:
return value.replace("\\", "/").lower()
def _dedupe_nodes(nodes: list[SemanticNode]) -> list[SemanticNode]:
    """Return ``nodes`` without duplicates, discarding the lineage alias map."""
    return _dedupe_nodes_with_aliases(nodes)[0]
def _dedupe_nodes_with_aliases(nodes: list[SemanticNode]) -> tuple[list[SemanticNode], dict[str, str]]:
seen_lineages: set[str] = set()
seen_semantic_ids: dict[str, str] = {}
result: list[SemanticNode] = []
lineage_aliases: dict[str, str] = {}
for node in nodes:
if node.lineage_id in seen_lineages:
continue
canonical_lineage = seen_semantic_ids.get(node.semantic_id)
if canonical_lineage is not None:
lineage_aliases[node.lineage_id] = canonical_lineage
seen_lineages.add(node.lineage_id)
continue
seen_lineages.add(node.lineage_id)
seen_semantic_ids[node.semantic_id] = node.lineage_id
result.append(node)
return result, lineage_aliases
def _remap_and_dedupe_edges(
    edges: list[SemanticEdge],
    nodes: list[SemanticNode],
    lineage_aliases: dict[str, str],
) -> tuple[list[SemanticEdge], list[Diagnostic]]:
    """Rewrite edge endpoints through the alias map and drop dangling edges.

    Endpoints found in ``lineage_aliases`` are replaced by their canonical
    lineage. An edge whose (remapped) source or target is not the lineage of
    any node in ``nodes`` is dropped and reported as a diagnostic. The
    remaining edges are de-duplicated by ``edge_id``.

    Returns:
        The surviving edges and the diagnostics for the dropped ones.
    """
    known_lineages = {node.lineage_id for node in nodes}
    kept: list[SemanticEdge] = []
    diagnostics: list[Diagnostic] = []
    for edge in edges:
        source_lineage = lineage_aliases.get(edge.source_lineage, edge.source_lineage)
        target_lineage = lineage_aliases.get(edge.target_lineage, edge.target_lineage)
        if not (source_lineage in known_lineages and target_lineage in known_lineages):
            diagnostics.append(_dangling_edge_diagnostic(edge, source_lineage, target_lineage))
            continue
        endpoints_unchanged = (
            source_lineage == edge.source_lineage and target_lineage == edge.target_lineage
        )
        if endpoints_unchanged:
            kept.append(edge)
        else:
            # Only the endpoints are rewritten; edge_id is left as it was.
            kept.append(
                edge.model_copy(update={"source_lineage": source_lineage, "target_lineage": target_lineage})
            )
    return _dedupe_edges(kept), diagnostics
def _dangling_edge_diagnostic(edge: SemanticEdge, source_lineage: str, target_lineage: str) -> Diagnostic:
    """Build the warning diagnostic for an edge dropped as dangling.

    The message shows the lineages passed in by the caller, while the
    diagnostic's detail dict carries the edge's own ``source_lineage`` and
    ``target_lineage``. Location details default to an empty path, line 1
    and an empty hash when the edge has no usable source reference.
    """
    ref = edge.source_ref
    if ref:
        source_path = ref.source_path
        line = ref.line_start or 1
        source_hash = ref.source_hash or ""
    else:
        source_path = ""
        line = 1
        source_hash = ""
    message = f"Dropped dangling edge {edge.kind.value}: {source_lineage} -> {target_lineage}"
    details = {
        "edge_id": edge.edge_id,
        "source_lineage": edge.source_lineage,
        "target_lineage": edge.target_lineage,
    }
    return _diagnostic(
        "SIR_DANGLING_EDGE_DROPPED",
        DiagnosticSeverity.WARNING,
        message,
        source_path,
        line,
        source_hash,
        details,
    )
def _dedupe_edges(edges: list[SemanticEdge]) -> list[SemanticEdge]:
seen: dict[str, SemanticEdge] = {}
for edge in edges:
seen.setdefault(edge.edge_id, edge)
return list(seen.values())
def _source_hash(source: str) -> str:
return hashlib.sha256(source.encode("utf-8")).hexdigest()
def _read_text_file(path: Path) -> str:
data = path.read_bytes()
for encoding in ("utf-8-sig", "utf-16", "cp1251"):
try:
return data.decode(encoding)
except UnicodeDecodeError:
continue
return data.decode("utf-8", errors="replace")
def _stable_hash(value: str) -> str:
return hashlib.sha1(value.encode("utf-8")).hexdigest()[:16]
# Public names exported by a star-import of this module.
__all__ = [
    "ParsedQuery",
    "ParsedRoutine",
    "ParsedWrite",
    "index_project",
    "parse_bsl_module",
    "parse_bsl_module_file",
    "parse_bsl_module_from_rust_json",
    "resolve_rust_bsl_parser",
]