"""Tianyi Cloud (cloud.189.cn) share-transfer logic v1.0.0."""

import logging
import re
from typing import List, Tuple

logger = logging.getLogger(__name__)


class Cloud189Transfer:
    """Read cloud.189.cn share links and save their contents via the open share API.

    All HTTP traffic goes through the injected ``session`` object, which must
    provide ``get``/``post`` methods whose responses expose ``json()``.
    """

    API_BASE = "https://cloud.189.cn/api/open/share"

    # Share code embedded in links of the form ``cloud.189.cn/t/<code>``.
    _SHARE_CODE_RE = re.compile(r"cloud\.189\.cn/t/([A-Za-z0-9]+)")
    # Access code that accompanies copied share text, e.g. "（访问码：ab12）";
    # both the full-width and the ASCII colon are accepted.
    _ACCESS_CODE_RE = re.compile(r"访问码\s*[:：]\s*([A-Za-z0-9]+)")

    def __init__(self, session, credential_mgr, config, transfer_config):
        """Store the collaborators used by the transfer methods.

        Args:
            session: HTTP session used for every API request.
            credential_mgr: Credential manager; stored as ``self.credential``
                and not otherwise used by the methods of this class.
            config: General configuration object; stored only.
            transfer_config: Object whose ``request_timeout`` attribute is
                passed as the timeout of each request.
        """
        self.session = session
        self.credential = credential_mgr
        self.config = config
        self.transfer_config = transfer_config
        # Initialised empty; no method of this class updates it.
        self._last_file_names = []

    @staticmethod
    def parse_share_url(url: str) -> Tuple[str, str]:
        """Extract the share code and, when present, the access code.

        Args:
            url: A share link, optionally followed or preceded by the access
                code text (e.g. ``https://cloud.189.cn/t/AbC123（访问码：x9k2）``).

        Returns:
            ``(share_code, access_code)``; ``access_code`` is ``""`` when the
            text carries none.

        Raises:
            ValueError: If no ``cloud.189.cn/t/<code>`` link is found.
        """
        code_match = Cloud189Transfer._SHARE_CODE_RE.search(url)
        if code_match is None:
            raise ValueError("Invalid 189 cloud share URL")
        access_match = Cloud189Transfer._ACCESS_CODE_RE.search(url)
        access_code = access_match.group(1) if access_match else ""
        return code_match.group(1), access_code

    def get_share_info(self, share_code: str, password: str = "") -> dict:
        """Fetch the title, share id and file list of a share.

        Args:
            share_code: Code identifying the share (see ``parse_share_url``).
            password: Access code; sent as ``accessCode`` only when non-empty.

        Returns:
            A dict with keys ``"title"``, ``"files"`` (list of dicts with
            ``"id"``, ``"name"`` and integer ``"size"``) and ``"share_id"``.

        Raises:
            Exception: If the response's ``res_code`` is not ``0``.
        """
        params = {"shareCode": share_code}
        if password:
            params["accessCode"] = password
        resp = self.session.get(
            f"{self.API_BASE}/getShareInfoByShareId.action",
            params=params,
            timeout=self.transfer_config.request_timeout,
        )
        data = resp.json()
        if data.get("res_code") != 0:
            raise Exception(f"189 share info failed: {data}")
        # ``or`` fallbacks: a key that is present but JSON ``null`` would
        # otherwise bypass the ``.get`` default and crash below.
        info = data.get("data") or {}
        files = info.get("fileList") or []
        return {
            "title": info.get("shareName", ""),
            "files": [
                {
                    "id": f.get("fileId", ""),
                    "name": f.get("fileName", ""),
                    # A missing or null size is reported as 0.
                    "size": int(f.get("fileSize") or 0),
                }
                for f in files
            ],
            "share_id": info.get("shareId", ""),
        }

    def save_files(self, share_code: str, detail: dict, save_dir: str) -> List[str]:
        """Ask the API to save a share into a folder of the account.

        Args:
            share_code: Unused by this implementation; kept for the interface.
            detail: Share description; only its ``"share_id"`` entry is read.
            save_dir: Target folder id; an empty value falls back to ``"-11"``
                (presumably the account's root folder id - TODO confirm).

        Returns:
            The fixed placeholder list ``["0"]``; the response is not
            inspected for the ids of the saved files.

        Raises:
            Exception: If the response's ``res_code`` is not ``0``.
        """
        payload = {
            "shareId": detail.get("share_id", ""),
            "parentId": save_dir or "-11",
        }
        resp = self.session.post(
            f"{self.API_BASE}/shareToMe.action",
            data=payload,
            timeout=self.transfer_config.request_timeout,
        )
        data = resp.json()
        if data.get("res_code") != 0:
            raise Exception(f"189 save failed: {data}")
        return ["0"]

    def create_share(self, file_ids: List[str], title: str, password: str = "") -> Tuple[str, str]:
        """Placeholder: performs no request and returns two empty strings."""
        return "", ""

    def list_files(self, parent_id: str = "-11") -> list:
        """Placeholder: performs no request and returns an empty list."""
        return []