"""115网盘转存逻辑 v1.0.0""" import re import logging from typing import List, Tuple logger = logging.getLogger(__name__) class Pan115Transfer: def __init__(self, session, credential_mgr, config, transfer_config): self.session = session self.credential = credential_mgr self.config = config self.transfer_config = transfer_config self._last_file_names = [] def parse_share_url(url: str) -> Tuple[str, str]: m = re.search(r"115\.com/s/([a-z0-9]+)", url) if not m: raise ValueError("Invalid 115 share URL") code = m.group(1) m2 = re.search(r"password[=:](\w+)", url) return code, m2.group(1) if m2 else "" def get_share_info(self, code: str, password: str = "") -> dict: params = {"share_code": code} if password: params["receive_code"] = password resp = self.session.get( "https://webapi.115.com/share/snap", params=params, timeout=self.transfer_config.request_timeout, ) data = resp.json() if not data.get("state"): raise Exception(f"115 share info failed: {data}") snap = data.get("data", {}) files = snap.get("list", []) return { "title": snap.get("shareinfo", {}).get("share_title", ""), "files": [{"id": f.get("fid", ""), "name": f.get("n", ""), "size": int(f.get("s", 0))} for f in files], "cid": files[0].get("cid", "") if files else "", } def save_files(self, share_code: str, detail: dict, save_dir: str) -> List[str]: cid = detail.get("cid", "0") payload = {"share_code": share_code, "receive_code": "", "cid": cid, "pick_code": ""} resp = self.session.post( "https://webapi.115.com/share/receive", json=payload, timeout=self.transfer_config.request_timeout, ) data = resp.json() if not data.get("state"): raise Exception(f"115 save failed: {data}") return [str(data.get("data", {}).get("cid", ""))] def create_share(self, file_ids: List[str], title: str, password: str = "") -> Tuple[str, str]: return "", "" def list_files(self, cid: str = "0") -> list: return [] parse_share_url = staticmethod(Pan115Transfer.parse_share_url)