Harden HTML5 SSE and local assets
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled

This commit is contained in:
2026-05-17 10:49:48 +03:00
parent 65c82c4fed
commit 22f59b7580
5 changed files with 535 additions and 34 deletions
+98 -22
View File
@@ -33,6 +33,7 @@ from collaboration import (
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import PlainTextResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from neo4j import AsyncGraphDatabase
from pydantic import BaseModel, Field
@@ -50,6 +51,7 @@ from api_server.html5 import (
render_html5_metadata_preview_result,
render_html5_object_context,
render_html5_object_report,
render_html5_operation_detail,
render_html5_project_setup,
render_html5_project_rows,
render_html5_project_report,
@@ -125,6 +127,8 @@ from transaction_topology import routines_touching_target, transaction_write_set
from ui_semantics import form_semantics
app = FastAPI(title="SFERA API", version="0.1.0")
_HTML5_ASSETS_DIR = Path(__file__).resolve().parent / "static" / "html5"
app.mount("/html5/assets", StaticFiles(directory=_HTML5_ASSETS_DIR), name="html5-assets")
app.add_middleware(
CORSMiddleware,
allow_origin_regex=os.environ.get(
@@ -1631,36 +1635,59 @@ async def html5_delete_project(project_id: str, request: Request) -> Response:
@app.get("/html5/operations")
async def html5_operations() -> Response:
async def html5_operations(project_id: str = "", status: str = "", kind: str = "") -> Response:
return Response(
render_html5_operations(_html5_operation_jobs()),
render_html5_operations(
_html5_operation_jobs(project_id=project_id, status=status, kind=kind),
project_id=project_id,
status=status,
kind=kind,
),
media_type="text/html; charset=utf-8",
)
@app.get("/html5/operations/jobs")
async def html5_operation_jobs() -> Response:
async def html5_operation_jobs(project_id: str = "", status: str = "", kind: str = "") -> Response:
return Response(
render_html5_operation_rows(_html5_operation_jobs()),
render_html5_operation_rows(_html5_operation_jobs(project_id=project_id, status=status, kind=kind)),
media_type="text/html; charset=utf-8",
)
@app.get("/html5/operations/jobs/{job_id}/detail")
async def html5_operation_job_detail(job_id: str) -> Response:
job = _operations.jobs.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail=f"Unknown operation job: {job_id}")
return Response(
render_html5_operation_detail(job),
media_type="text/html; charset=utf-8",
)
@app.get("/html5/operations/summary")
async def html5_operation_summary() -> Response:
async def html5_operation_summary(project_id: str = "", status: str = "", kind: str = "") -> Response:
return Response(
render_html5_operation_summary(_html5_operation_jobs()),
render_html5_operation_summary(_html5_operation_jobs(project_id=project_id, status=status, kind=kind)),
media_type="text/html; charset=utf-8",
)
@app.get("/html5/operations/events")
async def html5_operations_events(once: bool = False) -> StreamingResponse:
async def html5_operations_events(
once: bool = False,
project_id: str = "",
status: str = "",
kind: str = "",
) -> StreamingResponse:
def stream_operations():
last_fragments: dict[str, str] = {}
while True:
jobs = _html5_operation_jobs()
yield _html5_sse_event("operations-summary", render_html5_operation_summary(jobs))
yield _html5_sse_event("operations-jobs", render_html5_operation_rows(jobs))
yield _html5_sse_comment("operations heartbeat")
jobs = _html5_operation_jobs(project_id=project_id, status=status, kind=kind)
yield from _html5_sse_if_changed(last_fragments, "operations-summary", render_html5_operation_summary(jobs))
yield from _html5_sse_if_changed(last_fragments, "operations-jobs", render_html5_operation_rows(jobs))
if once:
break
time.sleep(3)
@@ -1696,7 +1723,9 @@ async def html5_project_editor(project_id: str, q: str = "") -> Response:
@app.get("/html5/projects/{project_id}/events")
async def html5_project_events(project_id: str, once: bool = False) -> StreamingResponse:
async def stream_status():
last_fragments: dict[str, str] = {}
while True:
yield _html5_sse_comment(f"project {project_id} heartbeat")
try:
snapshot = _project_snapshot_or_404(project_id)
fragment = render_html5_status(project_id, snapshot)
@@ -1708,17 +1737,23 @@ async def html5_project_events(project_id: str, once: bool = False) -> Streaming
report = None
findings = None
flowchart = None
yield _html5_sse_event("status", fragment)
yield _html5_sse_event(
for event_text in _html5_sse_if_changed(last_fragments, "status", fragment):
yield event_text
for event_text in _html5_sse_if_changed(
last_fragments,
"authoring-changes",
render_html5_authoring_changes(project_id, _authoring_change_summaries(project_id)),
)
):
yield event_text
if report is not None:
yield _html5_sse_event("project-report", render_html5_project_report(project_id, report))
for event_text in _html5_sse_if_changed(last_fragments, "project-report", render_html5_project_report(project_id, report)):
yield event_text
if findings is not None:
yield _html5_sse_event("project-review", render_html5_review(project_id, findings))
for event_text in _html5_sse_if_changed(last_fragments, "project-review", render_html5_review(project_id, findings)):
yield event_text
if flowchart is not None:
yield _html5_sse_event("project-flowchart", render_html5_flowchart(project_id, flowchart))
for event_text in _html5_sse_if_changed(last_fragments, "project-flowchart", render_html5_flowchart(project_id, flowchart)):
yield event_text
if once:
break
await asyncio.sleep(5)
@@ -2024,10 +2059,27 @@ async def html5_project_setup_summary(project_id: str) -> Response:
@app.get("/html5/projects/{project_id}/setup/events")
async def html5_project_setup_events(project_id: str, once: bool = False) -> StreamingResponse:
async def stream_setup():
last_fragments: dict[str, str] = {}
while True:
setup = _project_setup_response(project_id)
yield _html5_sse_event("setup-summary", render_html5_setup_summary(project_id, setup))
yield _html5_sse_event("setup-import-job", render_html5_import_job(project_id, _html5_latest_import_job(project_id)))
yield _html5_sse_comment(f"setup {project_id} heartbeat")
try:
setup = _project_setup_response(project_id)
except HTTPException as error:
setup_error = f'<div class="setup-summary" data-html5-setup-summary><p class="muted padded">{error.detail}</p></div>'
for event_text in _html5_sse_if_changed(last_fragments, "setup-summary", setup_error):
yield event_text
if once:
break
await asyncio.sleep(2)
continue
for event_text in _html5_sse_if_changed(last_fragments, "setup-summary", render_html5_setup_summary(project_id, setup)):
yield event_text
for event_text in _html5_sse_if_changed(
last_fragments,
"setup-import-job",
render_html5_import_job(project_id, _html5_latest_import_job(project_id)),
):
yield event_text
if once:
break
await asyncio.sleep(2)
@@ -8406,8 +8458,21 @@ def _current_import_source(project_id: str) -> ImportSourceKind:
return ImportSourceKind.XML_DUMP
def _html5_operation_jobs() -> list[OperationJob]:
return sorted(_operations.jobs.values(), key=lambda job: job.updated_at, reverse=True)[:50]
def _html5_operation_jobs(project_id: str = "", status: str = "", kind: str = "") -> list[OperationJob]:
normalized_project = project_id.strip().casefold()
normalized_status = status.strip().casefold()
normalized_kind = kind.strip().casefold()
jobs = []
for job in _operations.jobs.values():
payload = job.payload or {}
if normalized_project and str(payload.get("project_id") or "").casefold() != normalized_project:
continue
if normalized_status and _operation_value(getattr(job, "status", "")).casefold() != normalized_status:
continue
if normalized_kind and _operation_value(getattr(job, "kind", "")).casefold() != normalized_kind:
continue
jobs.append(job)
return sorted(jobs, key=lambda job: job.updated_at, reverse=True)[:50]
def _html5_latest_import_job(project_id: str) -> OperationJob | None:
@@ -8425,7 +8490,18 @@ def _operation_value(value: object) -> str:
def _html5_sse_event(event: str, fragment: str) -> str:
data = "\n".join(f"data: {line}" for line in fragment.splitlines())
return f"event: {event}\n{data}\n\n"
return f"event: {event}\nretry: 5000\n{data}\n\n"
def _html5_sse_if_changed(last_fragments: dict[str, str], event: str, fragment: str):
if last_fragments.get(event) == fragment:
return
last_fragments[event] = fragment
yield _html5_sse_event(event, fragment)
def _html5_sse_comment(message: str) -> str:
return f": {message}\n\n"
def _project_summaries() -> list[ProjectSummaryResponse]: