Files
CloudSearch/cloudsearch_transfer/adapter/cloud189/transfer.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

69 lines
2.3 KiB
Python

"""Tianyi Cloud (cloud.189.cn) share-transfer logic v1.0.0"""
import re
import logging
from typing import List, Tuple
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class Cloud189Transfer:
    """Transfer adapter for 189 Cloud (cloud.189.cn) share links.

    The class parses a share URL, fetches the share's metadata through the
    ``/api/open/share`` endpoints and saves the share via ``shareToMe.action``.
    ``create_share`` and ``list_files`` are placeholders that return empty
    results.
    """

    API_BASE = "https://cloud.189.cn/api/open/share"

    # Share-link layouts accepted by parse_share_url: ".../t/<code>",
    # ".../web/share?code=<code>" and ".../share.html#/t/<code>".
    _SHARE_CODE_RE = re.compile(
        r"cloud\.189\.cn/(?:t/|web/share\?code=|share\.html#/t/)([A-Za-z0-9]+)"
    )
    # Access code appended to copied share text, e.g. "（访问码：ab12）".
    # Both the ASCII and the full-width colon are accepted.
    _ACCESS_CODE_RE = re.compile(r"访问码\s*[:：]\s*([A-Za-z0-9]+)")
    # Folder id sent as "parentId" when the caller gives no save directory.
    # NOTE(review): presumably the account's root folder -- confirm.
    _DEFAULT_PARENT_ID = "-11"

    def __init__(self, session, credential_mgr, config, transfer_config):
        """Store the collaborators used by the transfer calls.

        Args:
            session: HTTP client exposing ``get``/``post`` whose responses
                provide ``json()`` (presumably a requests-style session --
                confirm against the caller).
            credential_mgr: Credential manager; stored but not used by the
                methods visible in this class.
            config: Adapter configuration; stored but not used by the
                methods visible in this class.
            transfer_config: Object exposing ``request_timeout``, which is
                passed as the timeout of every HTTP call.
        """
        self.session = session
        self.credential = credential_mgr
        self.config = config
        self.transfer_config = transfer_config
        # Initialised empty; nothing in this class writes to it afterwards.
        self._last_file_names = []

    @staticmethod
    def parse_share_url(url: str) -> Tuple[str, str]:
        """Extract the share code and, when present, the access code.

        Args:
            url: A share link, optionally followed by the access-code text
                (for example ``https://cloud.189.cn/t/AbC123（访问码：x9k2）``).

        Returns:
            ``(share_code, access_code)``; ``access_code`` is ``""`` when the
            text carries none.

        Raises:
            ValueError: If no supported cloud.189.cn share link is found.
        """
        code_match = Cloud189Transfer._SHARE_CODE_RE.search(url)
        if not code_match:
            raise ValueError("Invalid 189 cloud share URL")
        pwd_match = Cloud189Transfer._ACCESS_CODE_RE.search(url)
        return code_match.group(1), pwd_match.group(1) if pwd_match else ""

    @staticmethod
    def _checked_json(resp, failure: str) -> dict:
        """Decode ``resp`` as JSON and require a dict with ``res_code == 0``.

        Raises:
            Exception: With message ``"<failure>: <decoded body>"`` when the
                body is not a dict or ``res_code`` is not 0.
        """
        data = resp.json()
        # A non-dict body (list, null, ...) is reported as an API failure
        # instead of surfacing as an unrelated AttributeError.
        if not isinstance(data, dict) or data.get("res_code") != 0:
            raise Exception(f"{failure}: {data}")
        return data

    @staticmethod
    def _to_int(value) -> int:
        """Coerce ``value`` to ``int``; missing or malformed values become 0."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def get_share_info(self, share_code: str, password: str = "") -> dict:
        """Fetch the metadata of a share.

        Args:
            share_code: Code taken from the share URL.
            password: Access code; sent as ``accessCode`` only when non-empty.

        Returns:
            A dict with ``title``, ``share_id`` and ``files``, the latter a
            list of ``{"id", "name", "size"}`` dicts with ``size`` as ``int``.

        Raises:
            Exception: If the response's ``res_code`` is not 0.
        """
        params = {"shareCode": share_code}
        if password:
            params["accessCode"] = password
        resp = self.session.get(
            f"{self.API_BASE}/getShareInfoByShareId.action",
            params=params,
            timeout=self.transfer_config.request_timeout,
        )
        data = self._checked_json(resp, "189 share info failed")

        # ``.get(key, default)`` does not cover an explicit JSON null, so
        # normalise null or unexpected shapes to empty containers here.
        info = data.get("data")
        if not isinstance(info, dict):
            info = {}
        files = info.get("fileList")
        if not isinstance(files, list):
            files = []

        return {
            "title": info.get("shareName", ""),
            "files": [
                {
                    "id": entry.get("fileId", ""),
                    "name": entry.get("fileName", ""),
                    "size": self._to_int(entry.get("fileSize")),
                }
                for entry in files
                if isinstance(entry, dict)
            ],
            "share_id": info.get("shareId", ""),
        }

    def save_files(self, share_code: str, detail: dict, save_dir: str) -> List[str]:
        """Save a share into the account via ``shareToMe.action``.

        Args:
            share_code: Unused here; kept for interface compatibility.
            detail: Share metadata as returned by ``get_share_info``; only
                ``share_id`` is read.
            save_dir: Target folder id; a falsy value falls back to ``"-11"``.

        Returns:
            The placeholder list ``["0"]``; the response is not parsed for
            the ids of the saved files.

        Raises:
            Exception: If the response's ``res_code`` is not 0.
        """
        payload = {
            "shareId": detail.get("share_id", ""),
            "parentId": save_dir or self._DEFAULT_PARENT_ID,
        }
        resp = self.session.post(
            f"{self.API_BASE}/shareToMe.action",
            data=payload,
            timeout=self.transfer_config.request_timeout,
        )
        self._checked_json(resp, "189 save failed")
        return ["0"]

    def create_share(self, file_ids: List[str], title: str,
                     password: str = "") -> Tuple[str, str]:
        """Placeholder: performs no request and returns ``("", "")``."""
        return "", ""

    def list_files(self, parent_id: str = "-11") -> list:
        """Placeholder: performs no request and returns an empty list."""
        return []