Extract HTML5 form helpers
This commit is contained in:
@@ -0,0 +1,78 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from urllib.parse import parse_qs
|
||||||
|
|
||||||
|
from fastapi import Request
|
||||||
|
|
||||||
|
|
||||||
|
async def html5_form_data(request: Request) -> dict[str, list[str]]:
|
||||||
|
body = (await request.body()).decode("utf-8")
|
||||||
|
return parse_qs(body, keep_blank_values=True)
|
||||||
|
|
||||||
|
|
||||||
|
def form_value(form: dict[str, list[str]], key: str) -> str | None:
|
||||||
|
values = form.get(key)
|
||||||
|
if not values:
|
||||||
|
return None
|
||||||
|
value = values[0].strip()
|
||||||
|
return value or None
|
||||||
|
|
||||||
|
|
||||||
|
def html5_metadata_payload(form: dict[str, list[str]]) -> dict:
|
||||||
|
return {
|
||||||
|
"object_kind": form_value(form, "object_kind") or "DOCUMENT",
|
||||||
|
"name": form_value(form, "name") or "",
|
||||||
|
"synonym": form_value(form, "synonym"),
|
||||||
|
"attributes": html5_metadata_attributes(form_value(form, "attributes") or ""),
|
||||||
|
"tabular_sections": html5_metadata_tabular_sections(form_value(form, "tabular_sections") or ""),
|
||||||
|
"forms": html5_csv_values(form_value(form, "forms") or ""),
|
||||||
|
"commands": html5_metadata_commands(form_value(form, "commands") or ""),
|
||||||
|
"task_id": form_value(form, "task_id"),
|
||||||
|
"session_id": form_value(form, "session_id"),
|
||||||
|
"user_id": form_value(form, "user_id"),
|
||||||
|
"_raw_attributes": form_value(form, "attributes") or "",
|
||||||
|
"_raw_tabular_sections": form_value(form, "tabular_sections") or "",
|
||||||
|
"_raw_forms": form_value(form, "forms") or "",
|
||||||
|
"_raw_commands": form_value(form, "commands") or "",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def html5_metadata_request_payload(payload: dict) -> dict:
|
||||||
|
return {key: value for key, value in payload.items() if not key.startswith("_raw_")}
|
||||||
|
|
||||||
|
|
||||||
|
def html5_csv_values(raw: str) -> list[str]:
|
||||||
|
return [item.strip() for item in raw.replace("\n", ",").split(",") if item.strip()]
|
||||||
|
|
||||||
|
|
||||||
|
def html5_metadata_attributes(raw: str) -> list[dict]:
|
||||||
|
attributes: list[dict] = []
|
||||||
|
for item in html5_csv_values(raw):
|
||||||
|
name, _, type_name = item.partition(":")
|
||||||
|
if name.strip():
|
||||||
|
attributes.append({"name": name.strip(), "type": type_name.strip() or "Строка"})
|
||||||
|
return attributes
|
||||||
|
|
||||||
|
|
||||||
|
def html5_metadata_commands(raw: str) -> list[dict]:
|
||||||
|
commands: list[dict] = []
|
||||||
|
for item in html5_csv_values(raw):
|
||||||
|
name, _, handler = item.partition(":")
|
||||||
|
if name.strip():
|
||||||
|
commands.append({"name": name.strip(), "handler": handler.strip() or None})
|
||||||
|
return commands
|
||||||
|
|
||||||
|
|
||||||
|
def html5_metadata_tabular_sections(raw: str) -> list[dict]:
|
||||||
|
sections: list[dict] = []
|
||||||
|
for item in html5_csv_values(raw):
|
||||||
|
name, _, attrs = item.partition("[")
|
||||||
|
if not name.strip():
|
||||||
|
continue
|
||||||
|
attributes = []
|
||||||
|
for attr in attrs.rstrip("]").split(";"):
|
||||||
|
attr_name, _, attr_type = attr.partition(":")
|
||||||
|
if attr_name.strip():
|
||||||
|
attributes.append({"name": attr_name.strip(), "type": attr_type.strip() or "Строка"})
|
||||||
|
sections.append({"name": name.strip(), "attributes": attributes})
|
||||||
|
return sections
|
||||||
@@ -18,7 +18,7 @@ from difflib import SequenceMatcher
|
|||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
from urllib.parse import parse_qs, quote, urljoin, urlsplit, urlunsplit
|
from urllib.parse import quote, urljoin, urlsplit, urlunsplit
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from collaboration import (
|
from collaboration import (
|
||||||
@@ -36,6 +36,12 @@ from fastapi.responses import PlainTextResponse, Response, StreamingResponse
|
|||||||
from neo4j import AsyncGraphDatabase
|
from neo4j import AsyncGraphDatabase
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from api_server.html5_forms import (
|
||||||
|
form_value as _form_value,
|
||||||
|
html5_form_data as _html5_form_data,
|
||||||
|
html5_metadata_payload as _html5_metadata_payload,
|
||||||
|
html5_metadata_request_payload as _html5_metadata_request_payload,
|
||||||
|
)
|
||||||
from api_server.html5_projects import render_html5_index, render_html5_project_rows
|
from api_server.html5_projects import render_html5_index, render_html5_project_rows
|
||||||
from api_server.html5_responses import (
|
from api_server.html5_responses import (
|
||||||
Html5StaticFiles,
|
Html5StaticFiles,
|
||||||
@@ -8154,79 +8160,6 @@ def _project_has_stored_snapshot(project_id: str) -> bool:
|
|||||||
return _storage.has_snapshot(project_id)
|
return _storage.has_snapshot(project_id)
|
||||||
|
|
||||||
|
|
||||||
async def _html5_form_data(request: Request) -> dict[str, list[str]]:
|
|
||||||
body = (await request.body()).decode("utf-8")
|
|
||||||
return parse_qs(body, keep_blank_values=True)
|
|
||||||
|
|
||||||
|
|
||||||
def _form_value(form: dict[str, list[str]], key: str) -> str | None:
|
|
||||||
values = form.get(key)
|
|
||||||
if not values:
|
|
||||||
return None
|
|
||||||
value = values[0].strip()
|
|
||||||
return value or None
|
|
||||||
|
|
||||||
|
|
||||||
def _html5_metadata_payload(form: dict[str, list[str]]) -> dict:
|
|
||||||
return {
|
|
||||||
"object_kind": _form_value(form, "object_kind") or "DOCUMENT",
|
|
||||||
"name": _form_value(form, "name") or "",
|
|
||||||
"synonym": _form_value(form, "synonym"),
|
|
||||||
"attributes": _html5_metadata_attributes(_form_value(form, "attributes") or ""),
|
|
||||||
"tabular_sections": _html5_metadata_tabular_sections(_form_value(form, "tabular_sections") or ""),
|
|
||||||
"forms": _html5_csv_values(_form_value(form, "forms") or ""),
|
|
||||||
"commands": _html5_metadata_commands(_form_value(form, "commands") or ""),
|
|
||||||
"task_id": _form_value(form, "task_id"),
|
|
||||||
"session_id": _form_value(form, "session_id"),
|
|
||||||
"user_id": _form_value(form, "user_id"),
|
|
||||||
"_raw_attributes": _form_value(form, "attributes") or "",
|
|
||||||
"_raw_tabular_sections": _form_value(form, "tabular_sections") or "",
|
|
||||||
"_raw_forms": _form_value(form, "forms") or "",
|
|
||||||
"_raw_commands": _form_value(form, "commands") or "",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _html5_metadata_request_payload(payload: dict) -> dict:
|
|
||||||
return {key: value for key, value in payload.items() if not key.startswith("_raw_")}
|
|
||||||
|
|
||||||
|
|
||||||
def _html5_csv_values(raw: str) -> list[str]:
|
|
||||||
return [item.strip() for item in raw.replace("\n", ",").split(",") if item.strip()]
|
|
||||||
|
|
||||||
|
|
||||||
def _html5_metadata_attributes(raw: str) -> list[dict]:
|
|
||||||
attributes: list[dict] = []
|
|
||||||
for item in _html5_csv_values(raw):
|
|
||||||
name, _, type_name = item.partition(":")
|
|
||||||
if name.strip():
|
|
||||||
attributes.append({"name": name.strip(), "type": type_name.strip() or "Строка"})
|
|
||||||
return attributes
|
|
||||||
|
|
||||||
|
|
||||||
def _html5_metadata_commands(raw: str) -> list[dict]:
|
|
||||||
commands: list[dict] = []
|
|
||||||
for item in _html5_csv_values(raw):
|
|
||||||
name, _, handler = item.partition(":")
|
|
||||||
if name.strip():
|
|
||||||
commands.append({"name": name.strip(), "handler": handler.strip() or None})
|
|
||||||
return commands
|
|
||||||
|
|
||||||
|
|
||||||
def _html5_metadata_tabular_sections(raw: str) -> list[dict]:
|
|
||||||
sections: list[dict] = []
|
|
||||||
for item in _html5_csv_values(raw):
|
|
||||||
name, _, attrs = item.partition("[")
|
|
||||||
if not name.strip():
|
|
||||||
continue
|
|
||||||
attributes = []
|
|
||||||
for attr in attrs.rstrip("]").split(";"):
|
|
||||||
attr_name, _, attr_type = attr.partition(":")
|
|
||||||
if attr_name.strip():
|
|
||||||
attributes.append({"name": attr_name.strip(), "type": attr_type.strip() or "Строка"})
|
|
||||||
sections.append({"name": name.strip(), "attributes": attributes})
|
|
||||||
return sections
|
|
||||||
|
|
||||||
|
|
||||||
def _runtime_for_object_context(project_id: str, impact: ObjectImpactResponse) -> list[RuntimeSummaryResponse]:
|
def _runtime_for_object_context(project_id: str, impact: ObjectImpactResponse) -> list[RuntimeSummaryResponse]:
|
||||||
lineages = {
|
lineages = {
|
||||||
item.lineage_id
|
item.lineage_id
|
||||||
|
|||||||
@@ -7,6 +7,12 @@ import zipfile
|
|||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
from api_server import main
|
from api_server import main
|
||||||
|
from api_server.html5_forms import (
|
||||||
|
form_value,
|
||||||
|
html5_csv_values,
|
||||||
|
html5_metadata_payload,
|
||||||
|
html5_metadata_request_payload,
|
||||||
|
)
|
||||||
from api_server.html5_sse import html5_sse_comment, html5_sse_event, html5_sse_if_changed
|
from api_server.html5_sse import html5_sse_comment, html5_sse_event, html5_sse_if_changed
|
||||||
from api_server.main import app
|
from api_server.main import app
|
||||||
from one_c_normalizer import ConfigurationRoot, MetadataGroup, MetadataObject, Module, NormalizedProject
|
from one_c_normalizer import ConfigurationRoot, MetadataGroup, MetadataObject, Module, NormalizedProject
|
||||||
@@ -72,6 +78,48 @@ def test_html5_sse_formatters_emit_stable_event_stream_chunks():
|
|||||||
assert "data: <div>done</div>" in third[0]
|
assert "data: <div>done</div>" in third[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_html5_form_helpers_normalize_metadata_payloads():
|
||||||
|
form = {
|
||||||
|
"name": [" ЗаказПокупателя "],
|
||||||
|
"synonym": [" Заказ покупателя "],
|
||||||
|
"attributes": ["Номер:Строка, Дата:Дата\nКомментарий"],
|
||||||
|
"tabular_sections": ["Товары[Номенклатура:СправочникСсылка.Номенклатура;Количество:Число]"],
|
||||||
|
"forms": ["ФормаДокумента, ФормаСписка\nФормаВыбора"],
|
||||||
|
"commands": ["Провести:ПровестиДокумент, Печать"],
|
||||||
|
"task_id": [" task-1 "],
|
||||||
|
"empty": [" "],
|
||||||
|
}
|
||||||
|
|
||||||
|
payload = html5_metadata_payload(form)
|
||||||
|
|
||||||
|
assert form_value(form, "name") == "ЗаказПокупателя"
|
||||||
|
assert form_value(form, "empty") is None
|
||||||
|
assert html5_csv_values("one, two\nthree") == ["one", "two", "three"]
|
||||||
|
assert payload["object_kind"] == "DOCUMENT"
|
||||||
|
assert payload["name"] == "ЗаказПокупателя"
|
||||||
|
assert payload["attributes"] == [
|
||||||
|
{"name": "Номер", "type": "Строка"},
|
||||||
|
{"name": "Дата", "type": "Дата"},
|
||||||
|
{"name": "Комментарий", "type": "Строка"},
|
||||||
|
]
|
||||||
|
assert payload["tabular_sections"] == [
|
||||||
|
{
|
||||||
|
"name": "Товары",
|
||||||
|
"attributes": [
|
||||||
|
{"name": "Номенклатура", "type": "СправочникСсылка.Номенклатура"},
|
||||||
|
{"name": "Количество", "type": "Число"},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
assert payload["forms"] == ["ФормаДокумента", "ФормаСписка", "ФормаВыбора"]
|
||||||
|
assert payload["commands"] == [
|
||||||
|
{"name": "Провести", "handler": "ПровестиДокумент"},
|
||||||
|
{"name": "Печать", "handler": None},
|
||||||
|
]
|
||||||
|
assert payload["_raw_attributes"] == "Номер:Строка, Дата:Дата\nКомментарий"
|
||||||
|
assert "_raw_attributes" not in html5_metadata_request_payload(payload)
|
||||||
|
|
||||||
|
|
||||||
def test_cors_allows_lan_panel_origin():
|
def test_cors_allows_lan_panel_origin():
|
||||||
client = TestClient(app)
|
client = TestClient(app)
|
||||||
response = client.options(
|
response = client.options(
|
||||||
|
|||||||
Reference in New Issue
Block a user