"""Incremental rebuild helpers for SIR snapshots.

Detects which source files differ from a stored snapshot, rebuilds a
snapshot after a change (whole project or a single-file fragment), and
computes the node/edge delta between two snapshots.
"""

from __future__ import annotations

from pathlib import Path
import os

from semantic_kernel import index_project
from sir import SirDelta, SirSnapshot, compute_snapshot_hash, validate_snapshot


def source_hashes(snapshot: SirSnapshot) -> dict[str, str]:
    """Return a ``source_path -> source_hash`` mapping for *snapshot*.

    Nodes whose ``source_ref.source_hash`` is falsy are skipped. When
    several nodes share a source path, the hash of the last such node wins.
    """
    return {
        node.source_ref.source_path: node.source_ref.source_hash
        for node in snapshot.nodes
        if node.source_ref.source_hash
    }


def changed_bsl_files(root: str | Path, snapshot: SirSnapshot) -> list[Path]:
    """Return the ``.bsl`` files under *root* whose hash differs from *snapshot*."""
    return changed_source_files(root, snapshot, suffixes={".bsl"})


def changed_source_files(
    root: str | Path,
    snapshot: SirSnapshot,
    *,
    suffixes: set[str] | None = None,
) -> list[Path]:
    """Return source files under *root* whose hash differs from *snapshot*.

    Args:
        root: Directory that is searched recursively.
        snapshot: Snapshot providing the known ``source_path -> hash`` map
            and the project id used when re-indexing each candidate.
        suffixes: File suffixes to consider, compared case-insensitively.
            ``None`` or an empty set falls back to ``{".bsl", ".xml"}``.

    Returns:
        The changed paths in sorted order. A file counts as changed when the
        first hash produced by indexing it differs from the hash stored for
        ``path.as_posix()`` in *snapshot*; files unknown to the snapshot are
        therefore reported as changed.
    """
    base = Path(root)
    known_hashes = source_hashes(snapshot)
    wanted_suffixes = {suffix.lower() for suffix in (suffixes or {".bsl", ".xml"})}

    # Only regular files are candidates: a directory whose name happens to
    # end in a wanted suffix must not be handed to the indexer as a source.
    candidates = sorted(
        candidate
        for candidate in base.rglob("*")
        if candidate.suffix.lower() in wanted_suffixes and candidate.is_file()
    )

    changed: list[Path] = []
    for path in candidates:
        # The file is indexed on its own; the first hash of the resulting
        # snapshot is taken as the file's current hash (None if there is none).
        new_snapshot = index_project(path, project_id=snapshot.project_id)
        new_hash = next(iter(source_hashes(new_snapshot).values()), None)
        if new_hash != known_hashes.get(path.as_posix()):
            changed.append(path)
    return changed


def rebuild_changed_file(
    previous: SirSnapshot, changed_file: str | Path
) -> tuple[SirSnapshot, SirDelta]:
    """Rebuild *previous* after *changed_file* was modified.

    When a project root can be determined and is an existing directory, the
    whole project is re-indexed. Otherwise only the fragment belonging to
    *changed_file* is replaced inside the previous snapshot.

    Returns:
        The new snapshot and the delta from *previous* to it.
    """
    path = Path(changed_file)
    project_root = _project_root(previous, path)
    if project_root is not None and project_root.is_dir():
        return rebuild_project(previous, project_root)
    return _rebuild_changed_file_fragment(previous, path)


def rebuild_project(
    previous: SirSnapshot, project_root: str | Path
) -> tuple[SirSnapshot, SirDelta]:
    """Re-index *project_root* and carry over the identity of *previous*.

    The freshly indexed snapshot inherits ``snapshot_id``, ``revision`` and
    the ``task_id``/``session_id`` metadata of *previous*. Its hash is
    computed only after those fields are set, then the snapshot is passed
    to ``validate_snapshot``.

    Returns:
        The new snapshot and the delta from *previous* to it.
    """
    current = index_project(project_root, project_id=previous.project_id)
    current.snapshot_id = previous.snapshot_id
    current.revision = previous.revision
    current.metadata.task_id = previous.metadata.task_id
    current.metadata.session_id = previous.metadata.session_id
    # Hash after the identity fields are copied so it is computed over the
    # snapshot in its final state.
    current.snapshot_hash = compute_snapshot_hash(current)
    validate_snapshot(current)
    return current, build_delta(previous, current)


def _rebuild_changed_file_fragment(
    previous: SirSnapshot, changed_file: Path
) -> tuple[SirSnapshot, SirDelta]:
    """Replace the part of *previous* that originates from *changed_file*.

    The file is indexed on its own and the resulting nodes, edges and
    diagnostics take the place of everything in *previous* whose source
    path equals ``changed_file.as_posix()``.

    Returns:
        The spliced snapshot and the delta from *previous* to it.
    """
    path = changed_file
    fragment = index_project(path, project_id=previous.project_id)
    changed_source = path.as_posix()

    # Lineages that used to live in the changed file, and those that do now.
    old_fragment_lineages = {
        node.lineage_id
        for node in previous.nodes
        if node.source_ref.source_path == changed_source
    }
    next_fragment_lineages = {node.lineage_id for node in fragment.nodes}

    old_nodes = [
        node for node in previous.nodes if node.source_ref.source_path != changed_source
    ]
    next_lineages = {node.lineage_id for node in old_nodes} | next_fragment_lineages

    # Keep a previous edge only if it was not declared in the changed file,
    # does not start at a node of the old fragment, and still has a target
    # in the rebuilt snapshot.
    old_edges = [
        edge
        for edge in previous.edges
        if (edge.source_ref is None or edge.source_ref.source_path != changed_source)
        and edge.source_lineage not in old_fragment_lineages
        and edge.target_lineage in next_lineages
    ]

    kept_diagnostics = [
        diagnostic
        for diagnostic in previous.diagnostics
        if diagnostic.source_ref is None
        or diagnostic.source_ref.source_path != changed_source
    ]

    next_snapshot = SirSnapshot(
        snapshot_id=previous.snapshot_id,
        project_id=previous.project_id,
        revision=previous.revision,
        metadata=previous.metadata,
        nodes=[*old_nodes, *fragment.nodes],
        edges=[*old_edges, *fragment.edges],
        diagnostics=kept_diagnostics + fragment.diagnostics,
    )
    next_snapshot.snapshot_hash = compute_snapshot_hash(next_snapshot)
    validate_snapshot(next_snapshot)
    return next_snapshot, build_delta(previous, next_snapshot)


def _project_root(previous: SirSnapshot, changed_file: Path) -> Path | None:
    """Determine the project root to re-index, or ``None`` if unknown.

    Resolution order:

    1. ``previous.metadata.source_root`` when it is set.
    2. If the snapshot records no source paths, the parent directory of
       *changed_file* when that directory exists.
    3. Otherwise the common path of all recorded source paths and
       *changed_file* (its parent if the common path is not a directory).

    ``None`` is returned when no common path can be computed, e.g. when
    absolute and relative paths are mixed.
    """
    if previous.metadata.source_root:
        return Path(previous.metadata.source_root)

    paths = [Path(path) for path in source_hashes(previous)]
    if not paths:
        return changed_file.parent if changed_file.parent.exists() else None

    try:
        common = Path(
            os.path.commonpath([path.as_posix() for path in [*paths, changed_file]])
        )
        return common if common.is_dir() else common.parent
    except ValueError:
        return None


def build_delta(previous: SirSnapshot, current: SirSnapshot) -> SirDelta:
    """Compute the delta that turns *previous* into *current*.

    Nodes are matched by ``lineage_id`` and edges by ``edge_id``. A node
    present in both snapshots is reported as updated when the two node
    objects compare unequal. Edges are only reported as added or removed.
    """
    previous_nodes = {node.lineage_id: node for node in previous.nodes}
    current_nodes = {node.lineage_id: node for node in current.nodes}
    previous_edges = {edge.edge_id: edge for edge in previous.edges}
    current_edges = {edge.edge_id: edge for edge in current.edges}

    added_nodes = [
        node
        for lineage_id, node in current_nodes.items()
        if lineage_id not in previous_nodes
    ]
    updated_nodes = [
        node
        for lineage_id, node in current_nodes.items()
        if lineage_id in previous_nodes and node != previous_nodes[lineage_id]
    ]
    removed_nodes = [
        lineage_id for lineage_id in previous_nodes if lineage_id not in current_nodes
    ]
    added_edges = [
        edge for edge_id, edge in current_edges.items() if edge_id not in previous_edges
    ]
    removed_edges = [
        edge_id for edge_id in previous_edges if edge_id not in current_edges
    ]

    return SirDelta(
        delta_id=f"delta.{previous.snapshot_id}.{current.snapshot_hash}",
        snapshot_from=previous.snapshot_id,
        snapshot_to=current.snapshot_id,
        added_nodes=added_nodes,
        updated_nodes=updated_nodes,
        removed_nodes=removed_nodes,
        added_edges=added_edges,
        removed_edges=removed_edges,
    )


__all__ = [
    "build_delta",
    "changed_bsl_files",
    "changed_source_files",
    "rebuild_changed_file",
    "rebuild_project",
    "source_hashes",
]