Speed up stored snapshot inventory
This commit is contained in:
@@ -45,12 +45,35 @@ class FileStorage:
|
|||||||
self.initialize()
|
self.initialize()
|
||||||
result: list[StoredSnapshotInfo] = []
|
result: list[StoredSnapshotInfo] = []
|
||||||
for path in sorted(self.snapshots_dir.glob("*.json")):
|
for path in sorted(self.snapshots_dir.glob("*.json")):
|
||||||
snapshot = snapshot_from_json(path.read_bytes())
|
try:
|
||||||
|
payload = orjson.loads(path.read_bytes())
|
||||||
|
except (FileNotFoundError, orjson.JSONDecodeError):
|
||||||
|
continue
|
||||||
|
project_id = payload.get("project_id")
|
||||||
|
snapshot_id = payload.get("snapshot_id")
|
||||||
|
if not isinstance(project_id, str) or not isinstance(snapshot_id, str):
|
||||||
|
continue
|
||||||
|
snapshot_hash = payload.get("snapshot_hash")
|
||||||
result.append(
|
result.append(
|
||||||
StoredSnapshotInfo(
|
StoredSnapshotInfo(
|
||||||
project_id=snapshot.project_id,
|
project_id=project_id,
|
||||||
snapshot_id=snapshot.snapshot_id,
|
snapshot_id=snapshot_id,
|
||||||
snapshot_hash=snapshot.snapshot_hash,
|
snapshot_hash=snapshot_hash if isinstance(snapshot_hash, str) else None,
|
||||||
|
path=path.as_posix(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def list_snapshot_refs(self) -> list[StoredSnapshotInfo]:
|
||||||
|
self.initialize()
|
||||||
|
result: list[StoredSnapshotInfo] = []
|
||||||
|
for path in sorted(self.snapshots_dir.glob("*.json")):
|
||||||
|
project_id = path.stem
|
||||||
|
result.append(
|
||||||
|
StoredSnapshotInfo(
|
||||||
|
project_id=project_id,
|
||||||
|
snapshot_id=f"stored.{project_id}",
|
||||||
|
snapshot_hash=None,
|
||||||
path=path.as_posix(),
|
path=path.as_posix(),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@@ -59,6 +82,11 @@ class FileStorage:
|
|||||||
def has_snapshot(self, project_id: str) -> bool:
|
def has_snapshot(self, project_id: str) -> bool:
|
||||||
return self._snapshot_path(project_id).exists()
|
return self._snapshot_path(project_id).exists()
|
||||||
|
|
||||||
|
def count_snapshots(self) -> int:
|
||||||
|
if not self.snapshots_dir.exists():
|
||||||
|
return 0
|
||||||
|
return sum(1 for _ in self.snapshots_dir.glob("*.json"))
|
||||||
|
|
||||||
def write_document(self, collection: str, document_id: str, payload: dict[str, Any]) -> Path:
|
def write_document(self, collection: str, document_id: str, payload: dict[str, Any]) -> Path:
|
||||||
directory = self._collection_dir(collection)
|
directory = self._collection_dir(collection)
|
||||||
directory.mkdir(parents=True, exist_ok=True)
|
directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
|||||||
@@ -17,6 +17,10 @@ def test_file_storage_saves_and_loads_snapshot(tmp_path: Path):
|
|||||||
assert info.project_id == "demo"
|
assert info.project_id == "demo"
|
||||||
assert restored.snapshot_hash == snapshot.snapshot_hash
|
assert restored.snapshot_hash == snapshot.snapshot_hash
|
||||||
assert storage.list_snapshots()[0].snapshot_id == snapshot.snapshot_id
|
assert storage.list_snapshots()[0].snapshot_id == snapshot.snapshot_id
|
||||||
|
snapshot_ref = storage.list_snapshot_refs()[0]
|
||||||
|
assert snapshot_ref.project_id == "demo"
|
||||||
|
assert snapshot_ref.snapshot_id == "stored.demo"
|
||||||
|
assert snapshot_ref.path.endswith("demo.json")
|
||||||
|
|
||||||
|
|
||||||
def test_file_storage_generic_documents(tmp_path: Path):
|
def test_file_storage_generic_documents(tmp_path: Path):
|
||||||
|
|||||||
@@ -2937,7 +2937,7 @@ async def incremental_file(project_id: str, request: IncrementalFileRequest) ->
|
|||||||
|
|
||||||
@app.get("/storage/snapshots", response_model=list[StoredSnapshotInfo])
|
@app.get("/storage/snapshots", response_model=list[StoredSnapshotInfo])
|
||||||
async def list_stored_snapshots() -> list[StoredSnapshotInfo]:
|
async def list_stored_snapshots() -> list[StoredSnapshotInfo]:
|
||||||
return _storage.list_snapshots()
|
return _storage.list_snapshot_refs()
|
||||||
|
|
||||||
|
|
||||||
def _project_snapshot_or_404(project_id: str) -> SirSnapshot:
|
def _project_snapshot_or_404(project_id: str) -> SirSnapshot:
|
||||||
@@ -6612,7 +6612,7 @@ async def license_state() -> LicenseState:
|
|||||||
async def admin_summary() -> dict:
|
async def admin_summary() -> dict:
|
||||||
return {
|
return {
|
||||||
"indexed_projects": len(_snapshots),
|
"indexed_projects": len(_snapshots),
|
||||||
"stored_snapshots": len(_storage.list_snapshots()),
|
"stored_snapshots": _storage.count_snapshots(),
|
||||||
"knowledge_records": len(_knowledge.list_records()),
|
"knowledge_records": len(_knowledge.list_records()),
|
||||||
"knowledge_packs": len(_knowledge.list_packs()),
|
"knowledge_packs": len(_knowledge.list_packs()),
|
||||||
"users": len(_collaboration.users),
|
"users": len(_collaboration.users),
|
||||||
@@ -7476,12 +7476,12 @@ def _project_setup_response(project_id: str) -> ProjectSetupResponse:
|
|||||||
|
|
||||||
|
|
||||||
def _project_has_stored_snapshot(project_id: str) -> bool:
|
def _project_has_stored_snapshot(project_id: str) -> bool:
|
||||||
return any(snapshot.project_id == project_id for snapshot in _storage.list_snapshots())
|
return _storage.has_snapshot(project_id)
|
||||||
|
|
||||||
|
|
||||||
def _project_summaries() -> list[ProjectSummaryResponse]:
|
def _project_summaries() -> list[ProjectSummaryResponse]:
|
||||||
project_ids = set(_project_setup.keys())
|
project_ids = set(_project_setup.keys())
|
||||||
stored_snapshots = _storage.list_snapshots()
|
stored_snapshots = _storage.list_snapshot_refs()
|
||||||
project_ids.update(snapshot.project_id for snapshot in stored_snapshots)
|
project_ids.update(snapshot.project_id for snapshot in stored_snapshots)
|
||||||
snapshot_project_ids = {snapshot.project_id for snapshot in stored_snapshots}
|
snapshot_project_ids = {snapshot.project_id for snapshot in stored_snapshots}
|
||||||
result: list[ProjectSummaryResponse] = []
|
result: list[ProjectSummaryResponse] = []
|
||||||
@@ -7514,6 +7514,11 @@ def _normalize_project_id(project_id: str) -> str:
|
|||||||
|
|
||||||
def _delete_project_data(project_id: str, request: ProjectDeleteRequest) -> list[str]:
|
def _delete_project_data(project_id: str, request: ProjectDeleteRequest) -> list[str]:
|
||||||
deleted: list[str] = []
|
deleted: list[str] = []
|
||||||
|
had_indexed_data = (
|
||||||
|
project_id in _snapshots
|
||||||
|
or project_id in _normalized_projects
|
||||||
|
or _storage.has_snapshot(project_id)
|
||||||
|
)
|
||||||
_project_setup.pop(project_id, None)
|
_project_setup.pop(project_id, None)
|
||||||
_snapshots.pop(project_id, None)
|
_snapshots.pop(project_id, None)
|
||||||
_graphs.pop(project_id, None)
|
_graphs.pop(project_id, None)
|
||||||
@@ -7534,7 +7539,7 @@ def _delete_project_data(project_id: str, request: ProjectDeleteRequest) -> list
|
|||||||
if folder.exists():
|
if folder.exists():
|
||||||
shutil.rmtree(folder)
|
shutil.rmtree(folder)
|
||||||
deleted.append(folder.name)
|
deleted.append(folder.name)
|
||||||
if request.delete_versions:
|
if request.delete_versions and had_indexed_data:
|
||||||
count = _storage.delete_documents_matching("object_versions", lambda payload: payload.get("project_id") == project_id)
|
count = _storage.delete_documents_matching("object_versions", lambda payload: payload.get("project_id") == project_id)
|
||||||
if count:
|
if count:
|
||||||
deleted.append(f"object_versions:{count}")
|
deleted.append(f"object_versions:{count}")
|
||||||
|
|||||||
Reference in New Issue
Block a user