""" CloudSearch Transfer — 凭证管理器 v1.0.0 参考 search-ucmao 的 get_and_validate_credential + cloud-auto-save 的 Token回写 """ import time import logging from typing import Optional, Dict, Any from dataclasses import dataclass, field from ..config import PlatformConfig logger = logging.getLogger(__name__) @dataclass class CredentialStatus: """凭证状态""" valid: bool platform: str last_check: float = 0.0 last_error: str = "" checks_count: int = 0 fail_count: int = 0 class CredentialManager: """ 凭证管理器 - 凭证校验(各平台最小长度要求不同) - Token自动刷新(阿里云/迅雷) - 健康检测 """ # 各平台最小凭证长度 MIN_LENGTH_MAP = { "quark": 50, # Cookie ≥ 50字符 "baidu": 50, # Cookie ≥ 50字符 "uc": 50, # Cookie ≥ 50字符 "aliyun": 20, # refresh_token ≥ 20字符 "xunlei": 30, # refresh_token ≥ 30字符 "pan123": 30, "cloud189": 30, } # 凭证类型:cookie / refresh_token CREDENTIAL_TYPE = { "quark": "cookie", "baidu": "cookie", "uc": "cookie", "aliyun": "refresh_token", "xunlei": "refresh_token", "pan123": "cookie", "cloud189": "cookie", } def __init__(self): self._status: Dict[str, CredentialStatus] = {} self._token_cache: Dict[str, Dict[str, Any]] = {} def validate(self, platform: str, config: PlatformConfig) -> bool: """ 校验凭证有效性 参考 search-ucmao 的 get_and_validate_credential 逻辑 """ min_len = self.MIN_LENGTH_MAP.get(platform, 20) if self.CREDENTIAL_TYPE.get(platform) == "refresh_token": token = config.refresh_token valid = bool(token and len(token) >= min_len) else: cookie = config.cookie valid = bool(cookie and len(cookie) >= min_len) # 记录状态 status = self._status.get(platform, CredentialStatus(valid=False, platform=platform)) status.last_check = time.time() status.checks_count += 1 if not valid: status.fail_count += 1 status.last_error = f"凭证长度不足 (需要≥{min_len})" else: status.valid = True self._status[platform] = status return valid def get_credential(self, platform: str, config: PlatformConfig) -> str: """ 获取有效凭证 对于Token类型会自动刷新 """ if not self.validate(platform, config): return "" cred_type = self.CREDENTIAL_TYPE.get(platform, "cookie") if cred_type == "refresh_token": # 优先使用缓存的access_token cached = self._token_cache.get(platform, {}) if cached.get("access_token") and cached.get("expires_at", 0) > time.time() + 60: return cached["access_token"] return config.refresh_token else: return config.cookie def update_access_token(self, platform: str, access_token: str, expires_in: int = 3600): """更新缓存的access_token""" self._token_cache[platform] = { "access_token": access_token, "expires_at": time.time() + expires_in, } def get_status(self, platform: str) -> Optional[CredentialStatus]: """获取凭证状态""" return self._status.get(platform) def get_all_status(self) -> Dict[str, CredentialStatus]: """获取所有平台凭证状态""" return dict(self._status) def mark_invalid(self, platform: str, reason: str = ""): """标记凭证失效""" status = self._status.get(platform, CredentialStatus(valid=False, platform=platform)) status.valid = False status.last_error = reason status.fail_count += 1 status.last_check = time.time() self._status[platform] = status logger.warning(f"[Credential] {platform} marked invalid: {reason}")