Preview 1C access profile drafts
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled

This commit is contained in:
2026-05-21 18:22:47 +03:00
parent d0b74c05be
commit feaf40c205
2 changed files with 169 additions and 0 deletions
+154
View File
@@ -830,6 +830,29 @@ class ImportRequest(BaseModel):
mode: ImportMode = ImportMode.FULL_REPLACE
class AccessProfileDraftRequest(BaseModel):
name: str
target_objects: list[str] = Field(default_factory=list)
permissions: list[str] = Field(default_factory=lambda: ["read"])
source_user: str | None = None
class AccessProfileDraftRole(BaseModel):
role: str
role_qualified_name: str
matched_objects: list[str] = Field(default_factory=list)
permissions: dict = Field(default_factory=dict)
source: str = "role-rights"
class AccessProfileDraftResponse(BaseModel):
name: str
roles: list[AccessProfileDraftRole] = Field(default_factory=list)
missing_objects: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)
proposed_profile: dict = Field(default_factory=dict)
class ProjectSetupResponse(BaseModel):
project_id: str
status: ProjectSetupStatus
@@ -2917,6 +2940,14 @@ async def get_project_access_user(project_id: str, user_name: str) -> dict:
return {"user": user.model_dump(mode="json"), "effective_roles": effective_roles}
@app.post("/projects/{project_id}/access/profile-preview", response_model=AccessProfileDraftResponse)
async def preview_project_access_profile(project_id: str, request: AccessProfileDraftRequest) -> AccessProfileDraftResponse:
normalized = _load_normalized_project(project_id)
if normalized is None:
raise HTTPException(status_code=404, detail="NormalizedProject not found")
return _build_access_profile_draft(normalized, request)
@app.get("/projects/{project_id}/imports/quality", response_model=ImportQualityResponse)
async def get_import_quality(project_id: str) -> ImportQualityResponse:
return _import_quality_response(project_id)
@@ -7281,6 +7312,129 @@ def _effective_access_roles(normalized: NormalizedProject, user) -> list[dict]:
return sorted(unique.values(), key=lambda item: str(item.get("role") or "").casefold())
def _build_access_profile_draft(
normalized: NormalizedProject,
request: AccessProfileDraftRequest,
) -> AccessProfileDraftResponse:
profile_name = request.name.strip()
if not profile_name:
raise HTTPException(status_code=400, detail="Profile name is required")
target_objects = [item.strip() for item in request.target_objects if item.strip()]
permissions = [item.strip().casefold() for item in request.permissions if item.strip()]
role_candidates = _access_roles_for_targets(normalized, target_objects, permissions)
warnings: list[str] = []
if request.source_user:
user = _access_user_by_name(normalized, request.source_user)
if user is None:
warnings.append(f"Пользователь не найден: {request.source_user}")
else:
for role in _effective_access_roles(normalized, user):
role_name = str(role.get("role") or "").strip()
role_qualified_name = str(role.get("role_qualified_name") or role_name or "").strip()
if not role_name or not role_qualified_name:
continue
role_candidates.setdefault(
role_qualified_name.casefold(),
AccessProfileDraftRole(
role=role_name,
role_qualified_name=role_qualified_name,
matched_objects=[],
permissions={},
source=f"user:{user.name}",
),
)
covered_objects = {
matched.casefold()
for role in role_candidates.values()
for matched in role.matched_objects
}
missing_objects = [
target
for target in target_objects
if target.casefold() not in covered_objects
]
if missing_objects:
warnings.append("Для части объектов не найдены роли с подходящими правами.")
roles = sorted(role_candidates.values(), key=lambda item: item.role.casefold())
return AccessProfileDraftResponse(
name=profile_name,
roles=roles,
missing_objects=missing_objects,
warnings=warnings,
proposed_profile={
"name": profile_name,
"qualified_name": f"ПрофильГруппыДоступа.{profile_name}",
"roles": [role.role_qualified_name for role in roles],
"target_objects": target_objects,
"permissions": permissions,
"status": "preview_only",
},
)
def _access_roles_for_targets(
normalized: NormalizedProject,
target_objects: list[str],
permissions: list[str],
) -> dict[str, AccessProfileDraftRole]:
result: dict[str, AccessProfileDraftRole] = {}
if not target_objects:
return result
target_lookup = {target.casefold(): target for target in target_objects}
for role_object in _normalized_role_objects(normalized):
role_qualified_name = role_object.qualified_name
candidate_permissions: dict[str, str] = {}
matched_objects: list[str] = []
for right in role_object.rights:
target = str(right.target or right.name or "")
canonical_target = target_lookup.get(target.casefold())
if canonical_target is None:
continue
permission_values = {
str(key).casefold(): str(value)
for key, value in right.permissions.items()
}
if permissions and not any(_permission_is_enabled(permission_values.get(permission)) for permission in permissions):
continue
matched_objects.append(canonical_target)
candidate_permissions.update(permission_values)
if matched_objects:
result[role_qualified_name.casefold()] = AccessProfileDraftRole(
role=role_object.name,
role_qualified_name=role_qualified_name,
matched_objects=sorted(set(matched_objects), key=str.casefold),
permissions=candidate_permissions,
)
return result
def _normalized_role_objects(normalized: NormalizedProject) -> list[MetadataObject]:
return [
item
for group in _normalized_all_groups(normalized)
for item in group.objects
if item.object_kind == "ROLE"
]
def _access_user_by_name(normalized: NormalizedProject, user_name: str):
wanted = user_name.casefold()
return next(
(
item
for item in normalized.access.users
if item.name.casefold() == wanted or str(item.qualified_name or "").casefold() == wanted
),
None,
)
def _permission_is_enabled(value: str | None) -> bool:
return str(value or "").casefold() in {"true", "1", "yes", "да", "истина"}
def _import_quality_response(project_id: str) -> ImportQualityResponse:
normalized = _load_normalized_project(project_id)
summary = _normalized_project_summary(normalized) if normalized is not None else None
+15
View File
@@ -1543,6 +1543,21 @@ def test_import_supports_structure_only_indexing(tmp_path: Path):
assert access_user.status_code == 200
assert access_user.json()["effective_roles"][0]["role_qualified_name"] == "Роль.Менеджер"
profile_preview = client.post(
f"/projects/{project_id}/access/profile-preview",
json={
"name": "НовыйПрофильHTTP",
"target_objects": ["HTTPСервис.ПубличныйAPI"],
"permissions": ["read"],
"source_user": "ivanov",
},
)
assert profile_preview.status_code == 200
preview_payload = profile_preview.json()
assert preview_payload["proposed_profile"]["qualified_name"] == "ПрофильГруппыДоступа.НовыйПрофильHTTP"
assert "Роль.Менеджер" in preview_payload["proposed_profile"]["roles"]
assert preview_payload["missing_objects"] == []
tree = client.get(f"/projects/{project_id}/metadata/tree")
assert tree.status_code == 200
root = tree.json()["root"]