""" CloudSearch Transfer — 迅雷网盘转存核心 v1.0.0 迅雷网盘 4 步转存流程: ① GET .../drive/v1/share?share_id=xx → pass_code_token, files[], title ② POST .../share/restore → restore_task_id (转存) ③ 轮询 GET .../tasks/{task_id} → progress==100, trace_file_ids → oldId→newId映射 ④ POST .../share → share_url + pass_code 迅雷网盘需要 refresh_token + captcha_token 双重认证。 """ from __future__ import annotations import json import logging import re import time from typing import Any, Dict, List, Optional, Tuple import requests from .credential import XunleiCredentialManager logger = logging.getLogger(__name__) # ─── 迅雷 API 基础地址 ────────────────────────────────────────────── XUNLEI_PAN_API = "https://api-pan.xunlei.com" # ─── URL 解析正则 ─────────────────────────────────────────────────── # 匹配 pan.xunlei.com/s/ SHARE_URL_PATTERN = re.compile(r"pan\.xunlei\.com/s/([A-Za-z0-9]+)") class XunleiTransfer: """迅雷网盘转存引擎。 封装完整的 4 步 API 流程:获取分享详情 → 转存文件 → 轮询转存任务 → 创建新分享。 Attributes: credential: 迅雷凭证管理器实例。 session: 复用的 requests.Session。 timeout: 请求超时(秒)。 poll_interval: 轮询间隔(秒)。 poll_max_attempts: 最大轮询次数。 """ def __init__( self, credential: XunleiCredentialManager, timeout: int = 30, poll_interval: float = 1.0, poll_max_attempts: int = 60, ) -> None: """初始化转存引擎。 Args: credential: 有效的迅雷凭证管理器。 timeout: HTTP 请求超时秒数。 poll_interval: 异步任务轮询间隔秒数。 poll_max_attempts: 异步任务最大轮询次数。 """ self.credential: XunleiCredentialManager = credential self.timeout: int = timeout self.poll_interval: float = poll_interval self.poll_max_attempts: int = poll_max_attempts self.session: requests.Session = requests.Session() # ─── 步骤 ①:获取分享详情 ───────────────────────────────────── def _get_share_info(self, share_id: str) -> Dict[str, Any]: """步骤①:获取分享详情。 GET /drive/v1/share?share_id= 返回字段包含:pass_code_token, files[], title 等。 Args: share_id: 分享 ID(从 URL 解析)。 Returns: 分享信息字典,包含 files, title, pass_code_token。 Raises: RuntimeError: API 返回错误。 """ url = f"{XUNLEI_PAN_API}/drive/v1/share" params: Dict[str, str] = {"share_id": share_id} headers = self.credential.get_headers() logger.info("[XunleiTransfer] ① Fetching share info for share_id=%s", share_id) try: resp = self.session.get( url, params=params, headers=headers, timeout=self.timeout ) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"获取分享详情失败: {exc}") from exc data: Dict[str, Any] = resp.json() # 检查业务错误 errcode = data.get("errcode", data.get("error_code", 0)) if errcode != 0: raise RuntimeError( f"分享详情API返回错误: errcode={errcode}, message={data.get('message', data.get('error', ''))}" ) # 提取关键字段 pass_code_token: str = data.get("pass_code_token", "") files: List[Dict[str, Any]] = data.get("files", []) title: str = data.get("title", data.get("share_name", "")) if not files: raise RuntimeError("分享内容为空") logger.info( "[XunleiTransfer] ① Share info: title=%s, files=%d, has_pass_code_token=%s", title, len(files), bool(pass_code_token), ) return { "pass_code_token": pass_code_token, "files": files, "title": title, "share_id": share_id, } # ─── 步骤 ②:转存文件 ───────────────────────────────────────── def _restore_files( self, share_id: str, pass_code_token: str, file_ids: List[str], parent_id: str = "", ) -> str: """步骤②:转存文件到自己的迅雷网盘。 POST /drive/v1/share/restore Body: { "file_ids": ["", ...], "pass_code_token": "", "share_id": "", "parent_id": "", "specify_parent_id": true } Args: share_id: 分享 ID。 pass_code_token: 步骤①获取的 pass_code_token。 file_ids: 要转存的文件 ID 列表。 parent_id: 目标父目录 ID,空字符串表示根目录。 Returns: restore_task_id 字符串,用于步骤③轮询。 Raises: RuntimeError: API 返回错误。 """ url = f"{XUNLEI_PAN_API}/drive/v1/share/restore" body: Dict[str, Any] = { "file_ids": file_ids, "pass_code_token": pass_code_token, "share_id": share_id, "parent_id": parent_id or "", "specify_parent_id": True, } # restore 操作可能需要 captcha_token headers = self.credential.get_headers_with_captcha(action="restore") headers.setdefault("Content-Type", "application/json") logger.info( "[XunleiTransfer] ② Restoring %d files from share_id=%s", len(file_ids), share_id, ) try: resp = self.session.post( url, json=body, headers=headers, timeout=self.timeout ) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"转存请求失败: {exc}") from exc data: Dict[str, Any] = resp.json() errcode = data.get("errcode", data.get("error_code", 0)) if errcode != 0: raise RuntimeError( f"转存请求失败: errcode={errcode}, message={data.get('message', data.get('error', ''))}" ) task_id: Optional[str] = data.get("restore_task_id", data.get("task_id")) if not task_id: raise RuntimeError(f"转存 task_id 缺失, response: {data}") logger.info("[XunleiTransfer] ② Restore task created: task_id=%s", task_id) return task_id # ─── 步骤 ③:轮询转存任务 ───────────────────────────────────── def _poll_restore_task(self, task_id: str) -> Dict[str, str]: """步骤③:轮询转存任务直到完成。 GET /drive/v1/tasks/{task_id} 当 progress==100 时表示完成,返回 oldId→newId 映射。 从 params.trace_file_ids 解析 JSON 字符串获取映射关系。 Args: task_id: 步骤②返回的 restore_task_id。 Returns: {"oldId": "newId", ...} 文件 ID 映射字典。 Raises: RuntimeError: 任务失败或超时。 """ url = f"{XUNLEI_PAN_API}/drive/v1/tasks/{task_id}" headers = self.credential.get_headers() for attempt in range(1, self.poll_max_attempts + 1): try: resp = self.session.get(url, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException: logger.warning( "[XunleiTransfer] ③ Poll attempt %d/%d failed, retrying...", attempt, self.poll_max_attempts, ) time.sleep(self.poll_interval) continue data: Dict[str, Any] = resp.json() progress: int = data.get("progress", 0) status: str = data.get("status", "") logger.debug( "[XunleiTransfer] ③ Poll %d/%d: progress=%d, status=%s", attempt, self.poll_max_attempts, progress, status, ) if status == "failed" or status == "error": raise RuntimeError( f"转存任务失败: task_id={task_id}, status={status}" ) if progress == 100: # 从 params.trace_file_ids 解析 oldId→newId 映射 params: Dict[str, Any] = data.get("params", {}) trace_file_ids: str = params.get("trace_file_ids", "") if trace_file_ids: try: id_mapping: Dict[str, str] = json.loads(trace_file_ids) logger.info( "[XunleiTransfer] ③ Restore completed: %d files mapped", len(id_mapping), ) return id_mapping except json.JSONDecodeError: logger.warning( "[XunleiTransfer] ③ Failed to parse trace_file_ids: %s", trace_file_ids, ) # fallback: 检查 result 字段 result = data.get("result", {}) if result: logger.info("[XunleiTransfer] ③ Restore completed via result field") return result # 最后的 fallback: 返回空映射 logger.warning( "[XunleiTransfer] ③ Restore completed but no file mapping found" ) return {} if progress < 0: raise RuntimeError( f"转存任务异常: task_id={task_id}, progress={progress}" ) time.sleep(self.poll_interval) raise RuntimeError( f"转存任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts} 次" ) # ─── 步骤 ④:创建新分享 ───────────────────────────────────── def _create_share( self, file_ids: List[str], expiration_days: str = "-1", ) -> Tuple[str, str]: """步骤④:创建新分享链接。 POST /drive/v1/share Body: { "file_ids": ["", ...], "expiration_days": "-1" } expiration_days: "-1" 表示永久有效。 Args: file_ids: 要分享的文件 ID 列表。 expiration_days: 过期天数,"-1" 表示永久。 Returns: (share_url, pass_code) 元组。 Raises: RuntimeError: API 返回错误。 """ url = f"{XUNLEI_PAN_API}/drive/v1/share" body: Dict[str, Any] = { "file_ids": file_ids, "expiration_days": expiration_days, } # share 操作可能需要 captcha_token headers = self.credential.get_headers_with_captcha(action="share") headers.setdefault("Content-Type", "application/json") logger.info( "[XunleiTransfer] ④ Creating share: %d files", len(file_ids) ) try: resp = self.session.post( url, json=body, headers=headers, timeout=self.timeout ) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"创建分享失败: {exc}") from exc data: Dict[str, Any] = resp.json() errcode = data.get("errcode", data.get("error_code", 0)) if errcode != 0: raise RuntimeError( f"创建分享失败: errcode={errcode}, message={data.get('message', data.get('error', ''))}" ) share_url: str = data.get("share_url", data.get("link", "")) pass_code: str = data.get("pass_code", data.get("code", "")) if not share_url: share_id = data.get("share_id", "") if share_id: share_url = f"https://pan.xunlei.com/s/{share_id}" logger.info( "[XunleiTransfer] ④ Share created: url=%s, pass_code=%s", share_url, pass_code, ) return share_url, pass_code # ─── 公开入口 ───────────────────────────────────────────────── def transfer( self, share_url: str, save_dir: str = "", share_password: str = "", ) -> Dict[str, Any]: """执行完整的 4 步转存流程。 从原始迅雷分享链接开始,将文件转存到自己网盘,再创建新分享。 Args: share_url: 原始迅雷分享链接,如 https://pan.xunlei.com/s/xxxxx。 save_dir: 转存目标目录 ID,空字符串表示根目录。 share_password: 新分享的密码(迅雷使用 pass_code)。 Returns: 包含以下字段的字典: - success: bool - new_file_ids: List[str] — 转存后的文件ID列表(newId) - file_name: str — 分享标题 - share_url: str — 新分享链接 - passcode: str — 新分享 pass_code Raises: RuntimeError: 任一步骤失败。 ValueError: URL 解析失败。 """ # 0. 解析 URL 提取 share_id match = SHARE_URL_PATTERN.search(share_url) if not match: raise ValueError(f"无法从URL中提取迅雷分享ID: {share_url}") share_id: str = match.group(1) logger.info( "[XunleiTransfer] Starting 4-step transfer for share_id=%s", share_id ) # ① 获取分享详情 share_info: Dict[str, Any] = self._get_share_info(share_id) files: List[Dict[str, Any]] = share_info.get("files", []) title: str = share_info.get("title", "分享") pass_code_token: str = share_info.get("pass_code_token", "") # 提取原始文件 ID file_ids: List[str] = [ f.get("file_id", f.get("fid", f.get("id", ""))) for f in files if f.get("file_id") or f.get("fid") or f.get("id") ] if not file_ids: raise RuntimeError("无法从分享中提取文件ID") # ② 发起转存 task_id: str = self._restore_files( share_id, pass_code_token, file_ids, parent_id=save_dir ) # ③ 轮询转存任务 → 获取 oldId→newId 映射 id_mapping: Dict[str, str] = self._poll_restore_task(task_id) # 从映射中提取新的文件 ID new_file_ids: List[str] = [] for old_fid in file_ids: new_fid = id_mapping.get(old_fid, "") if new_fid: new_file_ids.append(new_fid) else: logger.warning( "[XunleiTransfer] No newId mapped for old_fid=%s", old_fid ) if not new_file_ids: raise RuntimeError("转存完成但未获取到新文件ID") # ④ 创建新分享 share_url_new, pass_code = self._create_share(new_file_ids) logger.info( "[XunleiTransfer] Transfer complete: %d files, new_share=%s", len(new_file_ids), share_url_new, ) return { "success": True, "new_file_ids": new_file_ids, "file_name": title, "share_url": share_url_new, "passcode": pass_code or share_password, } @staticmethod def parse_share_url(url: str) -> Optional[str]: """从迅雷分享 URL 中提取 share_id。 Args: url: 迅雷分享链接。 Returns: share_id 字符串,解析失败返回 None。 """ match = SHARE_URL_PATTERN.search(url) return match.group(1) if match else None @staticmethod def extract_file_ids(files: List[Dict[str, Any]]) -> List[str]: """从文件列表中提取 file_id。 Args: files: 文件信息字典列表。 Returns: file_id 字符串列表。 """ return [ f.get("file_id", f.get("fid", f.get("id", ""))) for f in files if f.get("file_id") or f.get("fid") or f.get("id") ] @staticmethod def parse_trace_file_ids(trace: str) -> Dict[str, str]: """解析 trace_file_ids JSON 字符串为 oldId→newId 映射。 Args: trace: trace_file_ids JSON 字符串,如 '{"oldId":"newId"}'. Returns: {"oldId": "newId", ...} 映射字典。 """ try: return json.loads(trace) except (json.JSONDecodeError, TypeError): return {} def close(self) -> None: """关闭 HTTP 会话。""" self.session.close() def __enter__(self) -> "XunleiTransfer": return self def __exit__(self, *args: Any) -> None: self.close()