Block authoring apply on RBAC and privacy guards
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled

This commit is contained in:
2026-05-16 20:07:19 +03:00
parent cca24d0e7e
commit b63d18a173
4 changed files with 193 additions and 10 deletions
+60 -10
View File
@@ -3242,11 +3242,7 @@ def _authoring_guard_checks(
status="WARNING" if context.review_findings else "OK",
message=f"{len(context.review_findings)} review findings in context",
),
AuthoringGuardCheck(
name="privacy",
status="WARNING" if context.privacy_markers else "OK",
message=f"{len(context.privacy_markers)} privacy markers in context",
),
_authoring_privacy_check(context),
AuthoringGuardCheck(
name="ai-token-budget",
status=token_status,
@@ -3264,6 +3260,7 @@ def _authoring_task_session_check(
project_id: str,
task_id: str | None,
session_id: str | None,
user_id: str | None = None,
) -> AuthoringGuardCheck:
if not task_id:
return AuthoringGuardCheck(
@@ -3315,6 +3312,12 @@ def _authoring_task_session_check(
status="BLOCKED",
message=f"Session {session_id} is already finished",
)
if user_id and session.user_id != user_id:
return AuthoringGuardCheck(
name="task-session",
status="BLOCKED",
message=f"Session {session_id} belongs to user {session.user_id}",
)
return AuthoringGuardCheck(
name="task-session",
status="OK",
@@ -3322,6 +3325,47 @@ def _authoring_task_session_check(
)
def _authoring_rbac_check(user_id: str | None) -> AuthoringGuardCheck:
if not user_id:
return AuthoringGuardCheck(
name="rbac",
status="BLOCKED",
message="User id is required for workspace apply",
)
if not _rbac.is_allowed(user_id, Permission.APPLY_AUTHORING_CHANGE):
return AuthoringGuardCheck(
name="rbac",
status="BLOCKED",
message=f"User {user_id} is not allowed to apply authoring changes",
)
return AuthoringGuardCheck(
name="rbac",
status="OK",
message=f"User {user_id} can apply authoring changes",
)
def _authoring_privacy_check(context: AuthoringContextResponse) -> AuthoringGuardCheck:
blocked_markers = [
marker
for marker in context.privacy_markers
if marker.classification in {PrivacyClassification.PERSONAL_DATA, PrivacyClassification.SECRET}
]
if blocked_markers:
return AuthoringGuardCheck(
name="privacy",
status="BLOCKED",
message=f"{len(blocked_markers)} sensitive privacy markers require explicit local-only handling before apply",
)
if context.privacy_markers:
return AuthoringGuardCheck(
name="privacy",
status="WARNING",
message=f"{len(context.privacy_markers)} privacy markers in context",
)
return AuthoringGuardCheck(name="privacy", status="OK", message="No privacy markers in context")
def _authoring_target_node(snapshot: SirSnapshot, request: AuthoringSemanticDiffPreviewRequest):
if request.target_lineage_id:
found = next((node for node in snapshot.nodes if node.lineage_id == request.target_lineage_id), None)
@@ -3455,7 +3499,8 @@ def _authoring_semantic_diff_preview(
user_id=request.user_id,
),
)
checks.append(_authoring_task_session_check(project_id, request.task_id, request.session_id))
checks.append(_authoring_task_session_check(project_id, request.task_id, request.session_id, request.user_id))
checks.append(_authoring_rbac_check(request.user_id))
version_preview = _authoring_version_preview(target, request.proposed_text, request.task_id, request.session_id)
return AuthoringSemanticDiffPreviewResponse(
project_id=project_id,
@@ -5282,7 +5327,8 @@ def _authoring_metadata_object_preview(
AuthoringGuardCheck(name="preview", status="REQUIRED", message="Metadata draft must be reviewed before apply"),
AuthoringGuardCheck(name="workspace-history", status="READY", message="Draft can be saved to SFERA workspace history"),
AuthoringGuardCheck(name="production-1c", status="BLOCKED", message="Production 1C metadata write is disabled"),
_authoring_task_session_check(project_id, request.task_id, request.session_id),
_authoring_task_session_check(project_id, request.task_id, request.session_id, request.user_id),
_authoring_rbac_check(request.user_id),
]
return AuthoringMetadataObjectPreviewResponse(
project_id=project_id,
@@ -5818,9 +5864,13 @@ async def authoring_apply_rollback(
raise HTTPException(status_code=409, detail="Expected rollback version id does not match current preview")
if not preview.apply_available:
raise HTTPException(status_code=409, detail="Rollback apply is not available")
task_session_check = _authoring_task_session_check(project_id, request.task_id, request.session_id)
if task_session_check.status == "BLOCKED":
raise HTTPException(status_code=409, detail={"blocked_checks": [task_session_check.model_dump(mode="json")]})
apply_checks = [
_authoring_task_session_check(project_id, request.task_id, request.session_id),
_authoring_rbac_check(request.approved_by),
]
blocking_checks = [check for check in apply_checks if check.status == "BLOCKED"]
if blocking_checks:
raise HTTPException(status_code=409, detail={"blocked_checks": [check.model_dump(mode="json") for check in blocking_checks]})
version, path = _persist_authoring_rollback(project_id, change_payload, preview, request)
return AuthoringApplyRollbackResponse(
project_id=project_id,
+130
View File
@@ -13,6 +13,8 @@ from one_c_normalizer import ConfigurationRoot, MetadataGroup, MetadataObject, M
def create_authoring_session(client: TestClient, project_id: str, task_id: str, session_id: str, user_id: str = "dev.ivan") -> None:
user = client.post("/collaboration/users", json={"user_id": user_id, "display_name": user_id})
assert user.status_code == 200
grant = client.post(f"/security/users/{user_id}/roles/developer")
assert grant.status_code == 200
task = client.post(
"/collaboration/tasks",
json={"task_id": task_id, "project_id": project_id, "title": f"Authoring {task_id}", "assignee_user_id": user_id},
@@ -1999,6 +2001,7 @@ def test_authoring_context_and_completion_preview(tmp_path: Path):
),
"task_id": "task.authoring",
"session_id": "session.authoring",
"user_id": "dev.ivan",
},
)
@@ -2011,6 +2014,7 @@ def test_authoring_context_and_completion_preview(tmp_path: Path):
assert diff_payload["version_preview"]["task_id"] == "task.authoring"
assert diff_payload["version_preview"]["apply_available"] is False
assert any(row["name"] == "task-session" and row["status"] == "OK" for row in diff_payload["checks"])
assert any(row["name"] == "rbac" and row["status"] == "OK" for row in diff_payload["checks"])
assert any(row["name"] == "apply" and row["status"] == "BLOCKED" for row in diff_payload["checks"])
apply_response = client.post(
@@ -2025,6 +2029,7 @@ def test_authoring_context_and_completion_preview(tmp_path: Path):
),
"task_id": "task.authoring",
"session_id": "session.authoring",
"user_id": "dev.ivan",
"expected_next_version_id": diff_payload["version_preview"]["next_version_id"],
"approved_by": "dev.ivan",
"approval_note": "preview checked",
@@ -2150,6 +2155,129 @@ def test_authoring_apply_requires_active_task_session(tmp_path: Path):
assert blocked[0]["name"] == "task-session"
def test_authoring_apply_requires_rbac_permission(tmp_path: Path):
project_id = f"authoring-rbac-{uuid4()}"
module = tmp_path / "rbac_module.bsl"
source_text = "Процедура Проверить()\nКонецПроцедуры\n"
module.write_text(source_text, encoding="utf-8")
client = TestClient(app)
indexed = client.post("/projects/index", json={"path": str(tmp_path), "project_id": project_id})
assert indexed.status_code == 200
user = client.post("/collaboration/users", json={"user_id": "viewer.ivan", "display_name": "Viewer"})
assert user.status_code == 200
grant = client.post("/security/users/viewer.ivan/roles/viewer")
assert grant.status_code == 200
task = client.post(
"/collaboration/tasks",
json={"task_id": "task.rbac", "project_id": project_id, "title": "RBAC authoring", "assignee_user_id": "viewer.ivan"},
)
assert task.status_code == 200
session = client.post(
"/collaboration/sessions",
json={"session": {"session_id": "session.rbac", "task_id": "task.rbac", "user_id": "viewer.ivan"}},
)
assert session.status_code == 200
preview = client.post(
f"/projects/{project_id}/authoring/semantic-diff-preview",
json={
"routine_name": "Проверить",
"source_path": str(module),
"original_text": source_text,
"proposed_text": source_text.replace("КонецПроцедуры", " Возврат;\nКонецПроцедуры"),
"task_id": "task.rbac",
"session_id": "session.rbac",
"user_id": "viewer.ivan",
},
)
assert preview.status_code == 200
preview_payload = preview.json()
assert any(check["name"] == "rbac" and check["status"] == "BLOCKED" for check in preview_payload["checks"])
apply_response = client.post(
f"/projects/{project_id}/authoring/apply-change-set",
json={
"routine_name": "Проверить",
"source_path": str(module),
"original_text": source_text,
"proposed_text": source_text.replace("КонецПроцедуры", " Возврат;\nКонецПроцедуры"),
"task_id": "task.rbac",
"session_id": "session.rbac",
"user_id": "viewer.ivan",
"expected_next_version_id": preview_payload["version_preview"]["next_version_id"],
"approved_by": "viewer.ivan",
},
)
assert apply_response.status_code == 409
assert any(check["name"] == "rbac" for check in apply_response.json()["detail"]["blocked_checks"])
def test_authoring_apply_blocks_sensitive_privacy_context(tmp_path: Path):
project_id = f"authoring-privacy-{uuid4()}"
(tmp_path / "metadata.xml").write_text(
"""
<Configuration>
<Document name="ЗаказПокупателя" qualifiedName="Документ.ЗаказПокупателя">
<Attribute name="Телефон" qualifiedName="Документ.ЗаказПокупателя.Телефон" />
</Document>
</Configuration>
""",
encoding="utf-8",
)
module = tmp_path / "Documents" / "ЗаказПокупателя" / "Ext" / "ObjectModule.bsl"
module.parent.mkdir(parents=True)
source_text = "Процедура Проверить()\n Сообщить(Телефон);\nКонецПроцедуры\n"
module.write_text(source_text, encoding="utf-8")
client = TestClient(app)
indexed = client.post("/projects/index", json={"path": str(tmp_path), "project_id": project_id})
assert indexed.status_code == 200
create_authoring_session(client, project_id, "task.privacy", "session.privacy")
schema = client.get(f"/projects/{project_id}/objects/schema/Документ.ЗаказПокупателя")
phone_lineage = schema.json()["attributes"][0]["lineage_id"]
marker = client.post(
f"/projects/{project_id}/privacy/markers",
json={"target_id": phone_lineage, "classification": "PERSONAL_DATA", "reason": "phone number"},
)
assert marker.status_code == 200
preview = client.post(
f"/projects/{project_id}/authoring/semantic-diff-preview",
json={
"object_name": "Документ.ЗаказПокупателя",
"routine_name": "Проверить",
"source_path": str(module),
"original_text": source_text,
"proposed_text": source_text.replace("Сообщить(Телефон);", "Сообщить(Телефон);\n Возврат;"),
"task_id": "task.privacy",
"session_id": "session.privacy",
"user_id": "dev.ivan",
},
)
assert preview.status_code == 200
preview_payload = preview.json()
assert any(check["name"] == "privacy" and check["status"] == "BLOCKED" for check in preview_payload["checks"])
apply_response = client.post(
f"/projects/{project_id}/authoring/apply-change-set",
json={
"object_name": "Документ.ЗаказПокупателя",
"routine_name": "Проверить",
"source_path": str(module),
"original_text": source_text,
"proposed_text": source_text.replace("Сообщить(Телефон);", "Сообщить(Телефон);\n Возврат;"),
"task_id": "task.privacy",
"session_id": "session.privacy",
"user_id": "dev.ivan",
"expected_next_version_id": preview_payload["version_preview"]["next_version_id"],
"approved_by": "dev.ivan",
},
)
assert apply_response.status_code == 409
assert any(check["name"] == "privacy" for check in apply_response.json()["detail"]["blocked_checks"])
def test_authoring_metadata_object_preview_and_apply(tmp_path: Path):
project_id = f"metadata-authoring-api-{uuid4()}"
(tmp_path / "metadata.xml").write_text(
@@ -2187,6 +2315,7 @@ def test_authoring_metadata_object_preview_and_apply(tmp_path: Path):
"commands": [{"name": "Заполнить", "handler": "ЗаполнитьКоманда"}],
"task_id": "task.metadata",
"session_id": "session.metadata",
"user_id": "dev.ivan",
}
preview = client.post(f"/projects/{project_id}/authoring/metadata-object-preview", json=draft)
@@ -2196,6 +2325,7 @@ def test_authoring_metadata_object_preview_and_apply(tmp_path: Path):
assert preview_payload["changed"] is True
assert preview_payload["version_preview"]["apply_available"] is True
assert any(check["name"] == "task-session" and check["status"] == "OK" for check in preview_payload["checks"])
assert any(check["name"] == "rbac" and check["status"] == "OK" for check in preview_payload["checks"])
assert any("Реквизит.Контрагент" in row["text"] for row in preview_payload["semantic_diff"])
assert any("ТабличнаяЧасть.Товары" in row["text"] for row in preview_payload["semantic_diff"])
assert any("Команда.Заполнить" in row["text"] for row in preview_payload["semantic_diff"])