""" 百度网盘适配器 — CloudSearch Transfer v1.0.0 参考 cloud-auto-save 的 BaiduNetDisk + netdisk 的 PanbaiduSave 完整的 5 步转存流程 + bdstoken 管理 + 路径删除 + 广告过滤 """ import logging from typing import List, Tuple from ..base import BaseCloudDriveAdapter, FileInfo from ...config import PlatformConfig, TransferConfig from ...errors import TransferError, TransferErrorCode from .credential import BaiduCredentialManager from .transfer import BaiduTransfer from .cleanup import BaiduCleanup logger = logging.getLogger(__name__) class BaiduAdapter(BaseCloudDriveAdapter): """百度网盘适配器 完整的 Cookie + bdstoken 机制,支持: - 验证分享链接 + 提取码 - 5 步转存到自己的网盘 - 创建新分享 - 按文件名删除文件 - 广告文件过滤 """ PLATFORM_NAME = "百度网盘" PLATFORM_KEY = "baidu" URL_PATTERNS = [ r'pan\.baidu\.com/s/1([A-Za-z0-9_-]+)', ] def __init__(self, config: PlatformConfig, transfer_config: TransferConfig): super().__init__(config, transfer_config) # 凭证管理器 self.credential = BaiduCredentialManager( cookie=config.cookie, session=self.session, ) if not self.credential.validate(): raise TransferError( TransferErrorCode.NOT_LOGIN, message="百度网盘 Cookie 无效或太短 (需 >= 50 字符)", platform=self.PLATFORM_KEY, ) # 预热 bdstoken try: self.credential.get_bdstoken() except TransferError as e: logger.warning(f"预取 bdstoken 失败: {e},将在首次使用时重试") # 转存执行器 & 清理器 self._transfer = BaiduTransfer(self.session, self.credential) self._cleanup = BaiduCleanup( self.session, self.credential, ad_keywords=config.banned_keywords or None, ) # 暂存最近一次转存的文件信息(供 _filter_ads 使用) self._last_transfer_files: List[dict] = [] # ─── session 初始化 ───────────────────────────────────── def _setup_session(self): """设置 session 级别的 Cookie""" if self.config.cookie: self.session.headers["Cookie"] = self.config.cookie self.session.headers["Referer"] = "https://pan.baidu.com/" # ─── 核心抽象方法实现 ────────────────────────────────── def _get_share_detail(self, pwd_id: str, passcode: str = "") -> dict: """获取百度分享详情(步骤 ①+②) Args: pwd_id: URL 中的 surl (s/1 后面的部分) passcode: 提取码(可选) Returns: {"title": str, "fs_ids": [str], "filenames": [str], ...} """ bdstoken = self.credential.get_bdstoken() # ① 验证提取码(如果有) if passcode: self._transfer._verify_password(pwd_id, passcode, bdstoken) # ② 解析分享页 share_info = self._transfer._parse_share_page(pwd_id) return { "title": share_info.get("title", ""), "shareid": share_info["shareid"], "uk": share_info["uk"], "fs_ids": share_info["fs_ids"], "filenames": share_info["filenames"], } def _save_files(self, pwd_id: str, detail: dict, save_dir: str) -> List[str]: """转存文件到自己的百度网盘(步骤 ③+④) Args: pwd_id: surl detail: _get_share_detail 返回的 dict save_dir: 目标目录 Returns: 转存后的新 fs_id 列表 """ bdstoken = self.credential.get_bdstoken() shareid = detail["shareid"] uk = detail["uk"] fs_ids = detail["fs_ids"] filenames = detail.get("filenames", []) # ③ 转存 self._transfer._transfer_files(shareid, uk, fs_ids, save_dir, bdstoken) # ④ 列出目录匹配新 fs_id new_fs_ids = self._transfer._list_and_match(save_dir, filenames, bdstoken) # 暂存文件信息供 _filter_ads + _create_share 使用 self._last_transfer_files = [ {"fs_id": fid, "name": name} for fid, name in zip(new_fs_ids, filenames) if fid ] return new_fs_ids def _create_share(self, file_ids: List[str], title: str, password: str = "") -> Tuple[str, str]: """创建百度分享(步骤 ⑤) Args: file_ids: 转存后的新 fs_id 列表 title: 原标题 password: 分享密码 Returns: (new_share_url, share_password) """ # 如果 file_ids 中包含非数字,尝试从暂存信息中查找 numeric_ids = [] for fid in file_ids: try: int(fid) numeric_ids.append(fid) except ValueError: logger.warning(f"忽略非数字 fs_id: {fid}") return self._transfer.create_share( fids=[int(x) for x in numeric_ids] if numeric_ids else [int(x) for x in file_ids], password=password, period=0, # 永久 ) # ─── 文件列表 & 删除 ──────────────────────────────────── def get_files(self, parent_fid: str = "0") -> List[FileInfo]: """列出百度网盘目录下的文件 GET /api/list?dir={parent_fid} Args: parent_fid: 目录路径 (默认 "0" = 根目录) 注意: parent_fid 对百度网盘而言是目录路径而非数字 ID。 根目录传 "/" 或 "0"。 """ bdstoken = self.credential.get_bdstoken() dir_path = parent_fid if parent_fid != "0" else "/" url = "https://pan.baidu.com/api/list" params = {"dir": dir_path, "bdstoken": bdstoken} headers = self.credential.get_headers() try: resp = self._get(url, params=params, headers=headers) data = resp.json() except Exception as e: logger.error(f"百度列出目录失败: {e}") return [] errno = data.get("errno", -1) if errno != 0: logger.error(f"百度列出目录 errno={errno}: {data}") return [] files = [] for item in data.get("list", []): fid = str(item.get("fs_id", "")) name = item.get("server_filename", "") size = item.get("size", 0) is_dir = item.get("isdir", 0) == 1 ext = "" if not is_dir and "." in name: ext = name.rsplit(".", 1)[-1] files.append(FileInfo( fid=fid, name=name, size=size, is_dir=is_dir, ext=ext, )) return files def delete(self, file_ids: List[str]) -> bool: """删除百度网盘文件(按路径) file_ids 应为网盘中的完整路径,如 ["/dir/file.txt", "/dir/file2.zip"] Args: file_ids: 网盘路径列表 Returns: True 删除成功(或文件不存在) """ return self._cleanup.delete_files(file_ids) # ─── 广告过滤 ──────────────────────────────────────────── def _filter_ads(self, file_ids: List[str]) -> List[str]: """广告过滤 — 基于最近一次转存暂存的文件名""" if not self._last_transfer_files: return file_ids names = [] for f in self._last_transfer_files: if f["fs_id"] in file_ids: names.append(f["name"]) else: names.append("") return self._cleanup.filter_ad_ids(file_ids, names) # ─── 扩展方法 ──────────────────────────────────────────── def delete_paths(self, paths: List[str]) -> bool: """便捷删除方法(直接调用 cleanup)""" return self._cleanup.delete_files(paths)