Files
CloudSearch/cloudsearch_transfer/adapter/uc/transfer.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

620 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch Transfer — UC网盘转存核心 v1.0.0
UC网盘 7 步转存流程与夸克高度相似API 域名不同):
① POST .../share/sharepage/v2/detail?pr=UCBrowser&fr=pc → stoken
② GET .../share/sharepage/detail → fid, share_fid_token, title
③ POST .../share/sharepage/save → task_id (转存)
④ 轮询 GET .../task → save_as_top_fids (status==2 完成)
⑤ POST .../share → task_id (创建分享)
⑥ 轮询 GET .../task → share_id
⑦ POST .../share/password → share_url, passcode
参考 cloud-auto-save 的 quark 实现,域名从 drive-pc.quark.cn 改为 pc-api.uc.cn。
"""
from __future__ import annotations
import logging
import re
import time
from typing import Any, Dict, List, Optional, Tuple
import requests
from .credential import UcCredentialManager
logger = logging.getLogger(__name__)
# ─── UC API 基础地址 ────────────────────────────────────────────────
UC_API_BASE = "https://pc-api.uc.cn"
UC_SHARE_API = f"{UC_API_BASE}/1/clouddrive/share"
# ─── URL 解析正则 ───────────────────────────────────────────────────
# 匹配 drive.uc.cn/s/<share_id>
SHARE_URL_PATTERN = re.compile(r"drive\.uc\.cn/s/(\w+)")
class UcTransfer:
"""UC 网盘转存引擎。
封装完整的 7 步 API 流程:获取 stoken → 获取详情 → 保存文件 →
创建分享 → 设置密码。
Attributes:
credential: UC 凭证管理器实例。
session: 复用的 requests.Session。
timeout: 请求超时(秒)。
poll_interval: 轮询间隔(秒)。
poll_max_attempts: 最大轮询次数。
"""
def __init__(
self,
credential: UcCredentialManager,
timeout: int = 30,
poll_interval: float = 0.5,
poll_max_attempts: int = 50,
) -> None:
"""初始化转存引擎。
Args:
credential: 有效的 UC 凭证管理器。
timeout: HTTP 请求超时秒数。
poll_interval: 异步任务轮询间隔秒数。
poll_max_attempts: 异步任务最大轮询次数。
"""
self.credential: UcCredentialManager = credential
self.timeout: int = timeout
self.poll_interval: float = poll_interval
self.poll_max_attempts: int = poll_max_attempts
self.session: requests.Session = requests.Session()
# ─── 步骤 ①:获取 stoken ───────────────────────────────────────
def _get_stoken(self, pwd_id: str, passcode: str = "") -> str:
"""步骤①:向 UC 交换 stoken。
POST /1/clouddrive/share/sharepage/v2/detail?pr=UCBrowser&fr=pc
Body: {"passcode": "", "pwd_id": "<share_id>"}
响应: data.token_info.stoken
UC 使用 v2/detail 接口获取 stoken与夸克的 sharepage/token 不同。
Args:
pwd_id: 分享 ID从 URL 解析)。
passcode: 分享提取码,无密码时为空字符串。
Returns:
stoken 字符串。
Raises:
RuntimeError: API 返回错误或 stoken 缺失。
"""
url = f"{UC_SHARE_API}/sharepage/v2/detail"
params: Dict[str, str] = {
"pr": "UCBrowser",
"fr": "pc",
}
body: Dict[str, str] = {
"passcode": passcode,
"pwd_id": pwd_id,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info("[UcTransfer] ① Getting stoken for pwd_id=%s", pwd_id)
try:
resp = self.session.post(
url, json=body, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"获取 stoken 失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
# UC 的 stoken 在 data.token_info.stoken
stoken: Optional[str] = data.get("data", {}).get("token_info", {}).get("stoken")
if not stoken:
raise RuntimeError(f"stoken 缺失, response: {data}")
logger.info("[UcTransfer] ① stoken obtained")
return stoken
# ─── 步骤 ②:获取分享详情 ─────────────────────────────────────
def _get_detail(self, pwd_id: str, stoken: str) -> Dict[str, Any]:
"""步骤②:获取分享详情。
GET /1/clouddrive/share/sharepage/detail?pwd_id=xx&stoken=xx&_fetch_share=1
返回字段包含title, fid, share_fid_token 等。
Args:
pwd_id: 分享 ID。
stoken: 步骤①获取的 stoken。
Returns:
分享详情字典。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}/sharepage/detail"
params: Dict[str, str] = {
"pwd_id": pwd_id,
"stoken": stoken,
"_fetch_share": "1",
}
headers = self.credential.get_headers()
logger.info("[UcTransfer] ② Fetching share detail for pwd_id=%s", pwd_id)
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"获取分享详情失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise RuntimeError(
f"分享详情API返回错误: status={status}, message={data.get('message')}"
)
detail: Optional[Dict[str, Any]] = data.get("data")
if not detail:
raise RuntimeError(f"分享详情数据为空, response: {data}")
logger.info(
"[UcTransfer] ② Detail: title=%s, fid=%s",
detail.get("title"),
detail.get("fid"),
)
return detail
# ─── 步骤 ③:发起转存 ─────────────────────────────────────────
def _init_save(
self,
pwd_id: str,
stoken: str,
detail: Dict[str, Any],
to_pdir_fid: str = "0",
) -> str:
"""步骤③:发起转存请求。
POST /1/clouddrive/share/sharepage/save
Body: {
"fid_list": [<fid>, ...],
"fid_token_list": [<share_fid_token>, ...],
"to_pdir_fid": "0",
"pwd_id": "<pwd_id>",
"stoken": "<stoken>",
"pdir_fid": "0",
"scene": "link"
}
Args:
pwd_id: 分享 ID。
stoken: stoken。
detail: 步骤②的分享详情。
to_pdir_fid: 目标目录 ID默认 "0" 即根目录。
Returns:
task_id 字符串,用于步骤④轮询。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}/sharepage/save"
fid_list: List[str] = detail.get("fid_list", [detail.get("fid", [])])
fid_token_list: List[str] = detail.get(
"fid_token_list", [detail.get("share_fid_token", [])]
)
# 如果 detail 的 fid/fid_token 是单值而非列表,则包装为列表
if not isinstance(fid_list, list):
fid_list = [fid_list] if fid_list else []
if not isinstance(fid_token_list, list):
fid_token_list = [fid_token_list] if fid_token_list else []
body: Dict[str, Any] = {
"fid_list": fid_list,
"fid_token_list": fid_token_list,
"to_pdir_fid": to_pdir_fid,
"pwd_id": pwd_id,
"stoken": stoken,
"pdir_fid": "0",
"scene": "link",
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info(
"[UcTransfer] ③ Initiating save: %d files to dir=%s",
len(fid_list),
to_pdir_fid,
)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"发起转存失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0:
raise RuntimeError(
f"转存请求失败: status={status}, message={data.get('message')}"
)
task_id: Optional[str] = data.get("data", {}).get("task_id")
if not task_id:
raise RuntimeError(f"转存 task_id 缺失, response: {data}")
logger.info("[UcTransfer] ③ Save task created: task_id=%s", task_id)
return task_id
# ─── 步骤 ④:轮询转存任务 ─────────────────────────────────────
def _poll_save_task(self, task_id: str) -> List[str]:
"""步骤④:轮询转存任务直到完成。
GET /1/clouddrive/task?task_id=<task_id>&retry_index=0
当 status==2 时表示任务成功完成status==-1 表示失败。
Args:
task_id: 步骤③返回的 task_id。
Returns:
save_as_top_fids 列表(转存后的文件 ID
Raises:
RuntimeError: 任务失败或超时。
"""
url = f"{UC_API_BASE}/1/clouddrive/task"
headers = self.credential.get_headers()
for attempt in range(1, self.poll_max_attempts + 1):
params: Dict[str, str] = {
"task_id": task_id,
"retry_index": "0",
}
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException:
logger.warning(
"[UcTransfer] ④ Poll attempt %d/%d failed, retrying...",
attempt,
self.poll_max_attempts,
)
time.sleep(self.poll_interval)
continue
data: Dict[str, Any] = resp.json()
task_status: int = data.get("data", {}).get("status", -1)
logger.debug(
"[UcTransfer] ④ Poll %d/%d: status=%d",
attempt,
self.poll_max_attempts,
task_status,
)
if task_status == 2: # 成功
save_as_top_fids: List[str] = (
data.get("data", {})
.get("save_as", {})
.get("save_as_top_fids", [])
)
logger.info(
"[UcTransfer] ④ Save completed: %d files saved",
len(save_as_top_fids),
)
return save_as_top_fids
if task_status == -1:
raise RuntimeError(
f"转存任务失败: task_id={task_id}, response={data}"
)
time.sleep(self.poll_interval)
raise RuntimeError(
f"转存任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts}"
)
# ─── 步骤 ⑤:发起创建分享 ─────────────────────────────────────
def _init_share(
self, fid_list: List[str], title: str, expired_type: int = 1
) -> str:
"""步骤⑤:创建分享链接。
POST /1/clouddrive/share
Body: {"fid_list": [<fid>, ...], "title": "<title>", "expired_type": 1}
Args:
fid_list: 要分享的文件 ID 列表。
title: 分享标题。
expired_type: 过期类型1=永久有效(默认)。
Returns:
task_id 字符串,用于步骤⑥轮询。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}"
body: Dict[str, Any] = {
"fid_list": fid_list,
"title": title or "分享",
"expired_type": expired_type,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info(
"[UcTransfer] ⑤ Creating share: %d files, title='%s'", len(fid_list), title
)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"创建分享失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise RuntimeError(
f"创建分享请求失败: status={status}, message={data.get('message')}"
)
task_id: Optional[str] = data.get("data", {}).get("task_id")
if not task_id:
raise RuntimeError(f"分享 task_id 缺失, response: {data}")
logger.info("[UcTransfer] ⑤ Share task created: task_id=%s", task_id)
return task_id
# ─── 步骤 ⑥:轮询分享任务 ─────────────────────────────────────
def _poll_share_task(self, task_id: str) -> str:
"""步骤⑥:轮询分享任务直到完成。
GET /1/clouddrive/task?task_id=<task_id>&retry_index=0
status==2 完成,返回 share_id。
Args:
task_id: 步骤⑤返回的 task_id。
Returns:
share_id 字符串。
Raises:
RuntimeError: 任务失败或超时。
"""
url = f"{UC_API_BASE}/1/clouddrive/task"
headers = self.credential.get_headers()
for attempt in range(1, self.poll_max_attempts + 1):
params: Dict[str, str] = {
"task_id": task_id,
"retry_index": "0",
}
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException:
logger.warning(
"[UcTransfer] ⑥ Poll attempt %d/%d failed, retrying...",
attempt,
self.poll_max_attempts,
)
time.sleep(self.poll_interval)
continue
data: Dict[str, Any] = resp.json()
task_status: int = data.get("data", {}).get("status", -1)
logger.debug(
"[UcTransfer] ⑥ Poll %d/%d: status=%d",
attempt,
self.poll_max_attempts,
task_status,
)
if task_status == 2: # 成功
share_id: Optional[str] = data.get("data", {}).get("share_id")
if not share_id:
share_id = (
data.get("data", {}).get("result", {}).get("share_id", "")
)
if not share_id:
raise RuntimeError(f"分享完成但 share_id 缺失: {data}")
logger.info("[UcTransfer] ⑥ Share completed: share_id=%s", share_id)
return share_id
if task_status == -1:
raise RuntimeError(
f"分享任务失败: task_id={task_id}, response={data}"
)
time.sleep(self.poll_interval)
raise RuntimeError(
f"分享任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts}"
)
# ─── 步骤 ⑦:设置分享密码 ─────────────────────────────────────
def _set_password(self, share_id: str, password: str = "") -> Tuple[str, str]:
"""步骤⑦:设置分享密码并获取分享链接。
POST /1/clouddrive/share/password
Body: {"share_id": "<share_id>"}
Args:
share_id: 步骤⑥返回的 share_id。
password: 分享密码,空字符串表示无密码。
Returns:
(share_url, passcode) 元组。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}/password"
body: Dict[str, str] = {
"share_id": share_id,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info("[UcTransfer] ⑦ Setting password for share_id=%s", share_id)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"设置分享密码失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise RuntimeError(
f"设置密码失败: status={status}, message={data.get('message')}"
)
share_url: str = data.get("data", {}).get("share_url", "")
passcode: str = data.get("data", {}).get("passcode", password)
if not share_url:
# 用 share_id 构造默认分享链接
share_url = f"https://drive.uc.cn/s/{share_id}"
logger.info(
"[UcTransfer] ⑦ Password set: share_url=%s, passcode=%s",
share_url,
passcode,
)
return share_url, passcode
# ─── 公开入口 ─────────────────────────────────────────────────
def transfer(
self,
share_url: str,
save_dir: str = "0",
share_password: str = "",
) -> Dict[str, Any]:
"""执行完整的 7 步转存流程。
从原始 UC 分享链接开始,将文件转存到自己网盘,再创建新分享。
Args:
share_url: 原始 UC 分享链接,如 https://drive.uc.cn/s/xxxxx。
save_dir: 转存目标目录 ID默认 "0"(根目录)。
share_password: 新分享的密码,空字符串表示无密码。
Returns:
包含以下字段的字典:
- success: bool
- new_file_ids: List[str] — 转存后的文件ID列表
- file_name: str — 分享标题
- share_url: str — 新分享链接
- passcode: str — 新分享密码
Raises:
RuntimeError: 任一步骤失败。
ValueError: URL 解析失败。
"""
# 0. 解析 URL 提取 pwd_id
match = SHARE_URL_PATTERN.search(share_url)
if not match:
raise ValueError(f"无法从URL中提取UC分享ID: {share_url}")
pwd_id: str = match.group(1)
logger.info("[UcTransfer] Starting 7-step transfer for pwd_id=%s", pwd_id)
# ① 获取 stoken
stoken: str = self._get_stoken(pwd_id)
# ② 获取分享详情
detail: Dict[str, Any] = self._get_detail(pwd_id, stoken)
# ③ 发起转存 → ④ 轮询
task_id: str = self._init_save(pwd_id, stoken, detail, to_pdir_fid=save_dir)
new_file_ids: List[str] = self._poll_save_task(task_id)
if not new_file_ids:
raise RuntimeError("转存完成但未获取到文件ID")
# ⑤ 创建分享 → ⑥ 轮询
title: str = detail.get("title", "分享")
share_task_id: str = self._init_share(new_file_ids, title)
share_id: str = self._poll_share_task(share_task_id)
# ⑦ 设置密码
share_url_new, passcode = self._set_password(share_id, share_password)
logger.info(
"[UcTransfer] Transfer complete: %d files, new_share=%s",
len(new_file_ids),
share_url_new,
)
return {
"success": True,
"new_file_ids": new_file_ids,
"file_name": title,
"share_url": share_url_new,
"passcode": passcode,
}
@staticmethod
def parse_share_url(url: str) -> Optional[str]:
"""从 UC 分享 URL 中提取 pwd_id。
Args:
url: UC 分享链接。
Returns:
pwd_id 字符串,解析失败返回 None。
"""
match = SHARE_URL_PATTERN.search(url)
return match.group(1) if match else None
def close(self) -> None:
"""关闭 HTTP 会话。"""
self.session.close()
def __enter__(self) -> "UcTransfer":
return self
def __exit__(self, *args: Any) -> None:
self.close()