"""Baidu Netdisk transfer core: the 5-step save-and-reshare flow.

Modelled on netdisk's PanbaiduSave and cloud-auto-save's
BaiduNetDisk.transfer:

    1. Verify the extraction code -> POST /share/verify
    2. Parse the share page       -> GET  /s/1{surl}
    3. Transfer the files         -> POST /share/transfer
    4. List the target directory  -> GET  /api/list
    5. Create a new share         -> POST /share/set
"""

import re
import json
import logging
from typing import List, Tuple

import requests

from ...errors import TransferError, TransferErrorCode
from .credential import BaiduCredentialManager, BAIDU_PAN_API

logger = logging.getLogger(__name__)

# --- Regular expressions ---------------------------------------------------

# shareid embedded in the share page HTML
RE_SHAREID = re.compile(r"""shareid["\s:=]+(\d+)""")

# uk embedded in the share page HTML
RE_UK = re.compile(r"""uk["\s:=]+(\d+)""")

# First fs_id found in the HTML (single-file fallback)
RE_FS_ID = re.compile(r'"fs_id"\s*:\s*(\d+)')

# First server_filename found in the HTML (single-file fallback)
RE_FILENAME = re.compile(r'"server_filename"\s*:\s*"([^"]*)"')

# Title found in the HTML / embedded JSON
RE_TITLE = re.compile(r'"title"\s*:\s*"([^"]*)"')

# Marks the opening brace of the embedded `file_list` JSON object
RE_FILE_LIST_MARK = re.compile(r'"file_list"\s*:\s*(\{)', re.DOTALL)

# One file entry with the exact key order fs_id, server_filename (fallback)
RE_FILE_ENTRY = re.compile(r'\{"fs_id":(\d+),"server_filename":"([^"]+)"')


def _unescape_json_string(raw: str) -> str:
    """Decode JSON escapes in a string body captured by one of the regexes.

    The regex fallbacks capture the raw text between the quotes, so escapes
    such as ``\\uXXXX`` are still encoded. Decoding them makes the names
    comparable with the already-decoded ``server_filename`` values returned
    by ``resp.json()``. If the text is not a valid JSON string body it is
    returned unchanged.
    """
    if "\\" not in raw:
        return raw
    try:
        decoded = json.loads(f'"{raw}"')
    except ValueError:
        return raw
    return decoded if isinstance(decoded, str) else raw


class BaiduTransfer:
    """Executor for the 5-step Baidu Netdisk transfer flow.

    Each instance is bound to one ``requests.Session`` and one credential
    manager (cookie + bdstoken) and runs
    verify -> parse -> transfer -> list directory -> create share.
    """

    def __init__(self, session: requests.Session, credential: BaiduCredentialManager):
        self.session = session
        self.credential = credential
        self.cookie = credential.cookie

    # --- Main flow ---------------------------------------------------------

    def execute(self, surl: str, password: str, save_dir: str = "/") -> Tuple[List[str], dict]:
        """Run steps 1-4: verify, parse, transfer and locate the new files.

        Args:
            surl: Share short code (the part after ``s/1``).
            password: Extraction code of the share.
            save_dir: Target directory in the user's own netdisk.

        Returns:
            ``(new_fs_ids, file_info)`` where ``new_fs_ids`` lists the fs_ids
            of the transferred files found in ``save_dir`` and ``file_info``
            maps each of those fs_ids to a display name.

        Raises:
            TransferError: If any step fails.
        """
        bdstoken = self.credential.get_bdstoken()

        # Step 1: verify the extraction code
        logger.info(f"[百度转存] ① 验证提取码 surl={surl}")
        self._verify_password(surl, password, bdstoken)

        # Step 2: parse the share page
        logger.info(f"[百度转存] ② 解析分享页 surl={surl}")
        share_info = self._parse_share_page(surl)
        shareid = share_info["shareid"]
        uk = share_info["uk"]
        fs_ids = share_info["fs_ids"]
        filenames = share_info["filenames"]
        title = share_info.get("title", "")

        if not fs_ids:
            raise TransferError(
                TransferErrorCode.RESOURCE_EMPTY,
                message="分享中没有找到可转存的文件",
                platform="baidu",
            )

        # Step 3: transfer into the user's own netdisk
        logger.info(f"[百度转存] ③ 转存 {len(fs_ids)} 个文件到 {save_dir}")
        self._transfer_files(shareid, uk, fs_ids, save_dir, bdstoken)

        # Step 4: list the target directory and match new fs_ids by file name
        logger.info(f"[百度转存] ④ 列出目录 {save_dir} 匹配新 fs_id")
        new_fs_ids = self._list_and_match(save_dir, filenames, bdstoken)

        if not new_fs_ids:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message="转存后无法匹配到新文件 ID",
                platform="baidu",
            )

        # Names can only be paired with ids positionally when every file was
        # matched; otherwise the positions no longer line up.
        file_info = {}
        if len(filenames) == len(new_fs_ids):
            file_info = dict(zip(new_fs_ids, filenames))
        if not file_info:
            # Fall back to the share title (or the id itself) as the name.
            file_info = {fid: title or fid for fid in new_fs_ids}

        return new_fs_ids, file_info

    def create_share(self, fids: List[int], password: str = "", period: int = 0) -> Tuple[str, str]:
        """Step 5: create a new share for already-transferred files.

        Args:
            fids: fs_ids of the transferred files. Numeric strings (as
                returned by ``execute``) are accepted and sent as integers.
            password: Share password (empty string = none requested).
            period: Share validity period (0 = permanent).

        Returns:
            ``(share_url, share_password)``.

        Raises:
            TransferError: On an invalid fs_id, a request/response failure,
                or a non-zero ``errno`` from the API.
        """
        bdstoken = self.credential.get_bdstoken()

        # Send integers, consistent with _transfer_files; execute() hands
        # back fs_ids as strings.
        try:
            fid_list = [int(fid) for fid in fids]
        except (TypeError, ValueError) as e:
            raise TransferError(
                TransferErrorCode.SHARE_LINK_FAIL,
                message=f"创建分享失败: 无效的文件 ID ({e})",
                platform="baidu",
            ) from e

        url = f"{BAIDU_PAN_API}/share/set"
        params = {
            "channel": "chunlei",
            "clienttype": "0",
            "web": "1",
            "bdstoken": bdstoken,
        }
        data = {
            "fid_list": json.dumps(fid_list),
            "period": period,
            "pwd": password,
        }
        headers = self.credential.get_headers()

        try:
            resp = self.session.post(
                url, params=params, data=data, headers=headers, timeout=30
            )
            resp.raise_for_status()
            # Decoded inside the try so a non-JSON body is reported as a
            # TransferError, as in _list_and_match.
            result = resp.json()
        except Exception as e:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message=f"创建分享请求失败: {e}",
                platform="baidu",
            ) from e

        errno = result.get("errno", -1)

        if errno == 9219:
            raise TransferError(
                TransferErrorCode.SHARE_LIMIT,
                message="百度今日分享次数过多",
                platform="baidu",
            )

        if errno != 0:
            raise TransferError(
                TransferErrorCode.SHARE_LINK_FAIL,
                message=f"创建分享失败 (errno={errno})",
                platform="baidu",
                details=result,
            )

        share_url = result.get("link", "")
        # Prefer the password echoed by the API; fall back to the requested one.
        share_password = result.get("pwd", password) or password

        logger.info(f"[百度转存] ⑤ 分享创建成功: {share_url}")
        return share_url, share_password

    # --- Internal steps ----------------------------------------------------

    def _verify_password(self, surl: str, password: str, bdstoken: str):
        """Step 1: verify the extraction code.

        POST /share/verify?surl={surl}&bdstoken={bdstoken} with form body
        ``pwd=xxxx``. ``errno`` 0 means success; -9 / -62 are treated as a
        wrong code; 2 / 118 are treated as a missing or expired share.

        Raises:
            TransferError: On request failure or any non-zero ``errno``.
        """
        url = f"{BAIDU_PAN_API}/share/verify"
        params = {
            "surl": surl,
            "bdstoken": bdstoken,
        }
        data = {"pwd": password}
        # Build a new dict so the one returned by the credential manager is
        # never mutated (it may be shared between calls).
        headers = {
            **self.credential.get_headers(),
            "Content-Type": "application/x-www-form-urlencoded",
        }

        # NOTE(review): step 2 presumably depends on cookies set by this
        # response being kept on self.session - confirm get_headers() does
        # not pin a fixed Cookie header that would hide them.
        try:
            resp = self.session.post(
                url, params=params, data=data, headers=headers, timeout=15
            )
            resp.raise_for_status()
            result = resp.json()
        except Exception as e:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message=f"验证提取码请求失败: {e}",
                platform="baidu",
            ) from e

        errno = result.get("errno", -1)

        if errno == 0:
            logger.info("提取码验证通过")
            return

        if errno in (-9, -62):
            raise TransferError(
                TransferErrorCode.PASSCODE_WRONG,
                message="百度提取码错误",
                platform="baidu",
            )

        if errno in (2, 118):
            raise TransferError(
                TransferErrorCode.SHARE_NOT_EXIST,
                message="百度分享不存在或已失效",
                platform="baidu",
            )

        raise TransferError(
            TransferErrorCode.NETWORK_ERROR,
            message=f"验证提取码失败 (errno={errno})",
            platform="baidu",
            details=result,
        )

    def _parse_share_page(self, surl: str) -> dict:
        """Step 2: fetch the share page and extract its identifiers.

        GET /s/1{surl}, then pull ``shareid``, ``uk``, the title and the
        file list (fs_id + server_filename) out of the HTML.

        Returns:
            Dict with keys ``shareid``, ``uk``, ``fs_ids``, ``filenames``
            and ``title``; ``fs_ids`` and ``filenames`` are index-aligned
            lists of strings.

        Raises:
            TransferError: If the page cannot be fetched or holds no shareid.
        """
        url = f"{BAIDU_PAN_API}/s/1{surl}"
        headers = self.credential.get_headers()

        try:
            resp = self.session.get(url, headers=headers, timeout=20)
            resp.raise_for_status()
            html = resp.text
        except Exception as e:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message=f"打开分享页面失败: {e}",
                platform="baidu",
            ) from e

        m_shareid = RE_SHAREID.search(html)
        if not m_shareid:
            raise TransferError(
                TransferErrorCode.SHARE_NOT_EXIST,
                message="无法从页面中提取 shareid,分享可能已失效",
                platform="baidu",
            )
        shareid = m_shareid.group(1)

        m_uk = RE_UK.search(html)
        uk = m_uk.group(1) if m_uk else ""

        m_title = RE_TITLE.search(html)
        title = _unescape_json_string(m_title.group(1)) if m_title else ""

        fs_ids: List[str] = []
        filenames: List[str] = []

        # Method 1: decode the embedded `file_list` JSON object. raw_decode
        # parses exactly one JSON value starting at the opening brace, so
        # braces inside file names cannot confuse the boundary detection.
        m_fl = RE_FILE_LIST_MARK.search(html)
        if m_fl:
            try:
                file_list, _ = json.JSONDecoder().raw_decode(html, m_fl.start(1))
            except json.JSONDecodeError:
                file_list = None  # fall through to the regex methods
            if isinstance(file_list, dict):
                for entry in file_list.get("list") or []:
                    if not isinstance(entry, dict):
                        continue
                    fs_id = entry.get("fs_id")
                    # Skip entries without an id: an empty id would crash
                    # the int() conversion in _transfer_files.
                    if fs_id is None or fs_id == "":
                        continue
                    fs_ids.append(str(fs_id))
                    filenames.append(entry.get("server_filename", ""))

        # Method 2: regex over every fs_id + server_filename pair
        if not fs_ids:
            for m in RE_FILE_ENTRY.finditer(html):
                fs_ids.append(m.group(1))
                filenames.append(_unescape_json_string(m.group(2)))

        # Method 3: possibly a single file - take the first id and name
        if not fs_ids:
            m_fsid = RE_FS_ID.search(html)
            m_name = RE_FILENAME.search(html)
            if m_fsid:
                fs_ids.append(m_fsid.group(1))
                filenames.append(
                    _unescape_json_string(m_name.group(1)) if m_name else ""
                )

        logger.info(
            f"解析分享页: shareid={shareid}, uk={uk}, "
            f"文件数={len(fs_ids)}, title={title[:30]}"
        )

        return {
            "shareid": shareid,
            "uk": uk,
            "fs_ids": fs_ids,
            "filenames": filenames,
            "title": title,
        }

    def _transfer_files(self, shareid: str, uk: str, fs_ids: List[str],
                        save_dir: str, bdstoken: str):
        """Step 3: transfer the shared files into the user's own netdisk.

        POST /share/transfer?shareid={shareid}&from={uk}&bdstoken={bdstoken}
        with form body ``fsidlist=[1,2,3]&path=/dir``.

        Raises:
            TransferError: On request failure or a non-zero ``errno``
                (12 -> CAPACITY_FULL, 9013 -> SENSITIVE_RESOURCE).
        """
        url = f"{BAIDU_PAN_API}/share/transfer"
        params = {
            "shareid": shareid,
            "from": uk,
            "bdstoken": bdstoken,
        }
        data = {
            "fsidlist": json.dumps([int(x) for x in fs_ids]),
            "path": save_dir,
        }
        # Build a new dict so the one returned by the credential manager is
        # never mutated (it may be shared between calls).
        headers = {
            **self.credential.get_headers(),
            "Content-Type": "application/x-www-form-urlencoded",
        }

        try:
            resp = self.session.post(
                url, params=params, data=data, headers=headers, timeout=30
            )
            resp.raise_for_status()
            result = resp.json()
        except Exception as e:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message=f"转存请求失败: {e}",
                platform="baidu",
            ) from e

        errno = result.get("errno", -1)

        if errno == 0:
            logger.info(f"转存成功: {len(fs_ids)} 个文件 → {save_dir}")
            return

        if errno == 12:
            raise TransferError(
                TransferErrorCode.CAPACITY_FULL,
                message="百度网盘空间不足",
                platform="baidu",
            )

        if errno == 9013:
            raise TransferError(
                TransferErrorCode.SENSITIVE_RESOURCE,
                message="文件包含违规内容,无法转存",
                platform="baidu",
            )

        raise TransferError(
            TransferErrorCode.NETWORK_ERROR,
            message=f"转存失败 (errno={errno})",
            platform="baidu",
            details=result,
        )

    def _list_and_match(self, save_dir: str, filenames: List[str],
                        bdstoken: str) -> List[str]:
        """Step 4: list the target directory and match new fs_ids by name.

        GET /api/list?dir={dir}&bdstoken={bdstoken}, then look up each
        expected file name among the returned ``server_filename`` values.

        Returns:
            fs_ids (as strings) of the matched files, in the order of
            ``filenames``; unmatched names are skipped with a warning.

        Raises:
            TransferError: On request failure or a non-zero ``errno``
                (-12 -> DIR_NOT_EXIST).
        """
        url = f"{BAIDU_PAN_API}/api/list"
        params = {
            "dir": save_dir,
            "bdstoken": bdstoken,
        }
        headers = self.credential.get_headers()

        try:
            resp = self.session.get(url, params=params, headers=headers, timeout=15)
            resp.raise_for_status()
            data = resp.json()
        except Exception as e:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message=f"列出目录失败: {e}",
                platform="baidu",
            ) from e

        errno = data.get("errno", -1)
        if errno == -12:
            raise TransferError(
                TransferErrorCode.DIR_NOT_EXIST,
                message=f"百度目录不存在: {save_dir}",
                platform="baidu",
            )
        if errno != 0:
            raise TransferError(
                TransferErrorCode.NETWORK_ERROR,
                message=f"列出目录失败 (errno={errno})",
                platform="baidu",
                details=data,
            )

        # Map file name -> fs_id. Matching is by name only and uses a single
        # listing call, so a file that already had the same name in save_dir
        # is indistinguishable from the newly transferred one.
        name_to_fid = {}
        for item in data.get("list", []):
            name = item.get("server_filename", "")
            fid = str(item.get("fs_id", ""))
            if name and fid:
                name_to_fid[name] = fid

        # Match in the original file-name order
        new_fs_ids = []
        for fname in filenames:
            fid = name_to_fid.get(fname)
            if fid is None:
                logger.warning(f"目录中未找到文件: {fname}")
            else:
                new_fs_ids.append(fid)

        logger.info(
            f"目录匹配: 期望 {len(filenames)} 个, 匹配到 {len(new_fs_ids)} 个"
        )
        return new_fs_ids