From feaf40c205d1dbbc76102b92838a6b59d1935bee Mon Sep 17 00:00:00 2001 From: Mikhail Date: Thu, 21 May 2026 18:22:47 +0300 Subject: [PATCH] Preview 1C access profile drafts --- services/api-server/src/api_server/main.py | 154 +++++++++++++++++++++ services/api-server/tests/test_api.py | 15 ++ 2 files changed, 169 insertions(+) diff --git a/services/api-server/src/api_server/main.py b/services/api-server/src/api_server/main.py index b323106..23c9d53 100644 --- a/services/api-server/src/api_server/main.py +++ b/services/api-server/src/api_server/main.py @@ -830,6 +830,29 @@ class ImportRequest(BaseModel): mode: ImportMode = ImportMode.FULL_REPLACE +class AccessProfileDraftRequest(BaseModel): + name: str + target_objects: list[str] = Field(default_factory=list) + permissions: list[str] = Field(default_factory=lambda: ["read"]) + source_user: str | None = None + + +class AccessProfileDraftRole(BaseModel): + role: str + role_qualified_name: str + matched_objects: list[str] = Field(default_factory=list) + permissions: dict = Field(default_factory=dict) + source: str = "role-rights" + + +class AccessProfileDraftResponse(BaseModel): + name: str + roles: list[AccessProfileDraftRole] = Field(default_factory=list) + missing_objects: list[str] = Field(default_factory=list) + warnings: list[str] = Field(default_factory=list) + proposed_profile: dict = Field(default_factory=dict) + + class ProjectSetupResponse(BaseModel): project_id: str status: ProjectSetupStatus @@ -2917,6 +2940,14 @@ async def get_project_access_user(project_id: str, user_name: str) -> dict: return {"user": user.model_dump(mode="json"), "effective_roles": effective_roles} +@app.post("/projects/{project_id}/access/profile-preview", response_model=AccessProfileDraftResponse) +async def preview_project_access_profile(project_id: str, request: AccessProfileDraftRequest) -> AccessProfileDraftResponse: + normalized = _load_normalized_project(project_id) + if normalized is None: + raise HTTPException(status_code=404, detail="NormalizedProject not found") + return _build_access_profile_draft(normalized, request) + + @app.get("/projects/{project_id}/imports/quality", response_model=ImportQualityResponse) async def get_import_quality(project_id: str) -> ImportQualityResponse: return _import_quality_response(project_id) @@ -7281,6 +7312,129 @@ def _effective_access_roles(normalized: NormalizedProject, user) -> list[dict]: return sorted(unique.values(), key=lambda item: str(item.get("role") or "").casefold()) +def _build_access_profile_draft( + normalized: NormalizedProject, + request: AccessProfileDraftRequest, +) -> AccessProfileDraftResponse: + profile_name = request.name.strip() + if not profile_name: + raise HTTPException(status_code=400, detail="Profile name is required") + target_objects = [item.strip() for item in request.target_objects if item.strip()] + permissions = [item.strip().casefold() for item in request.permissions if item.strip()] + role_candidates = _access_roles_for_targets(normalized, target_objects, permissions) + warnings: list[str] = [] + + if request.source_user: + user = _access_user_by_name(normalized, request.source_user) + if user is None: + warnings.append(f"Пользователь не найден: {request.source_user}") + else: + for role in _effective_access_roles(normalized, user): + role_name = str(role.get("role") or "").strip() + role_qualified_name = str(role.get("role_qualified_name") or role_name or "").strip() + if not role_name or not role_qualified_name: + continue + role_candidates.setdefault( + role_qualified_name.casefold(), + AccessProfileDraftRole( + role=role_name, + role_qualified_name=role_qualified_name, + matched_objects=[], + permissions={}, + source=f"user:{user.name}", + ), + ) + + covered_objects = { + matched.casefold() + for role in role_candidates.values() + for matched in role.matched_objects + } + missing_objects = [ + target + for target in target_objects + if target.casefold() not in covered_objects + ] + if missing_objects: + warnings.append("Для части объектов не найдены роли с подходящими правами.") + roles = sorted(role_candidates.values(), key=lambda item: item.role.casefold()) + return AccessProfileDraftResponse( + name=profile_name, + roles=roles, + missing_objects=missing_objects, + warnings=warnings, + proposed_profile={ + "name": profile_name, + "qualified_name": f"ПрофильГруппыДоступа.{profile_name}", + "roles": [role.role_qualified_name for role in roles], + "target_objects": target_objects, + "permissions": permissions, + "status": "preview_only", + }, + ) + + +def _access_roles_for_targets( + normalized: NormalizedProject, + target_objects: list[str], + permissions: list[str], +) -> dict[str, AccessProfileDraftRole]: + result: dict[str, AccessProfileDraftRole] = {} + if not target_objects: + return result + target_lookup = {target.casefold(): target for target in target_objects} + for role_object in _normalized_role_objects(normalized): + role_qualified_name = role_object.qualified_name + candidate_permissions: dict[str, str] = {} + matched_objects: list[str] = [] + for right in role_object.rights: + target = str(right.target or right.name or "") + canonical_target = target_lookup.get(target.casefold()) + if canonical_target is None: + continue + permission_values = { + str(key).casefold(): str(value) + for key, value in right.permissions.items() + } + if permissions and not any(_permission_is_enabled(permission_values.get(permission)) for permission in permissions): + continue + matched_objects.append(canonical_target) + candidate_permissions.update(permission_values) + if matched_objects: + result[role_qualified_name.casefold()] = AccessProfileDraftRole( + role=role_object.name, + role_qualified_name=role_qualified_name, + matched_objects=sorted(set(matched_objects), key=str.casefold), + permissions=candidate_permissions, + ) + return result + + +def _normalized_role_objects(normalized: NormalizedProject) -> list[MetadataObject]: + return [ + item + for group in _normalized_all_groups(normalized) + for item in group.objects + if item.object_kind == "ROLE" + ] + + +def _access_user_by_name(normalized: NormalizedProject, user_name: str): + wanted = user_name.casefold() + return next( + ( + item + for item in normalized.access.users + if item.name.casefold() == wanted or str(item.qualified_name or "").casefold() == wanted + ), + None, + ) + + +def _permission_is_enabled(value: str | None) -> bool: + return str(value or "").casefold() in {"true", "1", "yes", "да", "истина"} + + def _import_quality_response(project_id: str) -> ImportQualityResponse: normalized = _load_normalized_project(project_id) summary = _normalized_project_summary(normalized) if normalized is not None else None diff --git a/services/api-server/tests/test_api.py b/services/api-server/tests/test_api.py index 892eb43..665ab3e 100644 --- a/services/api-server/tests/test_api.py +++ b/services/api-server/tests/test_api.py @@ -1543,6 +1543,21 @@ def test_import_supports_structure_only_indexing(tmp_path: Path): assert access_user.status_code == 200 assert access_user.json()["effective_roles"][0]["role_qualified_name"] == "Роль.Менеджер" + profile_preview = client.post( + f"/projects/{project_id}/access/profile-preview", + json={ + "name": "НовыйПрофильHTTP", + "target_objects": ["HTTPСервис.ПубличныйAPI"], + "permissions": ["read"], + "source_user": "ivanov", + }, + ) + assert profile_preview.status_code == 200 + preview_payload = profile_preview.json() + assert preview_payload["proposed_profile"]["qualified_name"] == "ПрофильГруппыДоступа.НовыйПрофильHTTP" + assert "Роль.Менеджер" in preview_payload["proposed_profile"]["roles"] + assert preview_payload["missing_objects"] == [] + tree = client.get(f"/projects/{project_id}/metadata/tree") assert tree.status_code == 200 root = tree.json()["root"]