Improve SMB credential handling for AI structure
This commit is contained in:
@@ -77,13 +77,21 @@ def _smbclient() -> Any:
|
||||
|
||||
|
||||
def _register_session(smbclient: Any, *, server: str, username: str, password: str, domain: str | None) -> None:
|
||||
qualified_user = f"{domain}\\{username}" if domain else username
|
||||
effective_domain, effective_username = _normalize_credentials(username, domain)
|
||||
qualified_user = f"{effective_domain}\\{effective_username}" if effective_domain else effective_username
|
||||
try:
|
||||
smbclient.register_session(server, username=qualified_user, password=password)
|
||||
except Exception as error: # pragma: no cover - depends on smb backend details
|
||||
raise RuntimeError(_translate_smb_error(error, server=server, username=qualified_user)) from error
|
||||
|
||||
|
||||
def _copy_smb_directory(smbclient: Any, source: str, target: Path) -> None:
|
||||
target.mkdir(parents=True, exist_ok=True)
|
||||
for item in smbclient.scandir(source):
|
||||
try:
|
||||
items = list(smbclient.scandir(source))
|
||||
except Exception as error: # pragma: no cover - depends on smb backend details
|
||||
raise RuntimeError(_translate_smb_error(error, path=source)) from error
|
||||
for item in items:
|
||||
destination = target / item.name
|
||||
child_source = f"{source}\\{item.name}"
|
||||
try:
|
||||
@@ -98,9 +106,12 @@ def _copy_smb_directory(smbclient: Any, source: str, target: Path) -> None:
|
||||
|
||||
|
||||
def _copy_local_file_to_smb(smbclient: Any, source: Path, target: str) -> None:
|
||||
try:
|
||||
with source.open("rb") as local_file:
|
||||
with smbclient.open_file(target, mode="wb") as remote_file:
|
||||
shutil.copyfileobj(local_file, remote_file, length=1024 * 1024)
|
||||
except Exception as error: # pragma: no cover - depends on smb backend details
|
||||
raise RuntimeError(_translate_smb_error(error, path=target)) from error
|
||||
|
||||
|
||||
def _ensure_smb_directory(smbclient: Any, path: str) -> None:
|
||||
@@ -115,9 +126,9 @@ def _ensure_smb_directory(smbclient: Any, path: str) -> None:
|
||||
_ensure_smb_directory(smbclient, parent)
|
||||
try:
|
||||
smbclient.mkdir(normalized)
|
||||
except OSError:
|
||||
except OSError as error:
|
||||
if not smbclient.path.isdir(normalized):
|
||||
raise
|
||||
raise RuntimeError(_translate_smb_error(error, path=normalized)) from error
|
||||
|
||||
|
||||
def _unc_parent_path(path: str) -> str | None:
|
||||
@@ -129,3 +140,31 @@ def _unc_parent_path(path: str) -> str | None:
|
||||
return f"\\\\{server}\\{share}"
|
||||
parent_relative = "\\".join(parts[:-1])
|
||||
return f"\\\\{server}\\{share}\\{parent_relative}"
|
||||
|
||||
|
||||
def _normalize_credentials(username: str, domain: str | None) -> tuple[str | None, str]:
|
||||
raw_username = username.strip()
|
||||
raw_domain = (domain or "").strip() or None
|
||||
if "\\" in raw_username:
|
||||
embedded_domain, embedded_username = raw_username.split("\\", 1)
|
||||
return embedded_domain.strip() or raw_domain, embedded_username.strip()
|
||||
if "@" in raw_username and not raw_domain:
|
||||
embedded_username, embedded_domain = raw_username.split("@", 1)
|
||||
return embedded_domain.strip() or None, embedded_username.strip()
|
||||
return raw_domain, raw_username
|
||||
|
||||
|
||||
def _translate_smb_error(error: Exception, *, server: str | None = None, path: str | None = None, username: str | None = None) -> str:
|
||||
message = str(error).strip()
|
||||
lowered = message.casefold()
|
||||
target = path or server or "сетевой ресурс"
|
||||
if any(token in lowered for token in ["logon failure", "access denied", "authentication", "STATUS_LOGON_FAILURE".casefold(), "STATUS_ACCESS_DENIED".casefold()]):
|
||||
user_part = f" для пользователя {username}" if username else ""
|
||||
return f"Ошибка авторизации SMB{user_part}. Проверьте логин, пароль, домен и права доступа к {target}."
|
||||
if any(token in lowered for token in ["bad network name", "object name not found", "path not found", "no such file", "STATUS_OBJECT_PATH_NOT_FOUND".casefold(), "STATUS_BAD_NETWORK_NAME".casefold()]):
|
||||
return f"Сетевой путь не найден или недоступен: {target}."
|
||||
if any(token in lowered for token in ["connection reset", "connection refused", "timed out", "host is down", "network name deleted", "network path was not found"]):
|
||||
return f"Не удалось подключиться к сетевому ресурсу {target}. Проверьте доступность сервера и сети."
|
||||
if message:
|
||||
return f"Ошибка SMB при обращении к {target}: {message}"
|
||||
return f"Неизвестная ошибка SMB при обращении к {target}."
|
||||
|
||||
@@ -1310,6 +1310,28 @@ def test_server_smb_browse_validates_unc_path():
|
||||
assert "UNC путь" in response.json()["error"]
|
||||
|
||||
|
||||
def test_smb_credentials_embedded_domain_format_is_normalized():
|
||||
from api_server import smb_paths
|
||||
|
||||
domain, username = smb_paths._normalize_credentials("MST\\m", None)
|
||||
|
||||
assert domain == "MST"
|
||||
assert username == "m"
|
||||
|
||||
|
||||
def test_smb_error_translation_returns_russian_auth_message():
|
||||
from api_server import smb_paths
|
||||
|
||||
message = smb_paths._translate_smb_error(
|
||||
RuntimeError("STATUS_LOGON_FAILURE: logon failure"),
|
||||
server="192.168.220.200",
|
||||
username="MST\\m",
|
||||
)
|
||||
|
||||
assert "Ошибка авторизации SMB" in message
|
||||
assert "Проверьте логин, пароль, домен" in message
|
||||
|
||||
|
||||
def test_project_setup_recovers_indexed_status_from_stored_snapshot(tmp_path: Path):
|
||||
(tmp_path / "metadata.xml").write_text(
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user