99 lines
3.5 KiB
Python
99 lines
3.5 KiB
Python
from pathlib import Path
|
|
|
|
from projection_engine import InMemoryProjection, Neo4jProjection
|
|
from semantic_kernel import index_project
|
|
from sir import SirDelta
|
|
|
|
|
|
def test_projection_queries(tmp_path: Path):
|
|
module = tmp_path / "demo_module.bsl"
|
|
module.write_text(
|
|
"""
|
|
Процедура Проведение()
|
|
ПроверитьОстатки();
|
|
Движения.ОстаткиТоваров.Записать();
|
|
КонецПроцедуры
|
|
|
|
Процедура ПроверитьОстатки()
|
|
Запрос = Новый Запрос;
|
|
Запрос.Текст =
|
|
"ВЫБРАТЬ
|
|
Остатки.Номенклатура
|
|
ИЗ
|
|
РегистрНакопления.ОстаткиТоваров КАК Остатки";
|
|
КонецПроцедуры
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
graph = InMemoryProjection()
|
|
graph.project_snapshot(index_project(tmp_path, project_id="demo"))
|
|
|
|
assert [node.name for node in graph.find_callees("Проведение")] == ["ПроверитьОстатки"]
|
|
assert [node.name for node in graph.find_callers("ПроверитьОстатки")] == ["Проведение"]
|
|
assert [node.name for node in graph.find_writes("Проведение")] == ["ОстаткиТоваров"]
|
|
assert [node.qualified_name for node in graph.find_query_tables("ПроверитьОстатки")] == [
|
|
"РегистрНакопления.ОстаткиТоваров"
|
|
]
|
|
|
|
|
|
class FakeAsyncResult:
|
|
async def single(self):
|
|
return {"count": 0}
|
|
|
|
|
|
class FakeSession:
|
|
def __init__(self) -> None:
|
|
self.runs: list[tuple[str, dict]] = []
|
|
|
|
async def __aenter__(self):
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc, tb):
|
|
return None
|
|
|
|
async def run(self, query: str, **parameters):
|
|
self.runs.append((query, parameters))
|
|
return FakeAsyncResult()
|
|
|
|
|
|
class FakeDriver:
|
|
def __init__(self) -> None:
|
|
self.session_instance = FakeSession()
|
|
|
|
def session(self):
|
|
return self.session_instance
|
|
|
|
|
|
async def test_neo4j_projection_applies_delta(tmp_path: Path):
|
|
module = tmp_path / "demo_module.bsl"
|
|
module.write_text("Процедура Проведение()\nКонецПроцедуры\n", encoding="utf-8")
|
|
previous = index_project(tmp_path, project_id="demo-delta")
|
|
|
|
module.write_text(
|
|
"""
|
|
Процедура Проведение()
|
|
Движения.ОстаткиТоваров.Записать();
|
|
КонецПроцедуры
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
current = index_project(tmp_path, project_id="demo-delta")
|
|
delta = SirDelta(
|
|
delta_id="delta.test",
|
|
snapshot_from=previous.snapshot_id,
|
|
snapshot_to=current.snapshot_id,
|
|
added_nodes=[node for node in current.nodes if node.lineage_id not in {old.lineage_id for old in previous.nodes}],
|
|
updated_nodes=[],
|
|
removed_nodes=[],
|
|
added_edges=[edge for edge in current.edges if edge.edge_id not in {old.edge_id for old in previous.edges}],
|
|
removed_edges=[],
|
|
)
|
|
|
|
driver = FakeDriver()
|
|
await Neo4jProjection(driver).apply_delta(delta, project_id="demo-delta")
|
|
|
|
parameter_sets = [parameters for _query, parameters in driver.session_instance.runs]
|
|
assert any(parameters.get("project_id") == "demo-delta" for parameters in parameter_sets)
|
|
assert any(parameters.get("name") == "ОстаткиТоваров" for parameters in parameter_sets)
|
|
assert any(parameters.get("kind") == "WRITES" for parameters in parameter_sets)
|