Plan publishing 1C access profiles
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled

This commit is contained in:
2026-05-21 18:32:06 +03:00
parent db5fdf0aa4
commit 6051f59e08
2 changed files with 116 additions and 0 deletions
+105
View File
@@ -868,6 +868,14 @@ class AccessProfileApplyResponse(BaseModel):
message: str message: str
class AccessProfilePublishPlanResponse(BaseModel):
profile: dict
operations: list[dict] = Field(default_factory=list)
ready_for_extension: bool = False
warnings: list[str] = Field(default_factory=list)
extension_payload: dict = Field(default_factory=dict)
class ProjectSetupResponse(BaseModel): class ProjectSetupResponse(BaseModel):
project_id: str project_id: str
status: ProjectSetupStatus status: ProjectSetupStatus
@@ -2996,6 +3004,17 @@ async def apply_project_access_profile(project_id: str, request: AccessProfileAp
) )
@app.get("/projects/{project_id}/access/profiles/{profile_name}/publish-plan", response_model=AccessProfilePublishPlanResponse)
async def get_project_access_profile_publish_plan(project_id: str, profile_name: str) -> AccessProfilePublishPlanResponse:
normalized = _load_normalized_project(project_id)
if normalized is None:
raise HTTPException(status_code=404, detail="NormalizedProject not found")
profile = _access_profile_by_name(normalized, profile_name)
if profile is None:
raise HTTPException(status_code=404, detail="Access profile not found")
return _build_access_profile_publish_plan(normalized, profile)
@app.get("/projects/{project_id}/imports/quality", response_model=ImportQualityResponse) @app.get("/projects/{project_id}/imports/quality", response_model=ImportQualityResponse)
async def get_import_quality(project_id: str) -> ImportQualityResponse: async def get_import_quality(project_id: str) -> ImportQualityResponse:
return _import_quality_response(project_id) return _import_quality_response(project_id)
@@ -7454,6 +7473,92 @@ def _access_profile_from_draft(
) )
def _access_profile_by_name(normalized: NormalizedProject, profile_name: str):
wanted = profile_name.casefold()
return next(
(
profile
for profile in normalized.access.profiles
if profile.name.casefold() == wanted or str(profile.qualified_name or "").casefold() == wanted
),
None,
)
def _build_access_profile_publish_plan(normalized: NormalizedProject, profile) -> AccessProfilePublishPlanResponse:
existing = _access_profile_by_name(
normalized.model_copy(update={"access": normalized.access.model_copy(update={"profiles": [item for item in normalized.access.profiles if item is not profile]})}),
profile.qualified_name or profile.name,
)
profile_roles = [role.role_qualified_name or f"Роль.{role.role}" for role in profile.roles]
target_objects = list(profile.attributes.get("target_objects") or [])
permissions = list(profile.attributes.get("permissions") or [])
missing_objects = list(profile.attributes.get("missing_objects") or [])
operations: list[dict] = []
if existing is None:
operations.append(
{
"action": "CREATE_ACCESS_PROFILE",
"target": profile.qualified_name or f"ПрофильГруппыДоступа.{profile.name}",
"name": profile.name,
}
)
else:
operations.append(
{
"action": "UPDATE_ACCESS_PROFILE",
"target": existing.qualified_name or existing.name,
"name": profile.name,
}
)
for role_name in profile_roles:
operations.append(
{
"action": "ADD_ROLE_TO_PROFILE",
"target": profile.qualified_name or f"ПрофильГруппыДоступа.{profile.name}",
"role": role_name,
}
)
candidate_groups = [
group
for group in normalized.access.groups
if not group.profile and not group.profile_qualified_name
]
for group in candidate_groups:
operations.append(
{
"action": "CAN_ATTACH_GROUP",
"target": group.qualified_name or group.name,
"profile": profile.qualified_name or f"ПрофильГруппыДоступа.{profile.name}",
}
)
warnings: list[str] = []
if missing_objects:
warnings.append("Профиль содержит недостающие объекты и не готов к применению без ручной проверки.")
if not profile_roles:
warnings.append("В профиле нет ролей для применения.")
ready = not missing_objects and bool(profile_roles)
extension_payload = {
"operation": "access.profile.apply",
"dry_run": True,
"profile": {
"name": profile.name,
"qualified_name": profile.qualified_name or f"ПрофильГруппыДоступа.{profile.name}",
"roles": profile_roles,
"target_objects": target_objects,
"permissions": permissions,
},
"operations": operations,
}
return AccessProfilePublishPlanResponse(
profile=profile.model_dump(mode="json"),
operations=operations,
ready_for_extension=ready,
warnings=warnings,
extension_payload=extension_payload,
)
def _access_roles_for_targets( def _access_roles_for_targets(
normalized: NormalizedProject, normalized: NormalizedProject,
target_objects: list[str], target_objects: list[str],
+11
View File
@@ -2,6 +2,7 @@ from pathlib import Path
import re import re
import time import time
from types import SimpleNamespace from types import SimpleNamespace
from urllib.parse import quote
from uuid import uuid4 from uuid import uuid4
import zipfile import zipfile
@@ -1574,6 +1575,16 @@ def test_import_supports_structure_only_indexing(tmp_path: Path):
assert updated_access.status_code == 200 assert updated_access.status_code == 200
assert any(item["name"] == "НовыйПрофильHTTP" for item in updated_access.json()["profiles"]) assert any(item["name"] == "НовыйПрофильHTTP" for item in updated_access.json()["profiles"])
publish_plan = client.get(
f"/projects/{project_id}/access/profiles/{quote('НовыйПрофильHTTP')}/publish-plan"
)
assert publish_plan.status_code == 200
plan_payload = publish_plan.json()
assert plan_payload["ready_for_extension"] is True
assert plan_payload["operations"][0]["action"] == "CREATE_ACCESS_PROFILE"
assert any(item["action"] == "ADD_ROLE_TO_PROFILE" and item["role"] == "Роль.Менеджер" for item in plan_payload["operations"])
assert plan_payload["extension_payload"]["operation"] == "access.profile.apply"
tree = client.get(f"/projects/{project_id}/metadata/tree") tree = client.get(f"/projects/{project_id}/metadata/tree")
assert tree.status_code == 200 assert tree.status_code == 200
root = tree.json()["root"] root = tree.json()["root"]