- 修复Redis认证 (配置密码) - 启动Python管理后台 (端口9531, 15个功能开关) - 统一版本号 0.2.7 - 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
70 lines
2.4 KiB
Python
70 lines
2.4 KiB
Python
"""115网盘转存逻辑 v1.0.0"""
|
|
|
|
import re
|
|
import logging
|
|
from typing import List, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Pan115Transfer:
|
|
def __init__(self, session, credential_mgr, config, transfer_config):
|
|
self.session = session
|
|
self.credential = credential_mgr
|
|
self.config = config
|
|
self.transfer_config = transfer_config
|
|
self._last_file_names = []
|
|
|
|
def parse_share_url(url: str) -> Tuple[str, str]:
|
|
m = re.search(r"115\.com/s/([a-z0-9]+)", url)
|
|
if not m:
|
|
raise ValueError("Invalid 115 share URL")
|
|
code = m.group(1)
|
|
m2 = re.search(r"password[=:](\w+)", url)
|
|
return code, m2.group(1) if m2 else ""
|
|
|
|
def get_share_info(self, code: str, password: str = "") -> dict:
|
|
params = {"share_code": code}
|
|
if password:
|
|
params["receive_code"] = password
|
|
resp = self.session.get(
|
|
"https://webapi.115.com/share/snap",
|
|
params=params,
|
|
timeout=self.transfer_config.request_timeout,
|
|
)
|
|
data = resp.json()
|
|
if not data.get("state"):
|
|
raise Exception(f"115 share info failed: {data}")
|
|
snap = data.get("data", {})
|
|
files = snap.get("list", [])
|
|
return {
|
|
"title": snap.get("shareinfo", {}).get("share_title", ""),
|
|
"files": [{"id": f.get("fid", ""), "name": f.get("n", ""),
|
|
"size": int(f.get("s", 0))} for f in files],
|
|
"cid": files[0].get("cid", "") if files else "",
|
|
}
|
|
|
|
def save_files(self, share_code: str, detail: dict, save_dir: str) -> List[str]:
|
|
cid = detail.get("cid", "0")
|
|
payload = {"share_code": share_code, "receive_code": "",
|
|
"cid": cid, "pick_code": ""}
|
|
resp = self.session.post(
|
|
"https://webapi.115.com/share/receive",
|
|
json=payload,
|
|
timeout=self.transfer_config.request_timeout,
|
|
)
|
|
data = resp.json()
|
|
if not data.get("state"):
|
|
raise Exception(f"115 save failed: {data}")
|
|
return [str(data.get("data", {}).get("cid", ""))]
|
|
|
|
def create_share(self, file_ids: List[str], title: str,
|
|
password: str = "") -> Tuple[str, str]:
|
|
return "", ""
|
|
|
|
def list_files(self, cid: str = "0") -> list:
|
|
return []
|
|
|
|
|
|
parse_share_url = staticmethod(Pan115Transfer.parse_share_url)
|