"""123云盘转存逻辑 v1.0.0""" import re import logging from typing import List, Tuple logger = logging.getLogger(__name__) class Pan123Transfer: API_BASE = "https://www.123pan.com/api" def __init__(self, session, credential_mgr, config, transfer_config): self.session = session self.credential = credential_mgr self.config = config self.transfer_config = transfer_config self._last_file_names = [] @staticmethod def parse_share_url(url: str) -> Tuple[str, str]: m = re.search(r"123pan\.com/s/([A-Za-z0-9]+)", url) if not m: raise ValueError("Invalid 123pan share URL") code = m.group(1) m2 = re.search(r"[?&]pwd=(\w+)", url) return code, m2.group(1) if m2 else "" def get_share_info(self, share_key: str, password: str = "") -> dict: payload = {"shareKey": share_key} if password: payload["sharePwd"] = password resp = self.session.post( f"{self.API_BASE}/share/info", json=payload, timeout=self.transfer_config.request_timeout, ) data = resp.json() if data.get("code") != 0: raise Exception(f"123 share info failed: {data}") info = data.get("data", {}) files = info.get("fileList", []) return { "title": info.get("shareName", ""), "files": [{"id": f.get("fileId", ""), "name": f.get("fileName", ""), "size": f.get("fileSize", 0)} for f in files], "share_id": info.get("shareId", ""), } def save_files(self, share_key: str, detail: dict, save_dir: str) -> List[str]: payload = { "shareKey": share_key, "shareId": detail.get("share_id", ""), "parentFileId": save_dir or "0", } resp = self.session.post( f"{self.API_BASE}/share/save", json=payload, timeout=self.transfer_config.request_timeout, ) data = resp.json() if data.get("code") != 0: raise Exception(f"123 save failed: {data}") return [str(data.get("data", {}).get("fileId", ""))] def create_share(self, file_ids: List[str], title: str, password: str = "") -> Tuple[str, str]: return "", "" def list_files(self, parent_id: str = "0") -> list: return []