""" CloudSearch Transfer — 迅雷网盘适配器 v1.0.0 PLATFORM_KEY = 'xunlei' 迅雷网盘使用 refresh_token + captcha_token 双重认证。 """ from __future__ import annotations import logging from typing import List, Optional, Tuple from ..base import ( BaseCloudDriveAdapter, FileInfo, TransferResult, VerifyResult, ) from ...config import PlatformConfig, TransferConfig from ...errors import TransferError, TransferErrorCode from .credential import XunleiCredentialManager from .transfer import XunleiTransfer from .cleanup import XunleiCleanup logger = logging.getLogger(__name__) class XunleiAdapter(BaseCloudDriveAdapter): """迅雷网盘适配器""" PLATFORM_NAME = "迅雷网盘" PLATFORM_KEY = "xunlei" URL_PATTERNS = [r"pan\.xunlei\.com/s/([A-Za-z0-9]+)"] def __init__(self, config: PlatformConfig, transfer_config: TransferConfig): super().__init__(config, transfer_config) self._credential = XunleiCredentialManager(config) self._transfer_engine: Optional[XunleiTransfer] = None self._cleanup = XunleiCleanup() def _setup_session(self): """初始化 session 认证头""" headers = self._credential.get_auth_headers() if headers: self.session.headers.update(headers) def _ensure_auth(self): """确保认证头是最新的""" headers = self._credential.get_auth_headers() self.session.headers.update(headers) @property def _transfer(self) -> XunleiTransfer: """懒加载转存引擎""" if self._transfer_engine is None: self._transfer_engine = XunleiTransfer( self.session, self._credential, self.config, self.transfer_config, ) return self._transfer_engine # ─── 抽象方法实现 ────────────────────────────── def _get_share_detail(self, pwd_id: str, passcode: str = "") -> dict: self._ensure_auth() return self._transfer.get_share_info(pwd_id, passcode) def _save_files(self, pwd_id: str, detail: dict, save_dir: str) -> List[str]: self._ensure_auth() return self._transfer.save_files(pwd_id, detail, save_dir) def _create_share(self, file_ids: List[str], title: str, password: str = "") -> Tuple[str, str]: self._ensure_auth() return self._transfer.create_share(file_ids, title, password) def _extract_file_list(self, detail: dict) -> List[FileInfo]: files = detail.get("files", []) return [ FileInfo(fid=f.get("id", ""), name=f.get("name", ""), size=f.get("size", 0), is_dir=f.get("is_dir", False)) for f in files ] def _filter_ads(self, file_ids: List[str]) -> List[str]: banned = self._get_banned_keywords() return self._cleanup.filter_ad_ids( file_ids, getattr(self._transfer, "_last_file_names", []), banned, ) def get_files(self, parent_fid: str = "0") -> List[FileInfo]: self._ensure_auth() return self._transfer.list_files(parent_fid) def delete(self, file_ids: List[str]) -> bool: self._ensure_auth() return self._cleanup.delete_files( self.session, self._credential, file_ids ) def _get_banned_keywords(self) -> List[str]: return self.config.banned_keywords or self.transfer_config.default_banned_keywords def close(self): self.session.close() def __repr__(self): return f""