from __future__ import annotations from pydantic import BaseModel, Field from sir import EdgeKind, NodeKind, SemanticNode, SirSnapshot class ScheduledJob(BaseModel): job_id: str name: str routine_name: str schedule: str | None = None attributes: dict = Field(default_factory=dict) class JobBinding(BaseModel): job: ScheduledJob routine: SemanticNode | None = None def bind_jobs(snapshot: SirSnapshot, jobs: list[ScheduledJob]) -> list[JobBinding]: routines = { node.name.casefold(): node for node in snapshot.nodes if node.kind.value in {"PROCEDURE", "FUNCTION"} } return [ JobBinding(job=job, routine=routines.get(job.routine_name.casefold())) for job in sorted(jobs, key=lambda item: item.name) ] def snapshot_scheduled_jobs(snapshot: SirSnapshot) -> list[JobBinding]: nodes = {node.lineage_id: node for node in snapshot.nodes} bindings: list[JobBinding] = [] for job_node in sorted( (node for node in snapshot.nodes if node.kind == NodeKind.SCHEDULED_JOB), key=lambda node: node.qualified_name, ): run_edge = next( ( edge for edge in snapshot.edges if edge.kind == EdgeKind.RUNS and edge.source_lineage == job_node.lineage_id ), None, ) routine = nodes.get(run_edge.target_lineage) if run_edge is not None else None routine_name = ( routine.name if routine is not None else str(job_node.attributes.get("method") or job_node.attributes.get("Method") or "") ) bindings.append( JobBinding( job=ScheduledJob( job_id=job_node.lineage_id, name=job_node.name, routine_name=routine_name, schedule=str(job_node.attributes.get("schedule") or job_node.attributes.get("Schedule") or "") or None, attributes=job_node.attributes, ), routine=routine, ) ) return bindings __all__ = ["JobBinding", "ScheduledJob", "bind_jobs", "snapshot_scheduled_jobs"]