"""CloudSearch Transfer — Xunlei cloud-drive credential manager v1.0.0.

Xunlei cloud drive authenticates requests with a refresh_token plus a
captcha_token:

1. refresh_token → access_token (OAuth)
   POST https://xluser-ssl.xunlei.com/v1/auth/token
   Body: {"grant_type": "refresh_token", "refresh_token": "...",
          "client_id": "..."}

2. captcha_token (required by some operations)
   POST /v1/shield/captcha/init
   Body: {"client_id": "...", "action": "...", "device_id": "...",
          "meta": {"captcha_sign": "..."}}

3. get_headers() / get_headers_with_captcha() build the auth headers:
   Authorization: Bearer <access_token>
   x-captcha-token: <captcha_token>   (get_headers_with_captcha only)
   x-client-id: <client_id>
   x-device-id: <device_id>
"""

from __future__ import annotations

import logging
import threading
import time
from typing import Any, Dict, Optional

import requests

logger = logging.getLogger(__name__)

# ─── Constants ──────────────────────────────────────────────────────

# Xunlei OAuth endpoint (base URL).
XUNLEI_AUTH_API = "https://xluser-ssl.xunlei.com"

# Xunlei client identifiers (fixed values).
CLIENT_ID = "Xqp0kJBXWhwaTpB6"
DEVICE_ID = "925b7631473a13716b791d7f28289cad"

# Refresh the access token this many seconds before it actually expires.
_REFRESH_MARGIN_SECONDS = 60

# Lifetime assumed when the token response carries no usable "expires_in".
_DEFAULT_EXPIRES_IN = 7200

# Maximum number of characters of a non-JSON response body written to logs.
_LOG_BODY_LIMIT = 200

# ─── Default request headers ────────────────────────────────────────

DEFAULT_HEADERS: Dict[str, str] = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/135.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/plain, */*",
    "Content-Type": "application/json",
}


class XunleiCredentialManager:
    """Credential manager for Xunlei cloud drive.

    Responsibilities:
        - Exchange the refresh_token for an access_token.
        - Fetch a captcha_token for actions that need one.
        - Build the dictionary of authentication request headers.
        - Refresh the access token automatically shortly (60s) before
          it expires.

    Usage:
        mgr = XunleiCredentialManager(refresh_token="xxx")
        mgr.refresh_access_token()                  # refresh access_token
        captcha = mgr.get_captcha_token("restore")  # fetch a captcha token
        headers = mgr.get_headers()                 # auth request headers
        is_ok = mgr.validate()                      # check the credential

    Attributes:
        CLIENT_ID: Xunlei client ID.
        DEVICE_ID: Device identifier.
    """

    # ─── Class constants ───────────────────────────────────────
    CLIENT_ID: str = CLIENT_ID
    DEVICE_ID: str = DEVICE_ID

    def __init__(self, refresh_token: str = "") -> None:
        """Initialise the manager.

        Args:
            refresh_token: Xunlei refresh_token. Surrounding whitespace is
                stripped; ``None`` is treated as an empty token.
        """
        self._refresh_token: str = (refresh_token or "").strip()
        self._access_token: str = ""
        self._expires_at: float = 0.0
        # action → captcha_token
        self._captcha_tokens: Dict[str, str] = {}
        self._lock: threading.Lock = threading.Lock()
        self._session: requests.Session = requests.Session()
        self._session.headers.update(DEFAULT_HEADERS)

    # ─── Public API ────────────────────────────────────────────

    def validate(self) -> bool:
        """Check whether the refresh_token is usable.

        The token must be at least 20 characters long and must
        successfully exchange for an access_token (a network call).

        Returns:
            True when the credential is valid.
        """
        if len(self._refresh_token) < 20:
            logger.warning(
                "[XunleiCredential] refresh_token 长度不足 20,验证失败"
            )
            return False
        return self.refresh_access_token()

    def is_valid(self) -> bool:
        """Alias of :meth:`validate`."""
        return self.validate()

    def refresh_access_token(self) -> bool:
        """Exchange the refresh_token for a new access_token.

        POST /v1/auth/token
        Body: {"grant_type": "refresh_token", "refresh_token": "...",
               "client_id": "..."}

        Returns:
            True on success, False on failure.
        """
        with self._lock:
            return self._do_refresh()

    def get_captcha_token(self, action: str) -> str:
        """Return the captcha_token for ``action``.

        POST /v1/shield/captcha/init
        Body: {"client_id": "...", "action": "...", "device_id": "...",
               "meta": {"captcha_sign": "..."}}

        Tokens are cached per action to avoid repeated requests; the cache
        is cleared whenever the access token is refreshed.

        Args:
            action: Operation type, e.g. "restore" or "share".

        Returns:
            The captcha_token, or an empty string when it could not be
            obtained.
        """
        with self._lock:
            cached = self._captcha_tokens.get(action)
            if cached:
                return cached
            return self._do_get_captcha(action)

    def get_headers(self) -> Dict[str, str]:
        """Build the authentication request headers.

        The access token is refreshed first when it is missing or about to
        expire.

        Returns:
            A dict with ``x-client-id`` and ``x-device-id``, plus
            ``Authorization: Bearer <access_token>`` when an access token
            is available. No captcha header is included; use
            :meth:`get_headers_with_captcha` for that.
        """
        self._ensure_token_valid()
        headers: Dict[str, str] = {
            "x-client-id": self.CLIENT_ID,
            "x-device-id": self.DEVICE_ID,
        }
        if self._access_token:
            headers["Authorization"] = f"Bearer {self._access_token}"
        return headers

    def get_headers_with_captcha(self, action: str = "") -> Dict[str, str]:
        """Build the authentication headers including a captcha token.

        Args:
            action: Captcha operation type; an empty string means no
                captcha is needed.

        Returns:
            The headers from :meth:`get_headers`, plus ``x-captcha-token``
            when ``action`` is given and a token could be obtained.
        """
        headers = self.get_headers()
        if action:
            captcha = self.get_captcha_token(action)
            if captcha:
                headers["x-captcha-token"] = captcha
        return headers

    def get_access_token(self) -> str:
        """Return the current access_token, refreshing it when necessary."""
        self._ensure_token_valid()
        return self._access_token

    @property
    def refresh_token(self) -> str:
        """Return the current refresh_token."""
        return self._refresh_token

    @refresh_token.setter
    def refresh_token(self, value: str) -> None:
        """Replace the refresh_token and drop every derived credential."""
        cleaned = (value or "").strip()
        # Everything is swapped under the lock so a concurrent refresh never
        # sees the new refresh_token paired with the old access token.
        with self._lock:
            self._refresh_token = cleaned
            self._access_token = ""
            self._expires_at = 0.0
            self._captcha_tokens.clear()

    # ─── Internal helpers ──────────────────────────────────────

    def _token_expired(self) -> bool:
        """Return True when the access token is missing or near expiry."""
        if not self._access_token:
            return True
        return time.time() >= (self._expires_at - _REFRESH_MARGIN_SECONDS)

    def _ensure_token_valid(self) -> None:
        """Refresh the access token when it is missing or about to expire."""
        # Lock-free fast path for the common case of a still-valid token.
        if not self._token_expired():
            return
        with self._lock:
            # Re-check: another thread may have refreshed while we waited.
            if self._token_expired():
                self._do_refresh()

    @staticmethod
    def _parse_json(resp: requests.Response) -> Optional[Dict[str, Any]]:
        """Return the response body as a dict, or None when it is not one."""
        try:
            data = resp.json()
        except ValueError:
            # Covers json.JSONDecodeError and requests' JSONDecodeError.
            return None
        return data if isinstance(data, dict) else None

    def _do_refresh(self) -> bool:
        """Perform the token refresh. The caller must hold ``self._lock``.

        POST https://xluser-ssl.xunlei.com/v1/auth/token

        Returns:
            True when a new access token was stored, False otherwise.
            Never raises; failures are logged.
        """
        if not self._refresh_token:
            logger.error("[XunleiCredential] 没有 refresh_token,无法刷新")
            return False

        url = f"{XUNLEI_AUTH_API}/v1/auth/token"
        body: Dict[str, str] = {
            "grant_type": "refresh_token",
            "refresh_token": self._refresh_token,
            "client_id": self.CLIENT_ID,
        }

        try:
            resp = self._session.post(url, json=body, timeout=30)
            data = self._parse_json(resp)

            if resp.status_code != 200 or data is None:
                # Report the HTTP status even when the body is not JSON
                # (e.g. an HTML error page from a gateway).
                logger.error(
                    "[XunleiCredential] 刷新 token 失败: HTTP %d, %s",
                    resp.status_code,
                    data if data is not None else resp.text[:_LOG_BODY_LIMIT],
                )
                return False

            access_token = data.get("access_token", "")
            if not access_token:
                logger.error(
                    "[XunleiCredential] 响应中缺少 access_token: %s", data
                )
                return False

            try:
                expires_in = int(data.get("expires_in", _DEFAULT_EXPIRES_IN))
            except (TypeError, ValueError):
                # A null / malformed lifetime must not discard a good token.
                expires_in = _DEFAULT_EXPIRES_IN

            # Keep the current refresh_token when the server sends none (or
            # an empty one); otherwise the manager could never refresh again.
            new_refresh = data.get("refresh_token") or self._refresh_token

            self._access_token = access_token
            self._expires_at = time.time() + expires_in

            # The server may rotate the refresh_token.
            if new_refresh != self._refresh_token:
                logger.info(
                    "[XunleiCredential] refresh_token 已轮换: %s... → %s...",
                    self._refresh_token[:8],
                    new_refresh[:8],
                )
                self._refresh_token = new_refresh

            # The token changed, so cached captcha tokens may be invalid.
            self._captcha_tokens.clear()

            logger.info(
                "[XunleiCredential] Token 刷新成功 (expires_in=%ds)",
                expires_in,
            )
            return True

        except requests.RequestException as e:
            logger.error("[XunleiCredential] 刷新 token 网络异常: %s", e)
            return False
        except Exception as e:
            # Boundary handler: this method's contract is to never raise.
            logger.exception("[XunleiCredential] 刷新 token 未知异常: %s", e)
            return False

    def _do_get_captcha(self, action: str) -> str:
        """Fetch a captcha_token. The caller must hold ``self._lock``.

        POST /v1/shield/captcha/init

        Args:
            action: Operation type the captcha token is requested for.

        Returns:
            The captcha_token, or an empty string on failure. Never
            raises; failures are logged.
        """
        url = f"{XUNLEI_AUTH_API}/v1/shield/captcha/init"
        body: Dict[str, Any] = {
            "client_id": self.CLIENT_ID,
            "action": action,
            "device_id": self.DEVICE_ID,
            "meta": {
                "captcha_sign": "",
            },
        }

        # The request needs an Authorization header, so make sure the access
        # token is present and not expired. _do_refresh() is called directly
        # because the (non-reentrant) lock is already held.
        if self._token_expired() and not self._do_refresh():
            logger.error(
                "[XunleiCredential] 无法获取 access_token,跳过 captcha"
            )
            return ""

        headers: Dict[str, str] = {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
        }

        try:
            resp = self._session.post(
                url, json=body, headers=headers, timeout=15
            )
            data = self._parse_json(resp)
            captcha_token = (data or {}).get("captcha_token") or ""

            if captcha_token:
                self._captcha_tokens[action] = captcha_token
                logger.info(
                    "[XunleiCredential] captcha_token 获取成功 for action=%s",
                    action,
                )
            else:
                logger.warning(
                    "[XunleiCredential] captcha_token 为空 for action=%s: %s",
                    action,
                    data if data is not None else resp.text[:_LOG_BODY_LIMIT],
                )
            return captcha_token

        except requests.RequestException as e:
            logger.error(
                "[XunleiCredential] 获取 captcha_token 网络异常: %s", e
            )
            return ""
        except Exception as e:
            # Boundary handler: this method's contract is to never raise.
            logger.exception(
                "[XunleiCredential] 获取 captcha_token 异常: %s", e
            )
            return ""

    def __repr__(self) -> str:
        """Return a debug representation that never exposes token values."""
        return (
            f"XunleiCredentialManager("
            f"refresh_token={'***' if self._refresh_token else 'None'}, "
            f"has_access_token={bool(self._access_token)}, "
            f"captcha_actions={list(self._captcha_tokens.keys())})"
        )