""" CloudSearch Transfer — 夸克网盘转存核心 v1.0.0 夸克网盘 7 步转存流程: ① POST .../share/sharepage/token → stoken ② GET .../share/sharepage/detail → fid, share_fid_token, title ③ POST .../share/sharepage/save → task_id (转存任务) ④ 轮询 GET .../task → save_as_top_fids (status==2 完成) ⑤ POST .../share → task_id (创建分享任务) ⑥ 轮询 GET .../task → share_id ⑦ POST .../share/password → share_url, passcode 参考 cloud-auto-save 的 quark.py 实现。 """ from __future__ import annotations import logging import re import time from typing import Any, Dict, List, Optional, Tuple import requests from .credential import QuarkCredentialManager logger = logging.getLogger(__name__) # ─── 夸克 API 基础地址 ────────────────────────────────────────────── QUARK_API_BASE = "https://drive-pc.quark.cn" QUARK_SHARE_API = f"{QUARK_API_BASE}/1/clouddrive/share" # ─── URL 解析正则 ─────────────────────────────────────────────────── # 匹配 pan.quark.cn/s/ SHARE_URL_PATTERN = re.compile(r"pan\.quark\.cn/s/(\w+)") class QuarkTransfer: """夸克网盘转存引擎。 封装完整的 7 步 API 流程:获取 stoken → 获取详情 → 保存文件 → 创建分享 → 设置密码。 Attributes: credential: 夸克凭证管理器实例。 session: 复用的 requests.Session。 timeout: 请求超时(秒)。 poll_interval: 轮询间隔(秒)。 poll_max_attempts: 最大轮询次数。 """ def __init__( self, credential: QuarkCredentialManager, timeout: int = 30, poll_interval: float = 0.5, poll_max_attempts: int = 50, ) -> None: """初始化转存引擎。 Args: credential: 有效的夸克凭证管理器。 timeout: HTTP 请求超时秒数。 poll_interval: 异步任务轮询间隔秒数。 poll_max_attempts: 异步任务最大轮询次数(默认 50,同 base 层配置)。 """ self.credential: QuarkCredentialManager = credential self.timeout: int = timeout self.poll_interval: float = poll_interval self.poll_max_attempts: int = poll_max_attempts self.session: requests.Session = requests.Session() # ─── 步骤 ①:获取 stoken ─────────────────────────────────────── def _get_stoken(self, pwd_id: str, passcode: str = "") -> str: """步骤①:向夸克交换 stoken。 POST /1/clouddrive/share/sharepage/token Body: {"passcode": "", "pwd_id": ""} Args: pwd_id: 分享 ID(从 URL 解析)。 passcode: 分享提取码,无密码时为空字符串。 Returns: stoken 字符串。 Raises: RuntimeError: API 返回错误或 stoken 缺失。 """ url = f"{QUARK_SHARE_API}/sharepage/token" body: Dict[str, str] = { "passcode": passcode, "pwd_id": pwd_id, } headers = self.credential.get_headers() headers.setdefault("Content-Type", "application/json") logger.info("[QuarkTransfer] ① Getting stoken for pwd_id=%s", pwd_id) try: resp = self.session.post(url, json=body, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"获取 stoken 失败: {exc}") from exc data: Dict[str, Any] = resp.json() stoken: Optional[str] = data.get("data", {}).get("stoken") if not stoken: raise RuntimeError(f"stoken 缺失, response: {data}") logger.info("[QuarkTransfer] ① stoken obtained") return stoken # ─── 步骤 ②:获取分享详情 ───────────────────────────────────── def _get_detail(self, pwd_id: str, stoken: str) -> Dict[str, Any]: """步骤②:获取分享详情。 GET /1/clouddrive/share/sharepage/detail?pwd_id=xx&stoken=xx&_fetch_share=1 返回字段包含:title, fid, share_fid_token 等。 Args: pwd_id: 分享 ID。 stoken: 步骤①获取的 stoken。 Returns: 分享详情字典。 Raises: RuntimeError: API 返回错误。 """ url = f"{QUARK_SHARE_API}/sharepage/detail" params: Dict[str, str] = { "pwd_id": pwd_id, "stoken": stoken, "_fetch_share": "1", } headers = self.credential.get_headers() logger.info("[QuarkTransfer] ② Fetching share detail for pwd_id=%s", pwd_id) try: resp = self.session.get(url, params=params, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"获取分享详情失败: {exc}") from exc data: Dict[str, Any] = resp.json() status: int = data.get("status", -1) if status != 0 and data.get("code") not in (0, None): raise RuntimeError(f"分享详情API返回错误: status={status}, message={data.get('message')}") detail: Optional[Dict[str, Any]] = data.get("data") if not detail: raise RuntimeError(f"分享详情数据为空, response: {data}") # 提取关键字段供后续使用 logger.info( "[QuarkTransfer] ② Detail: title=%s, fid=%s", detail.get("title"), detail.get("fid"), ) return detail # ─── 步骤 ③:发起转存 ───────────────────────────────────────── def _init_save(self, pwd_id: str, stoken: str, detail: Dict[str, Any], to_pdir_fid: str = "0") -> str: """步骤③:发起转存请求。 POST /1/clouddrive/share/sharepage/save Body: { "fid_list": [, ...], "fid_token_list": [, ...], "to_pdir_fid": "0", "pwd_id": "", "stoken": "", "pdir_fid": "0", "scene": "link" } Args: pwd_id: 分享 ID。 stoken: stoken。 detail: 步骤②的分享详情。 to_pdir_fid: 目标目录 ID,默认 "0" 即根目录。 Returns: task_id 字符串,用于步骤④轮询。 Raises: RuntimeError: API 返回错误。 """ url = f"{QUARK_SHARE_API}/sharepage/save" fid_list: List[str] = detail.get("fid_list", [detail.get("fid", [])]) fid_token_list: List[str] = detail.get("fid_token_list", [detail.get("share_fid_token", [])]) # 如果 detail 的 fid/fid_token 是单值而非列表,则包装为列表 if not isinstance(fid_list, list): fid_list = [fid_list] if fid_list else [] if not isinstance(fid_token_list, list): fid_token_list = [fid_token_list] if fid_token_list else [] body: Dict[str, Any] = { "fid_list": fid_list, "fid_token_list": fid_token_list, "to_pdir_fid": to_pdir_fid, "pwd_id": pwd_id, "stoken": stoken, "pdir_fid": "0", "scene": "link", } headers = self.credential.get_headers() headers.setdefault("Content-Type", "application/json") logger.info("[QuarkTransfer] ③ Initiating save: %d files to dir=%s", len(fid_list), to_pdir_fid) try: resp = self.session.post(url, json=body, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"发起转存失败: {exc}") from exc data: Dict[str, Any] = resp.json() status: int = data.get("status", -1) if status != 0: raise RuntimeError(f"转存请求失败: status={status}, message={data.get('message')}") task_id: Optional[str] = data.get("data", {}).get("task_id") if not task_id: raise RuntimeError(f"转存 task_id 缺失, response: {data}") logger.info("[QuarkTransfer] ③ Save task created: task_id=%s", task_id) return task_id # ─── 步骤 ④:轮询转存任务 ───────────────────────────────────── def _poll_save_task(self, task_id: str) -> List[str]: """步骤④:轮询转存任务直到完成。 GET /1/clouddrive/task?task_id=&retry_index=0 轮询最多 poll_max_attempts 次, 当 status==2 时表示任务成功完成, status==-1 表示失败。 Args: task_id: 步骤③返回的 task_id。 Returns: save_as_top_fids 列表(转存后的文件 ID)。 Raises: RuntimeError: 任务失败或超时。 """ url = f"{QUARK_API_BASE}/1/clouddrive/task" headers = self.credential.get_headers() for attempt in range(1, self.poll_max_attempts + 1): params: Dict[str, str] = { "task_id": task_id, "retry_index": "0", } try: resp = self.session.get(url, params=params, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException: logger.warning("[QuarkTransfer] ④ Poll attempt %d/%d failed, retrying...", attempt, self.poll_max_attempts) time.sleep(self.poll_interval) continue data: Dict[str, Any] = resp.json() task_status: int = data.get("data", {}).get("status", -1) logger.debug("[QuarkTransfer] ④ Poll %d/%d: status=%d", attempt, self.poll_max_attempts, task_status) if task_status == 2: # 成功 save_as_top_fids: List[str] = ( data.get("data", {}).get("save_as", {}).get("save_as_top_fids", []) ) logger.info("[QuarkTransfer] ④ Save completed: %d files saved", len(save_as_top_fids)) return save_as_top_fids if task_status == -1: raise RuntimeError(f"转存任务失败: task_id={task_id}, response={data}") time.sleep(self.poll_interval) raise RuntimeError( f"转存任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts} 次" ) # ─── 步骤 ⑤:发起创建分享 ───────────────────────────────────── def _init_share(self, fid_list: List[str], title: str, expired_type: int = 1) -> str: """步骤⑤:创建分享链接。 POST /1/clouddrive/share Body: { "fid_list": [, ...], "title": "", "expired_type": 1 } Args: fid_list: 要分享的文件 ID 列表。 title: 分享标题。 expired_type: 过期类型,1=永久有效(默认)。 Returns: task_id 字符串,用于步骤⑥轮询。 Raises: RuntimeError: API 返回错误。 """ url = f"{QUARK_SHARE_API}" body: Dict[str, Any] = { "fid_list": fid_list, "title": title or "分享", "expired_type": expired_type, } headers = self.credential.get_headers() headers.setdefault("Content-Type", "application/json") logger.info("[QuarkTransfer] ⑤ Creating share: %d files, title='%s'", len(fid_list), title) try: resp = self.session.post(url, json=body, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"创建分享失败: {exc}") from exc data: Dict[str, Any] = resp.json() status: int = data.get("status", -1) if status != 0 and data.get("code") not in (0, None): raise RuntimeError(f"创建分享请求失败: status={status}, message={data.get('message')}") task_id: Optional[str] = data.get("data", {}).get("task_id") if not task_id: raise RuntimeError(f"分享 task_id 缺失, response: {data}") logger.info("[QuarkTransfer] ⑤ Share task created: task_id=%s", task_id) return task_id # ─── 步骤 ⑥:轮询分享任务 ───────────────────────────────────── def _poll_share_task(self, task_id: str) -> str: """步骤⑥:轮询分享任务直到完成。 GET /1/clouddrive/task?task_id=<task_id>&retry_index=0 轮询最多 poll_max_attempts 次,status==2 完成, 返回 share_id。 Args: task_id: 步骤⑤返回的 task_id。 Returns: share_id 字符串。 Raises: RuntimeError: 任务失败或超时。 """ url = f"{QUARK_API_BASE}/1/clouddrive/task" headers = self.credential.get_headers() for attempt in range(1, self.poll_max_attempts + 1): params: Dict[str, str] = { "task_id": task_id, "retry_index": "0", } try: resp = self.session.get(url, params=params, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException: logger.warning("[QuarkTransfer] ⑥ Poll attempt %d/%d failed, retrying...", attempt, self.poll_max_attempts) time.sleep(self.poll_interval) continue data: Dict[str, Any] = resp.json() task_status: int = data.get("data", {}).get("status", -1) logger.debug("[QuarkTransfer] ⑥ Poll %d/%d: status=%d", attempt, self.poll_max_attempts, task_status) if task_status == 2: # 成功 share_id: Optional[str] = data.get("data", {}).get("share_id") if not share_id: # 有时 share_id 在嵌套位置 share_id = data.get("data", {}).get("result", {}).get("share_id", "") if not share_id: raise RuntimeError(f"分享完成但 share_id 缺失: {data}") logger.info("[QuarkTransfer] ⑥ Share completed: share_id=%s", share_id) return share_id if task_status == -1: raise RuntimeError(f"分享任务失败: task_id={task_id}, response={data}") time.sleep(self.poll_interval) raise RuntimeError( f"分享任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts} 次" ) # ─── 步骤 ⑦:设置分享密码 ───────────────────────────────────── def _set_password(self, share_id: str, password: str = "") -> Tuple[str, str]: """步骤⑦:设置分享密码并获取分享链接。 POST /1/clouddrive/share/password Body: {"share_id": "<share_id>"} 即使不设密码也要调用此 API 以获取正式的 share_url。 Args: share_id: 步骤⑥返回的 share_id。 password: 分享密码,空字符串表示无密码。 Returns: (share_url, passcode) 元组。 Raises: RuntimeError: API 返回错误。 """ url = f"{QUARK_SHARE_API}/password" body: Dict[str, str] = { "share_id": share_id, } headers = self.credential.get_headers() headers.setdefault("Content-Type", "application/json") logger.info("[QuarkTransfer] ⑦ Setting password for share_id=%s", share_id) try: resp = self.session.post(url, json=body, headers=headers, timeout=self.timeout) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"设置分享密码失败: {exc}") from exc data: Dict[str, Any] = resp.json() status: int = data.get("status", -1) if status != 0 and data.get("code") not in (0, None): raise RuntimeError(f"设置密码失败: status={status}, message={data.get('message')}") share_url: str = data.get("data", {}).get("share_url", "") passcode: str = data.get("data", {}).get("passcode", password) if not share_url: # 用 share_id 构造默认分享链接 share_url = f"https://pan.quark.cn/s/{share_id}" logger.info("[QuarkTransfer] ⑦ Password set: share_url=%s, passcode=%s", share_url, passcode) return share_url, passcode # ─── 公开入口 ───────────────────────────────────────────────── def transfer( self, share_url: str, save_dir: str = "0", share_password: str = "", ) -> Dict[str, Any]: """执行完整的 7 步转存流程。 从原始夸克分享链接开始,将文件转存到自己网盘,再创建新分享。 Args: share_url: 原始夸克分享链接,如 https://pan.quark.cn/s/xxxxx。 save_dir: 转存目标目录 ID,默认 "0"(根目录)。 share_password: 新分享的密码,空字符串表示无密码。 Returns: 包含以下字段的字典: - success: bool - new_file_ids: List[str] — 转存后的文件ID列表 - file_name: str — 分享标题 - share_url: str — 新分享链接 - passcode: str — 新分享密码 Raises: RuntimeError: 任一步骤失败。 ValueError: URL 解析失败。 """ # 0. 解析 URL 提取 pwd_id match = SHARE_URL_PATTERN.search(share_url) if not match: raise ValueError(f"无法从URL中提取夸克分享ID: {share_url}") pwd_id: str = match.group(1) logger.info("[QuarkTransfer] Starting 7-step transfer for pwd_id=%s", pwd_id) # ① 获取 stoken stoken: str = self._get_stoken(pwd_id) # ② 获取分享详情 detail: Dict[str, Any] = self._get_detail(pwd_id, stoken) # ③ 发起转存 save_task_id: str = self._init_save(pwd_id, stoken, detail, to_pdir_fid=save_dir) # ④ 轮询转存任务 new_fids: List[str] = self._poll_save_task(save_task_id) if not new_fids: raise RuntimeError("转存完成但未获取到文件ID") # ⑤ 发起创建分享 title: str = detail.get("title", "分享") share_task_id: str = self._init_share(new_fids, title) # ⑥ 轮询分享任务 share_id: str = self._poll_share_task(share_task_id) # ⑦ 设置密码 new_share_url, passcode = self._set_password(share_id, share_password) result: Dict[str, Any] = { "success": True, "new_file_ids": new_fids, "file_name": title, "share_url": new_share_url, "passcode": passcode, } logger.info("[QuarkTransfer] 7-step transfer complete: %s", result) return result @staticmethod def parse_share_url(url: str) -> Optional[str]: """从夸克分享链接中提取 pwd_id。 Args: url: 夸克分享链接。 Returns: pwd_id 字符串,解析失败返回 None。 """ match = SHARE_URL_PATTERN.search(url) return match.group(1) if match else None def close(self) -> None: """关闭 HTTP 会话。""" self.session.close() def __enter__(self) -> "QuarkTransfer": return self def __exit__(self, *args: Any) -> None: self.close()