Load 1C access profiles groups and users
CI / python (push) Has been cancelled
CI / rust (push) Has been cancelled

This commit is contained in:
2026-05-21 18:17:27 +03:00
parent 3c7b1825c4
commit d0b74c05be
11 changed files with 599 additions and 0 deletions
@@ -71,6 +71,49 @@ class Rights(ObjectPart):
permissions: dict = {} permissions: dict = {}
class AccessRoleAssignment(BaseModel):
role: str
role_qualified_name: str | None = None
source: str | None = None
attributes: dict = {}
class AccessProfile(BaseModel):
name: str
qualified_name: str | None = None
source_path: str | None = None
attributes: dict = {}
roles: list[AccessRoleAssignment] = []
class AccessGroup(BaseModel):
name: str
qualified_name: str | None = None
source_path: str | None = None
profile: str | None = None
profile_qualified_name: str | None = None
attributes: dict = {}
roles: list[AccessRoleAssignment] = []
users: list[str] = []
class AccessUser(BaseModel):
name: str
qualified_name: str | None = None
source_path: str | None = None
full_name: str | None = None
disabled: bool = False
attributes: dict = {}
roles: list[AccessRoleAssignment] = []
groups: list[str] = []
class AccessModel(BaseModel):
profiles: list[AccessProfile] = []
groups: list[AccessGroup] = []
users: list[AccessUser] = []
class Extension(ObjectPart): class Extension(ObjectPart):
kind: str = "EXTENSION" kind: str = "EXTENSION"
version: str | None = None version: str | None = None
@@ -137,6 +180,7 @@ class NormalizedProject(BaseModel):
project_id: str | None = None project_id: str | None = None
configuration: ConfigurationRoot configuration: ConfigurationRoot
source_path: str | None = None source_path: str | None = None
access: AccessModel = Field(default_factory=AccessModel)
def normalize_bsl_source(text: str) -> str: def normalize_bsl_source(text: str) -> str:
@@ -207,6 +251,9 @@ def build_normalized_project(
extension_metadata_objects: dict[str, dict[str, MetadataObject]] = {} extension_metadata_objects: dict[str, dict[str, MetadataObject]] = {}
part_owners: dict[str, tuple[MetadataObject, ObjectPart]] = {} part_owners: dict[str, tuple[MetadataObject, ObjectPart]] = {}
pending_roles: dict[str, Role] = {} pending_roles: dict[str, Role] = {}
access_profiles: dict[str, AccessProfile] = {}
access_groups: dict[str, AccessGroup] = {}
access_users: dict[str, AccessUser] = {}
extensions: list[Extension] = [] extensions: list[Extension] = []
extension_by_qualified_name: dict[str, Extension] = {} extension_by_qualified_name: dict[str, Extension] = {}
saw_configuration = False saw_configuration = False
@@ -248,6 +295,19 @@ def build_normalized_project(
metadata=dict(item.attributes), metadata=dict(item.attributes),
rights=role.rights, rights=role.rights,
) )
elif item.object_kind == "ACCESS_PROFILE":
profile = _access_profile_from_item(item)
access_profiles[_access_key(profile.qualified_name, profile.name)] = profile
elif item.object_kind == "ACCESS_GROUP":
group = _access_group_from_item(item)
access_groups[_access_key(group.qualified_name, group.name)] = group
elif item.object_kind == "ACCESS_USER":
user = _access_user_from_item(item)
access_users[_access_key(user.qualified_name, user.name)] = user
elif item.object_kind == "ACCESS_ROLE_ASSIGNMENT":
_attach_access_role_assignment(item, access_profiles, access_groups, access_users)
elif item.object_kind == "ACCESS_GROUP_MEMBERSHIP":
_attach_access_group_membership(item, access_groups, access_users)
elif item.object_kind in _ROOT_METADATA_OBJECT_KINDS: elif item.object_kind in _ROOT_METADATA_OBJECT_KINDS:
target_objects = _metadata_target_for_item( target_objects = _metadata_target_for_item(
item, item,
@@ -292,6 +352,11 @@ def build_normalized_project(
return NormalizedProject( return NormalizedProject(
project_id=project_id, project_id=project_id,
source_path=source_path, source_path=source_path,
access=AccessModel(
profiles=sorted(access_profiles.values(), key=lambda item: item.name.casefold()),
groups=sorted(access_groups.values(), key=lambda item: item.name.casefold()),
users=sorted(access_users.values(), key=lambda item: item.name.casefold()),
),
configuration=ConfigurationRoot( configuration=ConfigurationRoot(
name=configuration_name, name=configuration_name,
metadata=configuration_metadata, metadata=configuration_metadata,
@@ -344,6 +409,143 @@ def _all_metadata_objects(
return result return result
def _access_key(qualified_name: str | None, name: str) -> str:
return (qualified_name or name).casefold()
def _access_role_assignment_from_attributes(attributes: dict, *, source: str | None = None) -> AccessRoleAssignment | None:
role = _first_attr(attributes, "role", "Role", "Роль", "roleName", "ИмяРоли")
if not role:
return None
role_qualified_name = role if "." in role else f"Роль.{role}"
return AccessRoleAssignment(
role=role.split(".")[-1],
role_qualified_name=role_qualified_name,
source=source or _first_attr(attributes, "source", "Source", "Источник"),
attributes=dict(attributes),
)
def _access_profile_from_item(item: OneCXmlObject) -> AccessProfile:
attributes = dict(item.attributes)
profile = AccessProfile(
name=item.name,
qualified_name=item.qualified_name,
source_path=item.source_path,
attributes=attributes,
)
for role in _split_attr_list(attributes, "roles", "Roles", "Роли"):
assignment = _access_role_assignment_from_attributes({"role": role}, source=item.qualified_name)
if assignment is not None:
profile.roles.append(assignment)
return profile
def _access_group_from_item(item: OneCXmlObject) -> AccessGroup:
attributes = dict(item.attributes)
group = AccessGroup(
name=item.name,
qualified_name=item.qualified_name,
source_path=item.source_path,
profile=_first_attr(attributes, "profile", "Profile", "Профиль", "accessProfile", "ПрофильГруппыДоступа"),
profile_qualified_name=_first_attr(attributes, "profileQualifiedName", "ProfileQualifiedName", "ПрофильПолноеИмя"),
attributes=attributes,
)
for role in _split_attr_list(attributes, "roles", "Roles", "Роли"):
assignment = _access_role_assignment_from_attributes({"role": role}, source=item.qualified_name)
if assignment is not None:
group.roles.append(assignment)
group.users.extend(_split_attr_list(attributes, "users", "Users", "Пользователи", "members", "Members", "Участники"))
return group
def _access_user_from_item(item: OneCXmlObject) -> AccessUser:
attributes = dict(item.attributes)
user = AccessUser(
name=item.name,
qualified_name=item.qualified_name,
source_path=item.source_path,
full_name=_first_attr(attributes, "fullName", "FullName", "ПолноеИмя", "full_name"),
disabled=_truthy(_first_attr(attributes, "disabled", "Disabled", "Недействителен", "isDisabled")),
attributes=attributes,
)
for role in _split_attr_list(attributes, "roles", "Roles", "Роли"):
assignment = _access_role_assignment_from_attributes({"role": role}, source=item.qualified_name)
if assignment is not None:
user.roles.append(assignment)
user.groups.extend(_split_attr_list(attributes, "groups", "Groups", "Группы", "accessGroups", "ГруппыДоступа"))
return user
def _attach_access_role_assignment(
item: OneCXmlObject,
profiles: dict[str, AccessProfile],
groups: dict[str, AccessGroup],
users: dict[str, AccessUser],
) -> None:
assignment = _access_role_assignment_from_attributes(item.attributes, source=item.qualified_name)
if assignment is None:
return
owner = _first_attr(item.attributes, "owner", "Owner", "Владелец", "profile", "Profile", "Профиль", "group", "Group", "Группа", "user", "User", "Пользователь")
owner_key = owner.casefold() if owner else item.qualified_name.rsplit(".", 1)[0].casefold()
for collection in (profiles, groups, users):
target = collection.get(owner_key)
if target is None:
target = next(
(
value
for value in collection.values()
if value.name.casefold() == owner_key or str(value.qualified_name or "").casefold() == owner_key
),
None,
)
if target is not None:
target.roles.append(assignment)
return
def _attach_access_group_membership(
item: OneCXmlObject,
groups: dict[str, AccessGroup],
users: dict[str, AccessUser],
) -> None:
group_name = _first_attr(item.attributes, "group", "Group", "Группа", "accessGroup", "ГруппаДоступа")
user_name = _first_attr(item.attributes, "user", "User", "Пользователь", "member", "Member", "Участник")
if not group_name or not user_name:
return
group = groups.get(group_name.casefold()) or next(
(value for value in groups.values() if value.name.casefold() == group_name.casefold()),
None,
)
user = users.get(user_name.casefold()) or next(
(value for value in users.values() if value.name.casefold() == user_name.casefold()),
None,
)
if group is not None and user_name not in group.users:
group.users.append(user_name)
if user is not None and group_name not in user.groups:
user.groups.append(group_name)
def _first_attr(attributes: dict, *keys: str) -> str:
for key in keys:
value = attributes.get(key)
if value not in (None, ""):
return str(value)
return ""
def _split_attr_list(attributes: dict, *keys: str) -> list[str]:
value = _first_attr(attributes, *keys)
if not value:
return []
return [item.strip() for item in re.split(r"[,;\n]", value) if item.strip()]
def _truthy(value: str) -> bool:
return value.casefold() in {"true", "1", "yes", "да", "истина"}
def _walk_xml_objects( def _walk_xml_objects(
source_path: str, source_path: str,
element: ET.Element, element: ET.Element,
@@ -361,6 +563,14 @@ def _walk_xml_objects(
right = _xml_right_object(source_path, element, role_context) right = _xml_right_object(source_path, element, role_context)
if right is not None: if right is not None:
result.append(right) result.append(right)
elif object_kind == "ACCESS_ROLE_ASSIGNMENT":
assignment = _xml_access_role_assignment(source_path, element, parent_qualified_name)
if assignment is not None:
result.append(assignment)
elif object_kind == "ACCESS_GROUP_MEMBERSHIP":
membership = _xml_access_group_membership(source_path, element, parent_qualified_name)
if membership is not None:
result.append(membership)
elif object_kind is not None: elif object_kind is not None:
name = _xml_name(element, source_path=source_path) name = _xml_name(element, source_path=source_path)
if name: if name:
@@ -430,6 +640,56 @@ def _xml_role_reference(element: ET.Element) -> str:
return "" return ""
def _xml_access_role_assignment(
source_path: str,
element: ET.Element,
parent_qualified_name: str | None,
) -> OneCXmlObject | None:
attributes = _xml_attributes(element)
role = _first_attr(attributes, "role", "Role", "Роль", "name", "Name", "Имя")
if not role:
text = (element.text or "").strip()
role = text if text else ""
if not role:
return None
owner = _first_attr(attributes, "profile", "Profile", "Профиль", "group", "Group", "Группа", "user", "User", "Пользователь")
if not owner and parent_qualified_name:
owner = parent_qualified_name
attributes.setdefault("role", role)
if owner:
attributes.setdefault("owner", owner)
return OneCXmlObject(
source_path=source_path,
object_kind="ACCESS_ROLE_ASSIGNMENT",
name=role,
qualified_name=f"{owner}.{role}" if owner else role,
attributes=attributes,
)
def _xml_access_group_membership(
source_path: str,
element: ET.Element,
parent_qualified_name: str | None,
) -> OneCXmlObject | None:
attributes = _xml_attributes(element)
user = _first_attr(attributes, "user", "User", "Пользователь", "member", "Member", "Участник", "name", "Name", "Имя")
group = _first_attr(attributes, "group", "Group", "Группа", "accessGroup", "ГруппаДоступа")
if not group and parent_qualified_name:
group = parent_qualified_name
if not user or not group:
return None
attributes.setdefault("user", user)
attributes.setdefault("group", group)
return OneCXmlObject(
source_path=source_path,
object_kind="ACCESS_GROUP_MEMBERSHIP",
name=user,
qualified_name=f"{group}.{user}",
attributes=attributes,
)
_OBJECT_KIND_BY_TAG = { _OBJECT_KIND_BY_TAG = {
"configuration": "PROJECT", "configuration": "PROJECT",
"конфигурация": "PROJECT", "конфигурация": "PROJECT",
@@ -540,6 +800,41 @@ _OBJECT_KIND_BY_TAG = {
"пакетxdto": "XDTO_PACKAGE", "пакетxdto": "XDTO_PACKAGE",
"role": "ROLE", "role": "ROLE",
"роль": "ROLE", "роль": "ROLE",
"accessprofile": "ACCESS_PROFILE",
"accessprofiles": "ACCESS_PROFILE",
"accessgroupprofile": "ACCESS_PROFILE",
"accessgroupprofiles": "ACCESS_PROFILE",
"профильгруппыдоступа": "ACCESS_PROFILE",
"профилигруппдоступа": "ACCESS_PROFILE",
"accessgroup": "ACCESS_GROUP",
"accessgroups": "ACCESS_GROUP",
"группадоступа": "ACCESS_GROUP",
"группыдоступа": "ACCESS_GROUP",
"infobaseuser": "ACCESS_USER",
"infobaseusers": "ACCESS_USER",
"accessuser": "ACCESS_USER",
"accessusers": "ACCESS_USER",
"user": "ACCESS_USER",
"users": "ACCESS_USER",
"пользователь": "ACCESS_USER",
"пользователи": "ACCESS_USER",
"roleassignment": "ACCESS_ROLE_ASSIGNMENT",
"roleassignments": "ACCESS_ROLE_ASSIGNMENT",
"accessrole": "ACCESS_ROLE_ASSIGNMENT",
"accessroles": "ACCESS_ROLE_ASSIGNMENT",
"profilerole": "ACCESS_ROLE_ASSIGNMENT",
"grouprole": "ACCESS_ROLE_ASSIGNMENT",
"userrole": "ACCESS_ROLE_ASSIGNMENT",
"рольдоступа": "ACCESS_ROLE_ASSIGNMENT",
"роли": "ACCESS_ROLE_ASSIGNMENT",
"member": "ACCESS_GROUP_MEMBERSHIP",
"members": "ACCESS_GROUP_MEMBERSHIP",
"membership": "ACCESS_GROUP_MEMBERSHIP",
"memberships": "ACCESS_GROUP_MEMBERSHIP",
"participant": "ACCESS_GROUP_MEMBERSHIP",
"participants": "ACCESS_GROUP_MEMBERSHIP",
"участник": "ACCESS_GROUP_MEMBERSHIP",
"участники": "ACCESS_GROUP_MEMBERSHIP",
"sessionparameter": "SESSION_PARAMETER", "sessionparameter": "SESSION_PARAMETER",
"sessionparameters": "SESSION_PARAMETER", "sessionparameters": "SESSION_PARAMETER",
"параметрсеанса": "SESSION_PARAMETER", "параметрсеанса": "SESSION_PARAMETER",
@@ -779,6 +1074,9 @@ _QUALIFIED_PREFIX_BY_KIND = {
"STYLE_ITEM": "ЭлементСтиля", "STYLE_ITEM": "ЭлементСтиля",
"STYLE": "Стиль", "STYLE": "Стиль",
"LANGUAGE": "Язык", "LANGUAGE": "Язык",
"ACCESS_PROFILE": "ПрофильГруппыДоступа",
"ACCESS_GROUP": "ГруппаДоступа",
"ACCESS_USER": "Пользователь",
"FORM": "Форма", "FORM": "Форма",
"COMMAND": "Команда", "COMMAND": "Команда",
"URL_TEMPLATE": "ШаблонURL", "URL_TEMPLATE": "ШаблонURL",
@@ -1349,6 +1647,19 @@ def _xml_object_kind(element: ET.Element, *, parent_object_kind: str | None = No
tag = _local_name(element.tag).lower() tag = _local_name(element.tag).lower()
if parent_object_kind in {"FORM", "ELEMENT"} and tag in _FORM_ELEMENT_TAGS and _xml_name(element): if parent_object_kind in {"FORM", "ELEMENT"} and tag in _FORM_ELEMENT_TAGS and _xml_name(element):
return "ELEMENT" return "ELEMENT"
if parent_object_kind in {"ACCESS_PROFILE", "ACCESS_GROUP", "ACCESS_USER"} and tag in {
"role",
"roles",
"роль",
"роли",
"accessrole",
"profilerole",
"grouprole",
"userrole",
}:
return "ACCESS_ROLE_ASSIGNMENT"
if parent_object_kind == "ACCESS_GROUP" and tag in {"member", "members", "user", "users", "участник", "участники", "пользователь", "пользователи"}:
return "ACCESS_GROUP_MEMBERSHIP"
if tag in {"metadataobject", "object"}: if tag in {"metadataobject", "object"}:
type_name = _xml_type_name(element) type_name = _xml_type_name(element)
if type_name: if type_name:
@@ -1614,6 +1925,11 @@ def _read_text_file(path: Path) -> str:
__all__ = [ __all__ = [
"COMMON_BRANCH_CHILDREN", "COMMON_BRANCH_CHILDREN",
"AccessGroup",
"AccessModel",
"AccessProfile",
"AccessRoleAssignment",
"AccessUser",
"Command", "Command",
"ConfigurationRoot", "ConfigurationRoot",
"Extension", "Extension",
@@ -256,6 +256,33 @@ def test_normalize_edt_project_knows_full_common_metadata_catalog(tmp_path: Path
}.issubset(objects) }.issubset(objects)
def test_normalize_project_loads_access_profiles_groups_and_users(tmp_path: Path):
xml = tmp_path / "access.xml"
xml.write_text(
"""
<AccessData>
<Role name="ЧтениеПродаж" qualifiedName="Роль.ЧтениеПродаж" />
<AccessProfile name="МенеджерПродаж">
<Role name="ЧтениеПродаж" />
</AccessProfile>
<AccessGroup name="ОтделПродаж" profile="МенеджерПродаж">
<Member user="ivanov" />
</AccessGroup>
<InfobaseUser name="ivanov" fullName="Иванов Иван" />
</AccessData>
""",
encoding="utf-8",
)
normalized = normalize_one_c_project(tmp_path, project_id="access-data")
assert normalized.access.profiles[0].name == "МенеджерПродаж"
assert normalized.access.profiles[0].roles[0].role_qualified_name == "Роль.ЧтениеПродаж"
assert normalized.access.groups[0].profile == "МенеджерПродаж"
assert normalized.access.groups[0].users == ["ivanov"]
assert normalized.access.users[0].full_name == "Иванов Иван"
def test_normalize_edt_project_preserves_localized_descriptions(tmp_path: Path): def test_normalize_edt_project_preserves_localized_descriptions(tmp_path: Path):
catalog = tmp_path / "Контрагенты.mdo" catalog = tmp_path / "Контрагенты.mdo"
catalog.write_text( catalog.write_text(
@@ -106,6 +106,9 @@ _METADATA_OWNER_KINDS = {
NodeKind.STYLE_ITEM, NodeKind.STYLE_ITEM,
NodeKind.STYLE, NodeKind.STYLE,
NodeKind.LANGUAGE, NodeKind.LANGUAGE,
NodeKind.ACCESS_PROFILE,
NodeKind.ACCESS_GROUP,
NodeKind.ACCESS_USER,
NodeKind.XDTO_PACKAGE, NodeKind.XDTO_PACKAGE,
NodeKind.EXTENSION, NodeKind.EXTENSION,
NodeKind.ROLE, NodeKind.ROLE,
@@ -293,6 +296,8 @@ def index_project(path: str | Path, *, project_id: str | None = None, structure_
command_nodes: list[SemanticNode] = [] command_nodes: list[SemanticNode] = []
form_nodes: list[SemanticNode] = [] form_nodes: list[SemanticNode] = []
role_rights: list[dict] = [] role_rights: list[dict] = []
access_role_assignments: list[dict] = []
access_group_memberships: list[dict] = []
for source_file in source_files: for source_file in source_files:
text = _read_text_file(source_file) text = _read_text_file(source_file)
@@ -400,6 +405,12 @@ def index_project(path: str | Path, *, project_id: str | None = None, structure_
if xml_object.object_kind == "RIGHT": if xml_object.object_kind == "RIGHT":
role_rights.append(xml_object.attributes) role_rights.append(xml_object.attributes)
continue continue
if xml_object.object_kind == "ACCESS_ROLE_ASSIGNMENT":
access_role_assignments.append(xml_object.attributes)
continue
if xml_object.object_kind == "ACCESS_GROUP_MEMBERSHIP":
access_group_memberships.append(xml_object.attributes)
continue
kind = _xml_node_kind(xml_object.object_kind) kind = _xml_node_kind(xml_object.object_kind)
if kind is None: if kind is None:
continue continue
@@ -472,6 +483,9 @@ def index_project(path: str | Path, *, project_id: str | None = None, structure_
NodeKind.LANGUAGE, NodeKind.LANGUAGE,
NodeKind.XDTO_PACKAGE, NodeKind.XDTO_PACKAGE,
NodeKind.EXTENSION, NodeKind.EXTENSION,
NodeKind.ACCESS_PROFILE,
NodeKind.ACCESS_GROUP,
NodeKind.ACCESS_USER,
NodeKind.ROLE, NodeKind.ROLE,
NodeKind.FORM, NodeKind.FORM,
NodeKind.TABULAR_SECTION, NodeKind.TABULAR_SECTION,
@@ -480,6 +494,8 @@ def index_project(path: str | Path, *, project_id: str | None = None, structure_
edges.extend(_link_metadata_to_modules(root, module_nodes, metadata_nodes, form_nodes)) edges.extend(_link_metadata_to_modules(root, module_nodes, metadata_nodes, form_nodes))
edges.extend(_link_role_rights(nodes, role_rights)) edges.extend(_link_role_rights(nodes, role_rights))
edges.extend(_link_access_role_assignments(nodes, access_role_assignments))
edges.extend(_link_access_group_memberships(nodes, access_group_memberships))
edges.extend(_link_scheduled_jobs_to_routines(scheduled_job_nodes, routine_by_name)) edges.extend(_link_scheduled_jobs_to_routines(scheduled_job_nodes, routine_by_name))
edges.extend(_link_commands_to_handlers(command_nodes, routine_by_name)) edges.extend(_link_commands_to_handlers(command_nodes, routine_by_name))
edges.extend(_link_forms_to_handlers(form_nodes, routine_by_name)) edges.extend(_link_forms_to_handlers(form_nodes, routine_by_name))
@@ -1109,6 +1125,9 @@ def _xml_node_kind(object_kind: str) -> NodeKind | None:
"STYLE_ITEM": NodeKind.STYLE_ITEM, "STYLE_ITEM": NodeKind.STYLE_ITEM,
"STYLE": NodeKind.STYLE, "STYLE": NodeKind.STYLE,
"LANGUAGE": NodeKind.LANGUAGE, "LANGUAGE": NodeKind.LANGUAGE,
"ACCESS_PROFILE": NodeKind.ACCESS_PROFILE,
"ACCESS_GROUP": NodeKind.ACCESS_GROUP,
"ACCESS_USER": NodeKind.ACCESS_USER,
"XDTO_PACKAGE": NodeKind.XDTO_PACKAGE, "XDTO_PACKAGE": NodeKind.XDTO_PACKAGE,
"EXTENSION": NodeKind.EXTENSION, "EXTENSION": NodeKind.EXTENSION,
"LAYOUT": NodeKind.LAYOUT, "LAYOUT": NodeKind.LAYOUT,
@@ -1313,6 +1332,57 @@ def _link_role_rights(nodes: list[SemanticNode], role_rights: list[dict]) -> lis
return edges return edges
def _link_access_role_assignments(nodes: list[SemanticNode], assignments: list[dict]) -> list[SemanticEdge]:
if not assignments:
return []
by_qualified = {_normalize_lookup_key(node.qualified_name): node for node in nodes}
by_name_kind = {
(node.kind, _normalize_lookup_key(node.name)): node
for node in nodes
if node.kind in {NodeKind.ACCESS_PROFILE, NodeKind.ACCESS_GROUP, NodeKind.ACCESS_USER, NodeKind.ROLE}
}
edges: list[SemanticEdge] = []
for assignment in assignments:
owner_name = str(assignment.get("owner") or assignment.get("profile") or assignment.get("group") or assignment.get("user") or "")
role_name = str(assignment.get("role") or assignment.get("Role") or assignment.get("Роль") or "")
if not owner_name or not role_name:
continue
owner = by_qualified.get(_normalize_lookup_key(owner_name)) or next(
(
by_name_kind.get((kind, _normalize_lookup_key(owner_name)))
for kind in (NodeKind.ACCESS_PROFILE, NodeKind.ACCESS_GROUP, NodeKind.ACCESS_USER)
if by_name_kind.get((kind, _normalize_lookup_key(owner_name))) is not None
),
None,
)
role = by_qualified.get(_normalize_lookup_key(role_name)) or by_qualified.get(_normalize_lookup_key(f"Роль.{role_name}")) or by_name_kind.get((NodeKind.ROLE, _normalize_lookup_key(role_name)))
if owner is None or role is None:
continue
edges.append(_edge(EdgeKind.ASSIGNS_ROLE, owner, role, owner.source_ref.source_path, 1, dict(assignment)))
return edges
def _link_access_group_memberships(nodes: list[SemanticNode], memberships: list[dict]) -> list[SemanticEdge]:
if not memberships:
return []
by_qualified = {_normalize_lookup_key(node.qualified_name): node for node in nodes}
by_name_kind = {
(node.kind, _normalize_lookup_key(node.name)): node
for node in nodes
if node.kind in {NodeKind.ACCESS_GROUP, NodeKind.ACCESS_USER}
}
edges: list[SemanticEdge] = []
for membership in memberships:
group_name = str(membership.get("group") or membership.get("Group") or membership.get("Группа") or "")
user_name = str(membership.get("user") or membership.get("User") or membership.get("Пользователь") or "")
group = by_qualified.get(_normalize_lookup_key(group_name)) or by_name_kind.get((NodeKind.ACCESS_GROUP, _normalize_lookup_key(group_name)))
user = by_qualified.get(_normalize_lookup_key(user_name)) or by_name_kind.get((NodeKind.ACCESS_USER, _normalize_lookup_key(user_name)))
if group is None or user is None:
continue
edges.append(_edge(EdgeKind.MEMBER_OF, user, group, user.source_ref.source_path, 1, dict(membership)))
return edges
def _link_scheduled_jobs_to_routines( def _link_scheduled_jobs_to_routines(
scheduled_jobs: list[SemanticNode], scheduled_jobs: list[SemanticNode],
routine_by_name: dict[str, SemanticNode], routine_by_name: dict[str, SemanticNode],
@@ -107,6 +107,33 @@ def test_index_project_keeps_extended_1c_metadata_objects(tmp_path: Path):
}.issubset(by_kind) }.issubset(by_kind)
def test_index_project_links_access_profiles_groups_and_users(tmp_path: Path):
xml = tmp_path / "access.xml"
xml.write_text(
"""
<AccessData>
<Role name="ЧтениеПродаж" qualifiedName="Роль.ЧтениеПродаж" />
<AccessProfile name="МенеджерПродаж">
<Role name="ЧтениеПродаж" />
</AccessProfile>
<AccessGroup name="ОтделПродаж" profile="МенеджерПродаж">
<Member user="ivanov" />
</AccessGroup>
<InfobaseUser name="ivanov" fullName="Иванов Иван" />
</AccessData>
""",
encoding="utf-8",
)
snapshot = index_project(tmp_path, project_id="access-graph")
assert any(node.kind == NodeKind.ACCESS_PROFILE and node.name == "МенеджерПродаж" for node in snapshot.nodes)
assert any(node.kind == NodeKind.ACCESS_GROUP and node.name == "ОтделПродаж" for node in snapshot.nodes)
assert any(node.kind == NodeKind.ACCESS_USER and node.name == "ivanov" for node in snapshot.nodes)
assert any(edge.kind == EdgeKind.ASSIGNS_ROLE for edge in snapshot.edges)
assert any(edge.kind == EdgeKind.MEMBER_OF for edge in snapshot.edges)
def test_index_project_remaps_edges_from_duplicate_metadata_nodes(tmp_path: Path): def test_index_project_remaps_edges_from_duplicate_metadata_nodes(tmp_path: Path):
first = tmp_path / "first.xml" first = tmp_path / "first.xml"
first.write_text( first.write_text(
+5
View File
@@ -52,6 +52,9 @@ class NodeKind(str, Enum):
STYLE_ITEM = "STYLE_ITEM" STYLE_ITEM = "STYLE_ITEM"
STYLE = "STYLE" STYLE = "STYLE"
LANGUAGE = "LANGUAGE" LANGUAGE = "LANGUAGE"
ACCESS_PROFILE = "ACCESS_PROFILE"
ACCESS_GROUP = "ACCESS_GROUP"
ACCESS_USER = "ACCESS_USER"
HTTP_SERVICE = "HTTP_SERVICE" HTTP_SERVICE = "HTTP_SERVICE"
XDTO_PACKAGE = "XDTO_PACKAGE" XDTO_PACKAGE = "XDTO_PACKAGE"
EXTENSION = "EXTENSION" EXTENSION = "EXTENSION"
@@ -83,3 +86,5 @@ class EdgeKind(str, Enum):
RUNS = "RUNS" RUNS = "RUNS"
USES_INTEGRATION = "USES_INTEGRATION" USES_INTEGRATION = "USES_INTEGRATION"
HANDLES = "HANDLES" HANDLES = "HANDLES"
ASSIGNS_ROLE = "ASSIGNS_ROLE"
MEMBER_OF = "MEMBER_OF"
@@ -68,6 +68,30 @@ def import_quality_response(
f"Найдено прав: {summary.rights_count}", f"Найдено прав: {summary.rights_count}",
summary.rights_count, summary.rights_count,
), ),
_quality_check(
"access_profiles",
"Access profiles",
True,
f"Найдено профилей групп доступа: {summary.access_profile_count}",
summary.access_profile_count,
severity="INFO",
),
_quality_check(
"access_groups",
"Access groups",
True,
f"Найдено групп доступа: {summary.access_group_count}",
summary.access_group_count,
severity="INFO",
),
_quality_check(
"access_users",
"Access users",
True,
f"Найдено пользователей ИБ: {summary.access_user_count}",
summary.access_user_count,
severity="INFO",
),
_quality_check( _quality_check(
"extensions", "extensions",
"Extensions", "Extensions",
@@ -2889,6 +2889,34 @@ async def get_normalized_project_summary(project_id: str) -> NormalizedProjectSu
return _normalized_project_summary(normalized) return _normalized_project_summary(normalized)
@app.get("/projects/{project_id}/access")
async def get_project_access_model(project_id: str) -> dict:
normalized = _load_normalized_project(project_id)
if normalized is None:
raise HTTPException(status_code=404, detail="NormalizedProject not found")
return normalized.access.model_dump(mode="json")
@app.get("/projects/{project_id}/access/users/{user_name}")
async def get_project_access_user(project_id: str, user_name: str) -> dict:
normalized = _load_normalized_project(project_id)
if normalized is None:
raise HTTPException(status_code=404, detail="NormalizedProject not found")
wanted = user_name.casefold()
user = next(
(
item
for item in normalized.access.users
if item.name.casefold() == wanted or str(item.qualified_name or "").casefold() == wanted
),
None,
)
if user is None:
raise HTTPException(status_code=404, detail="Access user not found")
effective_roles = _effective_access_roles(normalized, user)
return {"user": user.model_dump(mode="json"), "effective_roles": effective_roles}
@app.get("/projects/{project_id}/imports/quality", response_model=ImportQualityResponse) @app.get("/projects/{project_id}/imports/quality", response_model=ImportQualityResponse)
async def get_import_quality(project_id: str) -> ImportQualityResponse: async def get_import_quality(project_id: str) -> ImportQualityResponse:
return _import_quality_response(project_id) return _import_quality_response(project_id)
@@ -3985,6 +4013,9 @@ _FLOWCHART_KIND_LABELS = {
NodeKind.STYLE_ITEM: "Элементы стиля", NodeKind.STYLE_ITEM: "Элементы стиля",
NodeKind.STYLE: "Стили", NodeKind.STYLE: "Стили",
NodeKind.LANGUAGE: "Языки", NodeKind.LANGUAGE: "Языки",
NodeKind.ACCESS_PROFILE: "Профили групп доступа",
NodeKind.ACCESS_GROUP: "Группы доступа",
NodeKind.ACCESS_USER: "Пользователи ИБ",
NodeKind.HTTP_SERVICE: "HTTP-сервисы", NodeKind.HTTP_SERVICE: "HTTP-сервисы",
NodeKind.INTEGRATION_ENDPOINT: "Интеграции", NodeKind.INTEGRATION_ENDPOINT: "Интеграции",
NodeKind.SCHEDULED_JOB: "Регламентные задания", NodeKind.SCHEDULED_JOB: "Регламентные задания",
@@ -4008,6 +4039,8 @@ _FLOWCHART_EDGE_LABELS = {
EdgeKind.RUNS: "запускает", EdgeKind.RUNS: "запускает",
EdgeKind.USES_INTEGRATION: "интеграция", EdgeKind.USES_INTEGRATION: "интеграция",
EdgeKind.HANDLES: "обработчик", EdgeKind.HANDLES: "обработчик",
EdgeKind.ASSIGNS_ROLE: "назначает роль",
EdgeKind.MEMBER_OF: "участник",
} }
_FLOWCHART_IMPORTANT_EDGES = { _FLOWCHART_IMPORTANT_EDGES = {
@@ -4017,6 +4050,8 @@ _FLOWCHART_IMPORTANT_EDGES = {
EdgeKind.WRITES, EdgeKind.WRITES,
EdgeKind.HAS_ROLE, EdgeKind.HAS_ROLE,
EdgeKind.GRANTS_ACCESS, EdgeKind.GRANTS_ACCESS,
EdgeKind.ASSIGNS_ROLE,
EdgeKind.MEMBER_OF,
EdgeKind.RUNS, EdgeKind.RUNS,
EdgeKind.USES_INTEGRATION, EdgeKind.USES_INTEGRATION,
EdgeKind.HANDLES, EdgeKind.HANDLES,
@@ -4045,6 +4080,9 @@ _FLOWCHART_LOGIC_NODE_KINDS = {
NodeKind.XDTO_PACKAGE, NodeKind.XDTO_PACKAGE,
NodeKind.EXTENSION, NodeKind.EXTENSION,
NodeKind.INTEGRATION_ENDPOINT, NodeKind.INTEGRATION_ENDPOINT,
NodeKind.ACCESS_PROFILE,
NodeKind.ACCESS_GROUP,
NodeKind.ACCESS_USER,
NodeKind.ROLE, NodeKind.ROLE,
} }
@@ -7210,6 +7248,39 @@ def _load_normalized_project(project_id: str) -> NormalizedProject | None:
return normalized return normalized
def _effective_access_roles(normalized: NormalizedProject, user) -> list[dict]:
roles = [item.model_dump(mode="json") for item in user.roles]
user_groups = {group.casefold() for group in user.groups}
groups = [
group
for group in normalized.access.groups
if group.name.casefold() in user_groups
or str(group.qualified_name or "").casefold() in user_groups
or user.name in group.users
]
profile_names = {
value.casefold()
for group in groups
for value in (group.profile, group.profile_qualified_name)
if value
}
profiles = [
profile
for profile in normalized.access.profiles
if profile.name.casefold() in profile_names or str(profile.qualified_name or "").casefold() in profile_names
]
for group in groups:
roles.extend(item.model_dump(mode="json") for item in group.roles)
for profile in profiles:
roles.extend(item.model_dump(mode="json") for item in profile.roles)
unique: dict[str, dict] = {}
for role in roles:
key = str(role.get("role_qualified_name") or role.get("role") or "").casefold()
if key:
unique.setdefault(key, role)
return sorted(unique.values(), key=lambda item: str(item.get("role") or "").casefold())
def _import_quality_response(project_id: str) -> ImportQualityResponse: def _import_quality_response(project_id: str) -> ImportQualityResponse:
normalized = _load_normalized_project(project_id) normalized = _load_normalized_project(project_id)
summary = _normalized_project_summary(normalized) if normalized is not None else None summary = _normalized_project_summary(normalized) if normalized is not None else None
@@ -20,6 +20,10 @@ class NormalizedProjectSummary(BaseModel):
command_count: int = 0 command_count: int = 0
role_count: int = 0 role_count: int = 0
rights_count: int = 0 rights_count: int = 0
access_profile_count: int = 0
access_group_count: int = 0
access_user_count: int = 0
access_assignment_count: int = 0
module_count: int = 0 module_count: int = 0
layout_count: int = 0 layout_count: int = 0
movement_count: int = 0 movement_count: int = 0
@@ -30,6 +30,13 @@ def normalized_project_summary(normalized: NormalizedProject) -> NormalizedProje
command_count=sum(len(item.commands) for item in objects), command_count=sum(len(item.commands) for item in objects),
role_count=sum(1 for item in objects if item.object_kind == "ROLE"), role_count=sum(1 for item in objects if item.object_kind == "ROLE"),
rights_count=sum(len(item.rights) for item in objects), rights_count=sum(len(item.rights) for item in objects),
access_profile_count=len(normalized.access.profiles),
access_group_count=len(normalized.access.groups),
access_user_count=len(normalized.access.users),
access_assignment_count=sum(len(item.roles) for item in normalized.access.profiles)
+ sum(len(item.roles) for item in normalized.access.groups)
+ sum(len(item.roles) for item in normalized.access.users)
+ sum(len(item.users) for item in normalized.access.groups),
module_count=sum(len(item.modules) for item in objects), module_count=sum(len(item.modules) for item in objects),
layout_count=sum(len(item.layouts) for item in objects), layout_count=sum(len(item.layouts) for item in objects),
movement_count=sum(len(item.movements) for item in objects), movement_count=sum(len(item.movements) for item in objects),
+18
View File
@@ -1476,6 +1476,13 @@ def test_import_supports_structure_only_indexing(tmp_path: Path):
<Role name="Менеджер" qualifiedName="Роль.Менеджер"> <Role name="Менеджер" qualifiedName="Роль.Менеджер">
<Right object="HTTPСервис.ПубличныйAPI" read="true" /> <Right object="HTTPСервис.ПубличныйAPI" read="true" />
</Role> </Role>
<AccessProfile name="ПрофильМенеджера">
<Role name="Менеджер" />
</AccessProfile>
<AccessGroup name="Менеджеры" profile="ПрофильМенеджера">
<Member user="ivanov" />
</AccessGroup>
<InfobaseUser name="ivanov" fullName="Иванов Иван" />
</Configuration> </Configuration>
""", """,
encoding="utf-8", encoding="utf-8",
@@ -1495,6 +1502,9 @@ def test_import_supports_structure_only_indexing(tmp_path: Path):
assert payload["object_count"] >= 2 assert payload["object_count"] >= 2
assert payload["normalized_summary"]["group_count"] >= 5 assert payload["normalized_summary"]["group_count"] >= 5
assert payload["normalized_summary"]["rights_count"] == 1 assert payload["normalized_summary"]["rights_count"] == 1
assert payload["normalized_summary"]["access_profile_count"] == 1
assert payload["normalized_summary"]["access_group_count"] == 1
assert payload["normalized_summary"]["access_user_count"] == 1
setup = client.get(f"/projects/{project_id}/setup") setup = client.get(f"/projects/{project_id}/setup")
assert setup.status_code == 200 assert setup.status_code == 200
@@ -1525,6 +1535,14 @@ def test_import_supports_structure_only_indexing(tmp_path: Path):
assert detail.json()["group_name"] == "Роли" assert detail.json()["group_name"] == "Роли"
assert detail.json()["object"]["rights"][0]["target"] == "HTTPСервис.ПубличныйAPI" assert detail.json()["object"]["rights"][0]["target"] == "HTTPСервис.ПубличныйAPI"
access = client.get(f"/projects/{project_id}/access")
assert access.status_code == 200
assert access.json()["profiles"][0]["roles"][0]["role_qualified_name"] == "Роль.Менеджер"
access_user = client.get(f"/projects/{project_id}/access/users/ivanov")
assert access_user.status_code == 200
assert access_user.json()["effective_roles"][0]["role_qualified_name"] == "Роль.Менеджер"
tree = client.get(f"/projects/{project_id}/metadata/tree") tree = client.get(f"/projects/{project_id}/metadata/tree")
assert tree.status_code == 200 assert tree.status_code == 200
root = tree.json()["root"] root = tree.json()["root"]
@@ -245,6 +245,11 @@ def _request_fingerprint(request: RuntimeImportRequest) -> str:
def _mock_project(project_id: str | None) -> NormalizedProject: def _mock_project(project_id: str | None) -> NormalizedProject:
from one_c_normalizer import ( from one_c_normalizer import (
AccessGroup,
AccessModel,
AccessProfile,
AccessRoleAssignment,
AccessUser,
Command, Command,
ConfigurationRoot, ConfigurationRoot,
Extension, Extension,
@@ -258,6 +263,31 @@ def _mock_project(project_id: str | None) -> NormalizedProject:
return NormalizedProject( return NormalizedProject(
project_id=project_id, project_id=project_id,
source_path="mock://runtime-adapter", source_path="mock://runtime-adapter",
access=AccessModel(
profiles=[
AccessProfile(
name="МенеджерПродаж",
qualified_name="ПрофильГруппыДоступа.МенеджерПродаж",
roles=[AccessRoleAssignment(role="Менеджер", role_qualified_name="Роль.Менеджер")],
)
],
groups=[
AccessGroup(
name="ОтделПродаж",
qualified_name="ГруппаДоступа.ОтделПродаж",
profile="МенеджерПродаж",
users=["demo.user"],
)
],
users=[
AccessUser(
name="demo.user",
qualified_name="Пользователь.demo.user",
full_name="Demo User",
groups=["ОтделПродаж"],
)
],
),
configuration=ConfigurationRoot( configuration=ConfigurationRoot(
groups=[ groups=[
MetadataGroup( MetadataGroup(