v0.2.7: 修复Redis连接 + 启动管理后台

- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
This commit is contained in:
2026-05-17 02:22:18 +08:00
commit 83cbfaf03f
164 changed files with 25195 additions and 0 deletions

View File

@@ -0,0 +1,493 @@
"""
CloudSearch Transfer — UC网盘适配器 v1.0.0
将 UcCredentialManager、UcTransfer、UcCleanup 组合为
BaseCloudDriveAdapter 的完整实现。
UC网盘 7 步 API 转存流程与夸克高度相似API 域名不同):
① POST .../share/sharepage/v2/detail?pr=UCBrowser&fr=pc → stoken
② GET .../share/sharepage/detail → fid, share_fid_token, title
③ POST .../share/sharepage/save → task_id (转存)
④ 轮询 GET .../task → save_as_top_fids
⑤ POST .../share → task_id (创建分享)
⑥ 轮询 GET .../task → share_id
⑦ POST .../share/password → share_url, passcode
参考 cloud-auto-save 的 quark 实现,域名从 drive-pc.quark.cn 改为 pc-api.uc.cn。
"""
from __future__ import annotations
import logging
import time
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse, parse_qs
from ..base import BaseCloudDriveAdapter, FileInfo, TransferResult, VerifyResult
from ...config import PlatformConfig, TransferConfig
from ...errors import TransferError, TransferErrorCode
from .credential import UcCredentialManager
from .transfer import UcTransfer, SHARE_URL_PATTERN
from .cleanup import UcCleanup
logger = logging.getLogger(__name__)
class UcAdapter(BaseCloudDriveAdapter):
"""UC网盘适配器。
组合 credential / transfer / cleanup 三个模块,
实现 BaseCloudDriveAdapter 定义的所有抽象方法。
Attributes:
PLATFORM_NAME: 展示用平台名称。
PLATFORM_KEY: 内部平台标识。
URL_PATTERNS: UC 分享链接匹配正则列表。
"""
# ─── 平台标识 ──────────────────────────────────────────────
PLATFORM_NAME: str = "UC网盘"
PLATFORM_KEY: str = "uc"
# ─── URL 匹配 ──────────────────────────────────────────────
# 支持 drive.uc.cn/s/<share_id>
URL_PATTERNS: List[str] = [
r"drive\.uc\.cn/s/(\w+)",
]
def __init__(self, config: PlatformConfig, transfer_config: TransferConfig) -> None:
"""初始化 UC 适配器。
Args:
config: 平台配置(含 Cookie 等)。
transfer_config: 全局转存配置(超时、重试、轮询参数等)。
"""
super().__init__(config, transfer_config)
# 初始化三个子模块
self._credential: UcCredentialManager = UcCredentialManager(
cookie=config.cookie
)
self._transfer_engine: UcTransfer = UcTransfer(
credential=self._credential,
timeout=transfer_config.request_timeout,
poll_interval=transfer_config.task_poll_interval,
poll_max_attempts=transfer_config.task_poll_max_attempts,
)
self._cleanup: UcCleanup = UcCleanup(
credential=self._credential,
timeout=transfer_config.request_timeout,
)
# ═══════════════════════════════════════════════════════════════
# 公开接口实现
# ═══════════════════════════════════════════════════════════════
def _setup_session(self) -> None:
"""将 UC Cookie 注入 session 的默认 headers。"""
headers = self._credential.get_headers()
if headers:
self.session.headers.update(headers)
logger.debug("[UcAdapter] Session headers updated with Cookie")
def transfer(self, share_url: str, save_dir: str = "",
share_password: str = "") -> TransferResult:
"""执行转存的核心逻辑(覆盖基类实现 UC 专用流程)。
通过 UcTransfer 引擎执行完整的 7 步流程。
Args:
share_url: UC 分享链接。
save_dir: 目标目录,空则使用配置的默认目录。
share_password: 新分享的密码。
Returns:
TransferResult 包含转存结果。
"""
start: float = time.time()
# 凭证检查
if not self._credential.validate():
raise TransferError(
TransferErrorCode.NOT_LOGIN,
message="UC Cookie 无效或长度不足",
platform=self.PLATFORM_KEY,
)
# 目标目录:默认根目录 "0"
target_dir: str = save_dir or self.config.save_dir or "0"
# 分享密码
pwd: str = share_password or self.config.share_password or ""
try:
result: Dict[str, Any] = self._transfer_engine.transfer(
share_url=share_url,
save_dir=target_dir,
share_password=pwd,
)
except ValueError as exc:
raise TransferError(
TransferErrorCode.URL_INVALID,
message=str(exc),
platform=self.PLATFORM_KEY,
) from exc
except RuntimeError as exc:
msg: str = str(exc)
if "stoken" in msg or "status" in msg:
raise TransferError(
TransferErrorCode.SHARE_NOT_EXIST,
message=msg,
platform=self.PLATFORM_KEY,
) from exc
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=msg,
platform=self.PLATFORM_KEY,
) from exc
elapsed: int = int((time.time() - start) * 1000)
# 广告过滤
new_fids: List[str] = result.get("new_file_ids", [])
if self.transfer_config.ad_filter_enabled and new_fids:
new_fids = self._filter_ads(new_fids)
if not new_fids:
raise TransferError(
TransferErrorCode.RESOURCE_EMPTY,
platform=self.PLATFORM_KEY,
)
return TransferResult(
success=True,
platform=self.PLATFORM_KEY,
new_file_id=",".join(new_fids),
file_name=result.get("file_name", ""),
share_url=result.get("share_url", ""),
share_password=result.get("passcode", pwd),
original_url=share_url,
elapsed_ms=elapsed,
)
def verify(self, share_url: str) -> VerifyResult:
"""验证 UC 分享链接有效性。
Args:
share_url: UC 分享链接。
Returns:
VerifyResult 包含验证结果。
"""
try:
pwd_id, passcode = self._parse_share_url(share_url)
if not self._credential.validate():
return VerifyResult(
valid=False,
platform=self.PLATFORM_KEY,
error=TransferError(
TransferErrorCode.NOT_LOGIN,
platform=self.PLATFORM_KEY,
),
)
stoken: str = self._transfer_engine._get_stoken(pwd_id, passcode)
detail: Dict[str, Any] = self._transfer_engine._get_detail(pwd_id, stoken)
files: List[FileInfo] = self._extract_file_list(detail)
return VerifyResult(
valid=True,
platform=self.PLATFORM_KEY,
title=detail.get("title", ""),
file_count=len(files),
files=files,
)
except TransferError:
raise
except (ValueError, RuntimeError) as exc:
return VerifyResult(
valid=False,
platform=self.PLATFORM_KEY,
error=TransferError(
TransferErrorCode.SHARE_NOT_EXIST,
message=str(exc),
platform=self.PLATFORM_KEY,
),
)
except Exception as exc:
return VerifyResult(
valid=False,
platform=self.PLATFORM_KEY,
error=TransferError(
TransferErrorCode.NETWORK_ERROR,
message=str(exc),
platform=self.PLATFORM_KEY,
),
)
# ─── 核心抽象方法 ─────────────────────────────────────────
def _get_share_detail(self, pwd_id: str, passcode: str = "") -> dict:
"""获取 UC 分享详情。
Args:
pwd_id: 分享 ID。
passcode: 提取码。
Returns:
分享详情字典,包含 title, fid, share_fid_token 等字段。
"""
stoken: str = self._transfer_engine._get_stoken(pwd_id, passcode)
return self._transfer_engine._get_detail(pwd_id, stoken)
def _save_files(self, pwd_id: str, detail: dict, save_dir: str) -> List[str]:
"""转存文件到自己的 UC 网盘。
Args:
pwd_id: 分享 ID。
detail: 分享详情(来自 _get_share_detail
save_dir: 目标目录 ID。
Returns:
转存后的新文件 ID 列表。
"""
stoken: str = self._transfer_engine._get_stoken(pwd_id)
task_id: str = self._transfer_engine._init_save(
pwd_id, stoken, detail, to_pdir_fid=save_dir
)
return self._transfer_engine._poll_save_task(task_id)
def _create_share(
self, file_ids: List[str], title: str, password: str = ""
) -> Tuple[str, str]:
"""创建 UC 分享链接。
Args:
file_ids: 要分享的文件 ID 列表。
title: 分享标题。
password: 分享密码。
Returns:
(share_url, share_password) 元组。
"""
task_id: str = self._transfer_engine._init_share(file_ids, title)
share_id: str = self._transfer_engine._poll_share_task(task_id)
return self._transfer_engine._set_password(share_id, password)
def _extract_file_list(self, detail: dict) -> List[FileInfo]:
"""从 UC 分享详情中提取文件列表。
UC 的 sharepage/detail 返回格式与夸克一致:
{
"files": [
{"fid": "...", "file_name": "...", "size": 123, "dir": false, ...},
]
}
Args:
detail: 分享详情字典。
Returns:
FileInfo 对象列表。
"""
files_data: List[Dict[str, Any]] = detail.get("files", [])
result: List[FileInfo] = []
for f in files_data:
file_info = FileInfo(
fid=str(f.get("fid", f.get("file_id", ""))),
name=str(f.get("file_name", f.get("name", ""))),
size=int(f.get("size", 0)),
is_dir=bool(f.get("dir", f.get("is_dir", False))),
ext=str(f.get("ext", f.get("file_extension", ""))),
)
result.append(file_info)
# 如果 files 为空,尝试用 detail 顶层字段构造单个文件信息
if not result and detail.get("fid"):
result.append(
FileInfo(
fid=str(detail.get("fid", "")),
name=str(detail.get("title", detail.get("file_name", ""))),
size=0,
is_dir=False,
)
)
return result
def _filter_ads(self, file_ids: List[str]) -> List[str]:
"""过滤广告文件。
Args:
file_ids: 文件 ID 列表。
Returns:
过滤后的文件 ID 列表。
"""
keywords: List[str] = list(
set(self.config.banned_keywords)
| set(self.transfer_config.default_banned_keywords)
)
if not keywords:
return file_ids
try:
files: List[FileInfo] = self.get_files()
file_names: List[str] = [f.name for f in files]
return UcCleanup.filter_ad_ids(file_ids, file_names, keywords)
except Exception:
logger.warning(
"[UcAdapter] Cannot fetch file list for ad filtering, skipping"
)
return file_ids
# ─── get_files / delete ────────────────────────────────────
def get_files(self, parent_fid: str = "0") -> List[FileInfo]:
"""列出 UC 网盘指定目录下的文件。
GET /1/clouddrive/file/sort?pdir_fid=<parent_fid>&_page=1&_size=100&_sort=updated_at:desc
Args:
parent_fid: 父目录 ID默认 "0" 即根目录。
Returns:
FileInfo 列表。
"""
url: str = f"https://pc-api.uc.cn/1/clouddrive/file/sort"
params: Dict[str, str] = {
"pdir_fid": parent_fid,
"_page": "1",
"_size": "100",
"_sort": "updated_at:desc",
}
headers: Dict[str, str] = self._credential.get_headers()
try:
resp = self._get(url, params=params, headers=headers)
except Exception as exc:
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=f"获取文件列表失败: {exc}",
platform=self.PLATFORM_KEY,
) from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=f"获取文件列表失败: {data.get('message')}",
platform=self.PLATFORM_KEY,
)
files_data: List[Dict[str, Any]] = data.get("data", {}).get("list", [])
result: List[FileInfo] = []
for f in files_data:
result.append(
FileInfo(
fid=str(f.get("fid", "")),
name=str(f.get("file_name", f.get("name", ""))),
size=int(f.get("size", 0)),
is_dir=bool(f.get("dir", f.get("is_dir", False))),
ext=str(f.get("file_extension", f.get("ext", ""))),
)
)
logger.debug("[UcAdapter] Listed %d files in dir=%s", len(result), parent_fid)
return result
def delete(self, file_ids: List[str]) -> bool:
"""删除 UC 网盘文件(移到回收站)。
Args:
file_ids: 要删除的文件 ID 列表。
Returns:
True 表示删除成功。
"""
if not self._credential.validate():
raise TransferError(
TransferErrorCode.NOT_LOGIN,
platform=self.PLATFORM_KEY,
)
try:
return self._cleanup.delete_files(file_ids)
except RuntimeError as exc:
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=str(exc),
platform=self.PLATFORM_KEY,
) from exc
def delete_permanent(self, file_ids: List[str]) -> bool:
"""彻底删除 UC 网盘文件(不可恢复)。
Args:
file_ids: 要彻底删除的文件 ID 列表。
Returns:
True 表示删除成功。
"""
if not self._credential.validate():
raise TransferError(
TransferErrorCode.NOT_LOGIN,
platform=self.PLATFORM_KEY,
)
try:
return self._cleanup.delete_files_permanent(file_ids)
except RuntimeError as exc:
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=str(exc),
platform=self.PLATFORM_KEY,
) from exc
# ─── 工具方法 ─────────────────────────────────────────────
def _parse_share_url(self, url: str) -> Tuple[str, str]:
"""解析 UC 分享 URL 提取 (pwd_id, passcode)。
UC 链接格式https://drive.uc.cn/s/<pwd_id> 或带 ?pwd=xxxx
Args:
url: UC 分享链接。
Returns:
(pwd_id, passcode) 元组。
Raises:
TransferError: URL 格式无法识别。
"""
pwd_id: Optional[str] = UcTransfer.parse_share_url(url)
if not pwd_id:
raise TransferError(
TransferErrorCode.URL_INVALID,
message=f"无法解析UC链接: {url}",
platform=self.PLATFORM_KEY,
)
parsed = urlparse(url)
params = parse_qs(parsed.query)
passcode: str = params.get("pwd", params.get("code", [""]))[0]
return pwd_id, passcode
def update_cookie(self, cookie: str) -> None:
"""动态更新 Cookie 并同步到 session headers。
Args:
cookie: 新的 Cookie 字符串。
"""
self._credential.update_cookie(cookie)
self._setup_session()
logger.info("[UcAdapter] Cookie updated, new length=%d", len(cookie))
def close(self) -> None:
"""关闭所有子模块的 HTTP 会话。"""
self._transfer_engine.close()

View File

@@ -0,0 +1,218 @@
"""
CloudSearch Transfer — UC网盘清理模块 v1.0.0
提供文件删除和广告过滤功能。API 与夸克相同,仅域名不同。
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List
import requests
from .credential import UcCredentialManager
logger = logging.getLogger(__name__)
# ─── UC API ─────────────────────────────────────────────────────────
UC_API_BASE = "https://pc-api.uc.cn"
UC_FILE_API = f"{UC_API_BASE}/1/clouddrive/file"
class UcCleanup:
"""UC 网盘文件清理器。
提供批量删除文件和广告文件过滤功能。
Attributes:
credential: UC 凭证管理器。
session: 复用的 requests.Session。
timeout: HTTP 请求超时秒数。
"""
def __init__(
self,
credential: UcCredentialManager,
timeout: int = 30,
) -> None:
"""初始化清理器。
Args:
credential: 有效的 UC 凭证管理器。
timeout: HTTP 请求超时秒数。
"""
self.credential: UcCredentialManager = credential
self.timeout: int = timeout
self.session: requests.Session = requests.Session()
def delete_files(self, file_ids: List[str]) -> bool:
"""批量删除文件(回收站方式)。
POST /1/clouddrive/file/delete
Body: {
"action_type": 2,
"filelist": ["<fid1>", "<fid2>", ...]
}
action_type=1 表示彻底删除action_type=2 表示移入回收站。
Args:
file_ids: 要删除的文件 ID 列表。
Returns:
True 表示删除请求已提交成功False 表示失败。
Raises:
RuntimeError: HTTP 请求错误。
"""
if not file_ids:
logger.warning("[UcCleanup] delete_files called with empty list")
return True
url: str = f"{UC_FILE_API}/delete"
body: Dict[str, Any] = {
"action_type": 2, # 2=回收站, 1=彻底删除
"filelist": file_ids,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info("[UcCleanup] Deleting %d files: %s", len(file_ids), file_ids)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"删除文件失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
logger.error(
"[UcCleanup] Delete returned error: status=%s, message=%s",
status,
data.get("message"),
)
return False
logger.info("[UcCleanup] Delete succeeded for %d files", len(file_ids))
return True
def delete_files_permanent(self, file_ids: List[str]) -> bool:
"""彻底删除文件(不从回收站恢复)。
与 delete_files 类似,但 action_type=1。
Args:
file_ids: 要彻底删除的文件 ID 列表。
Returns:
True 表示删除请求已提交成功。
"""
if not file_ids:
return True
url: str = f"{UC_FILE_API}/delete"
body: Dict[str, Any] = {
"action_type": 1, # 1=彻底删除
"filelist": file_ids,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info("[UcCleanup] Permanently deleting %d files", len(file_ids))
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"彻底删除失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
return data.get("status") == 0 or data.get("code") in (0, None)
@staticmethod
def filter_ads(
files: List[Dict[str, Any]],
banned_keywords: List[str],
) -> List[Dict[str, Any]]:
"""按关键词过滤文件列表中的广告文件。
遍历文件列表,剔除文件名中包含任一 banned_keywords 的文件。
匹配方式:不区分大小写的子串匹配。
Args:
files: 文件信息字典列表,每个字典需包含 "name" 字段。
banned_keywords: 被禁关键词列表(匹配不区分大小写)。
Returns:
过滤后的文件信息列表。
"""
if not banned_keywords:
return files
filtered: List[Dict[str, Any]] = []
removed_count: int = 0
for f in files:
name: str = f.get("name", "")
name_lower: str = str(name).lower()
if any(keyword.lower() in name_lower for keyword in banned_keywords):
logger.info("[UcCleanup] Filtered ad file: '%s'", name)
removed_count += 1
continue
filtered.append(f)
if removed_count > 0:
logger.info(
"[UcCleanup] Ad filter removed %d/%d files", removed_count, len(files)
)
return filtered
@staticmethod
def filter_ad_ids(
file_ids: List[str],
file_names: List[str],
banned_keywords: List[str],
) -> List[str]:
"""按关键词过滤文件 ID 列表。
根据 file_names 判断是否为广告,返回对应的 file_ids。
Args:
file_ids: 文件 ID 列表。
file_names: 与 file_ids 一一对应的文件名列表。
banned_keywords: 被禁关键词列表。
Returns:
过滤后的 file_ids 列表。
"""
if not banned_keywords or len(file_ids) != len(file_names):
return file_ids
filtered_ids: List[str] = []
for fid, name in zip(file_ids, file_names):
name_lower: str = str(name).lower()
if any(kw.lower() in name_lower for kw in banned_keywords):
logger.info("[UcCleanup] Filtered ad file: '%s' (id=%s)", name, fid)
continue
filtered_ids.append(fid)
return filtered_ids
def close(self) -> None:
"""关闭 HTTP 会话。"""
self.session.close()
def __enter__(self) -> "UcCleanup":
return self
def __exit__(self, *args: Any) -> None:
self.close()

View File

@@ -0,0 +1,95 @@
"""
CloudSearch Transfer — UC网盘凭证管理 v1.0.0
UC网盘使用 Cookie 直传(与夸克高度相似),无需 token 刷新机制。
验证方式:检查 Cookie 字符串长度是否 >= 50。
"""
from __future__ import annotations
import logging
from typing import Dict
logger = logging.getLogger(__name__)
class UcCredentialManager:
"""UC 网盘凭证管理器。
UC 网盘的转存 API 直接从 Cookie 中读取认证信息,
与夸克网盘机制完全一致,只是 API 域名不同pc-api.uc.cn
Attributes:
cookie: 存储的 UC Cookie 字符串。
"""
# UC Cookie 最小长度阈值(与夸克一致)
MIN_COOKIE_LENGTH: int = 50
# UC 网盘 Referer
REFERER: str = "https://drive.uc.cn/"
def __init__(self, cookie: str = "") -> None:
"""初始化凭证管理器。
Args:
cookie: UC 网盘的 Cookie 字符串。
"""
self.cookie: str = cookie
def validate(self) -> bool:
"""验证 Cookie 是否满足最小长度要求。
Returns:
True 表示 Cookie 长度 >= MIN_COOKIE_LENGTH否则为 False。
"""
if not self.cookie:
logger.warning("[UcCredential] Cookie is empty")
return False
valid = len(self.cookie) >= self.MIN_COOKIE_LENGTH
if not valid:
logger.warning(
"[UcCredential] Cookie too short: len=%d, min=%d",
len(self.cookie),
self.MIN_COOKIE_LENGTH,
)
return valid
def is_valid(self) -> bool:
"""validate() 的别名,便于适配器层调用。"""
return self.validate()
def get_headers(self) -> Dict[str, str]:
"""构建带 Cookie 认证的 HTTP 请求头。
UC API 需要在每次请求头中携带完整的 Cookie 字符串,
以及 Referer: https://drive.uc.cn/。
Returns:
包含 Cookie 和 Referer 字段的请求头字典。
Cookie 无效时仍返回空字典。
"""
if not self.validate():
logger.warning("[UcCredential] Cannot build headers: cookie invalid")
return {}
return {
"Cookie": self.cookie,
"Referer": self.REFERER,
}
def update_cookie(self, cookie: str) -> None:
"""更新 Cookie 字符串(用于手动刷新场景)。
Args:
cookie: 新的 Cookie 字符串。
"""
self.cookie = cookie
logger.info("[UcCredential] Cookie updated, new length=%d", len(cookie))
def __repr__(self) -> str:
return (
f"UcCredentialManager(cookie_len={len(self.cookie) if self.cookie else 0}, "
f"valid={self.validate()})"
)

View File

@@ -0,0 +1,619 @@
"""
CloudSearch Transfer — UC网盘转存核心 v1.0.0
UC网盘 7 步转存流程与夸克高度相似API 域名不同):
① POST .../share/sharepage/v2/detail?pr=UCBrowser&fr=pc → stoken
② GET .../share/sharepage/detail → fid, share_fid_token, title
③ POST .../share/sharepage/save → task_id (转存)
④ 轮询 GET .../task → save_as_top_fids (status==2 完成)
⑤ POST .../share → task_id (创建分享)
⑥ 轮询 GET .../task → share_id
⑦ POST .../share/password → share_url, passcode
参考 cloud-auto-save 的 quark 实现,域名从 drive-pc.quark.cn 改为 pc-api.uc.cn。
"""
from __future__ import annotations
import logging
import re
import time
from typing import Any, Dict, List, Optional, Tuple
import requests
from .credential import UcCredentialManager
logger = logging.getLogger(__name__)
# ─── UC API 基础地址 ────────────────────────────────────────────────
UC_API_BASE = "https://pc-api.uc.cn"
UC_SHARE_API = f"{UC_API_BASE}/1/clouddrive/share"
# ─── URL 解析正则 ───────────────────────────────────────────────────
# 匹配 drive.uc.cn/s/<share_id>
SHARE_URL_PATTERN = re.compile(r"drive\.uc\.cn/s/(\w+)")
class UcTransfer:
"""UC 网盘转存引擎。
封装完整的 7 步 API 流程:获取 stoken → 获取详情 → 保存文件 →
创建分享 → 设置密码。
Attributes:
credential: UC 凭证管理器实例。
session: 复用的 requests.Session。
timeout: 请求超时(秒)。
poll_interval: 轮询间隔(秒)。
poll_max_attempts: 最大轮询次数。
"""
def __init__(
self,
credential: UcCredentialManager,
timeout: int = 30,
poll_interval: float = 0.5,
poll_max_attempts: int = 50,
) -> None:
"""初始化转存引擎。
Args:
credential: 有效的 UC 凭证管理器。
timeout: HTTP 请求超时秒数。
poll_interval: 异步任务轮询间隔秒数。
poll_max_attempts: 异步任务最大轮询次数。
"""
self.credential: UcCredentialManager = credential
self.timeout: int = timeout
self.poll_interval: float = poll_interval
self.poll_max_attempts: int = poll_max_attempts
self.session: requests.Session = requests.Session()
# ─── 步骤 ①:获取 stoken ───────────────────────────────────────
def _get_stoken(self, pwd_id: str, passcode: str = "") -> str:
"""步骤①:向 UC 交换 stoken。
POST /1/clouddrive/share/sharepage/v2/detail?pr=UCBrowser&fr=pc
Body: {"passcode": "", "pwd_id": "<share_id>"}
响应: data.token_info.stoken
UC 使用 v2/detail 接口获取 stoken与夸克的 sharepage/token 不同。
Args:
pwd_id: 分享 ID从 URL 解析)。
passcode: 分享提取码,无密码时为空字符串。
Returns:
stoken 字符串。
Raises:
RuntimeError: API 返回错误或 stoken 缺失。
"""
url = f"{UC_SHARE_API}/sharepage/v2/detail"
params: Dict[str, str] = {
"pr": "UCBrowser",
"fr": "pc",
}
body: Dict[str, str] = {
"passcode": passcode,
"pwd_id": pwd_id,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info("[UcTransfer] ① Getting stoken for pwd_id=%s", pwd_id)
try:
resp = self.session.post(
url, json=body, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"获取 stoken 失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
# UC 的 stoken 在 data.token_info.stoken
stoken: Optional[str] = data.get("data", {}).get("token_info", {}).get("stoken")
if not stoken:
raise RuntimeError(f"stoken 缺失, response: {data}")
logger.info("[UcTransfer] ① stoken obtained")
return stoken
# ─── 步骤 ②:获取分享详情 ─────────────────────────────────────
def _get_detail(self, pwd_id: str, stoken: str) -> Dict[str, Any]:
"""步骤②:获取分享详情。
GET /1/clouddrive/share/sharepage/detail?pwd_id=xx&stoken=xx&_fetch_share=1
返回字段包含title, fid, share_fid_token 等。
Args:
pwd_id: 分享 ID。
stoken: 步骤①获取的 stoken。
Returns:
分享详情字典。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}/sharepage/detail"
params: Dict[str, str] = {
"pwd_id": pwd_id,
"stoken": stoken,
"_fetch_share": "1",
}
headers = self.credential.get_headers()
logger.info("[UcTransfer] ② Fetching share detail for pwd_id=%s", pwd_id)
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"获取分享详情失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise RuntimeError(
f"分享详情API返回错误: status={status}, message={data.get('message')}"
)
detail: Optional[Dict[str, Any]] = data.get("data")
if not detail:
raise RuntimeError(f"分享详情数据为空, response: {data}")
logger.info(
"[UcTransfer] ② Detail: title=%s, fid=%s",
detail.get("title"),
detail.get("fid"),
)
return detail
# ─── 步骤 ③:发起转存 ─────────────────────────────────────────
def _init_save(
self,
pwd_id: str,
stoken: str,
detail: Dict[str, Any],
to_pdir_fid: str = "0",
) -> str:
"""步骤③:发起转存请求。
POST /1/clouddrive/share/sharepage/save
Body: {
"fid_list": [<fid>, ...],
"fid_token_list": [<share_fid_token>, ...],
"to_pdir_fid": "0",
"pwd_id": "<pwd_id>",
"stoken": "<stoken>",
"pdir_fid": "0",
"scene": "link"
}
Args:
pwd_id: 分享 ID。
stoken: stoken。
detail: 步骤②的分享详情。
to_pdir_fid: 目标目录 ID默认 "0" 即根目录。
Returns:
task_id 字符串,用于步骤④轮询。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}/sharepage/save"
fid_list: List[str] = detail.get("fid_list", [detail.get("fid", [])])
fid_token_list: List[str] = detail.get(
"fid_token_list", [detail.get("share_fid_token", [])]
)
# 如果 detail 的 fid/fid_token 是单值而非列表,则包装为列表
if not isinstance(fid_list, list):
fid_list = [fid_list] if fid_list else []
if not isinstance(fid_token_list, list):
fid_token_list = [fid_token_list] if fid_token_list else []
body: Dict[str, Any] = {
"fid_list": fid_list,
"fid_token_list": fid_token_list,
"to_pdir_fid": to_pdir_fid,
"pwd_id": pwd_id,
"stoken": stoken,
"pdir_fid": "0",
"scene": "link",
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info(
"[UcTransfer] ③ Initiating save: %d files to dir=%s",
len(fid_list),
to_pdir_fid,
)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"发起转存失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0:
raise RuntimeError(
f"转存请求失败: status={status}, message={data.get('message')}"
)
task_id: Optional[str] = data.get("data", {}).get("task_id")
if not task_id:
raise RuntimeError(f"转存 task_id 缺失, response: {data}")
logger.info("[UcTransfer] ③ Save task created: task_id=%s", task_id)
return task_id
# ─── 步骤 ④:轮询转存任务 ─────────────────────────────────────
def _poll_save_task(self, task_id: str) -> List[str]:
"""步骤④:轮询转存任务直到完成。
GET /1/clouddrive/task?task_id=<task_id>&retry_index=0
当 status==2 时表示任务成功完成status==-1 表示失败。
Args:
task_id: 步骤③返回的 task_id。
Returns:
save_as_top_fids 列表(转存后的文件 ID
Raises:
RuntimeError: 任务失败或超时。
"""
url = f"{UC_API_BASE}/1/clouddrive/task"
headers = self.credential.get_headers()
for attempt in range(1, self.poll_max_attempts + 1):
params: Dict[str, str] = {
"task_id": task_id,
"retry_index": "0",
}
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException:
logger.warning(
"[UcTransfer] ④ Poll attempt %d/%d failed, retrying...",
attempt,
self.poll_max_attempts,
)
time.sleep(self.poll_interval)
continue
data: Dict[str, Any] = resp.json()
task_status: int = data.get("data", {}).get("status", -1)
logger.debug(
"[UcTransfer] ④ Poll %d/%d: status=%d",
attempt,
self.poll_max_attempts,
task_status,
)
if task_status == 2: # 成功
save_as_top_fids: List[str] = (
data.get("data", {})
.get("save_as", {})
.get("save_as_top_fids", [])
)
logger.info(
"[UcTransfer] ④ Save completed: %d files saved",
len(save_as_top_fids),
)
return save_as_top_fids
if task_status == -1:
raise RuntimeError(
f"转存任务失败: task_id={task_id}, response={data}"
)
time.sleep(self.poll_interval)
raise RuntimeError(
f"转存任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts}"
)
# ─── 步骤 ⑤:发起创建分享 ─────────────────────────────────────
def _init_share(
self, fid_list: List[str], title: str, expired_type: int = 1
) -> str:
"""步骤⑤:创建分享链接。
POST /1/clouddrive/share
Body: {"fid_list": [<fid>, ...], "title": "<title>", "expired_type": 1}
Args:
fid_list: 要分享的文件 ID 列表。
title: 分享标题。
expired_type: 过期类型1=永久有效(默认)。
Returns:
task_id 字符串,用于步骤⑥轮询。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}"
body: Dict[str, Any] = {
"fid_list": fid_list,
"title": title or "分享",
"expired_type": expired_type,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info(
"[UcTransfer] ⑤ Creating share: %d files, title='%s'", len(fid_list), title
)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"创建分享失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise RuntimeError(
f"创建分享请求失败: status={status}, message={data.get('message')}"
)
task_id: Optional[str] = data.get("data", {}).get("task_id")
if not task_id:
raise RuntimeError(f"分享 task_id 缺失, response: {data}")
logger.info("[UcTransfer] ⑤ Share task created: task_id=%s", task_id)
return task_id
# ─── 步骤 ⑥:轮询分享任务 ─────────────────────────────────────
def _poll_share_task(self, task_id: str) -> str:
"""步骤⑥:轮询分享任务直到完成。
GET /1/clouddrive/task?task_id=<task_id>&retry_index=0
status==2 完成,返回 share_id。
Args:
task_id: 步骤⑤返回的 task_id。
Returns:
share_id 字符串。
Raises:
RuntimeError: 任务失败或超时。
"""
url = f"{UC_API_BASE}/1/clouddrive/task"
headers = self.credential.get_headers()
for attempt in range(1, self.poll_max_attempts + 1):
params: Dict[str, str] = {
"task_id": task_id,
"retry_index": "0",
}
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException:
logger.warning(
"[UcTransfer] ⑥ Poll attempt %d/%d failed, retrying...",
attempt,
self.poll_max_attempts,
)
time.sleep(self.poll_interval)
continue
data: Dict[str, Any] = resp.json()
task_status: int = data.get("data", {}).get("status", -1)
logger.debug(
"[UcTransfer] ⑥ Poll %d/%d: status=%d",
attempt,
self.poll_max_attempts,
task_status,
)
if task_status == 2: # 成功
share_id: Optional[str] = data.get("data", {}).get("share_id")
if not share_id:
share_id = (
data.get("data", {}).get("result", {}).get("share_id", "")
)
if not share_id:
raise RuntimeError(f"分享完成但 share_id 缺失: {data}")
logger.info("[UcTransfer] ⑥ Share completed: share_id=%s", share_id)
return share_id
if task_status == -1:
raise RuntimeError(
f"分享任务失败: task_id={task_id}, response={data}"
)
time.sleep(self.poll_interval)
raise RuntimeError(
f"分享任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts}"
)
# ─── 步骤 ⑦:设置分享密码 ─────────────────────────────────────
def _set_password(self, share_id: str, password: str = "") -> Tuple[str, str]:
"""步骤⑦:设置分享密码并获取分享链接。
POST /1/clouddrive/share/password
Body: {"share_id": "<share_id>"}
Args:
share_id: 步骤⑥返回的 share_id。
password: 分享密码,空字符串表示无密码。
Returns:
(share_url, passcode) 元组。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{UC_SHARE_API}/password"
body: Dict[str, str] = {
"share_id": share_id,
}
headers = self.credential.get_headers()
headers.setdefault("Content-Type", "application/json")
logger.info("[UcTransfer] ⑦ Setting password for share_id=%s", share_id)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"设置分享密码失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise RuntimeError(
f"设置密码失败: status={status}, message={data.get('message')}"
)
share_url: str = data.get("data", {}).get("share_url", "")
passcode: str = data.get("data", {}).get("passcode", password)
if not share_url:
# 用 share_id 构造默认分享链接
share_url = f"https://drive.uc.cn/s/{share_id}"
logger.info(
"[UcTransfer] ⑦ Password set: share_url=%s, passcode=%s",
share_url,
passcode,
)
return share_url, passcode
# ─── 公开入口 ─────────────────────────────────────────────────
def transfer(
self,
share_url: str,
save_dir: str = "0",
share_password: str = "",
) -> Dict[str, Any]:
"""执行完整的 7 步转存流程。
从原始 UC 分享链接开始,将文件转存到自己网盘,再创建新分享。
Args:
share_url: 原始 UC 分享链接,如 https://drive.uc.cn/s/xxxxx。
save_dir: 转存目标目录 ID默认 "0"(根目录)。
share_password: 新分享的密码,空字符串表示无密码。
Returns:
包含以下字段的字典:
- success: bool
- new_file_ids: List[str] — 转存后的文件ID列表
- file_name: str — 分享标题
- share_url: str — 新分享链接
- passcode: str — 新分享密码
Raises:
RuntimeError: 任一步骤失败。
ValueError: URL 解析失败。
"""
# 0. 解析 URL 提取 pwd_id
match = SHARE_URL_PATTERN.search(share_url)
if not match:
raise ValueError(f"无法从URL中提取UC分享ID: {share_url}")
pwd_id: str = match.group(1)
logger.info("[UcTransfer] Starting 7-step transfer for pwd_id=%s", pwd_id)
# ① 获取 stoken
stoken: str = self._get_stoken(pwd_id)
# ② 获取分享详情
detail: Dict[str, Any] = self._get_detail(pwd_id, stoken)
# ③ 发起转存 → ④ 轮询
task_id: str = self._init_save(pwd_id, stoken, detail, to_pdir_fid=save_dir)
new_file_ids: List[str] = self._poll_save_task(task_id)
if not new_file_ids:
raise RuntimeError("转存完成但未获取到文件ID")
# ⑤ 创建分享 → ⑥ 轮询
title: str = detail.get("title", "分享")
share_task_id: str = self._init_share(new_file_ids, title)
share_id: str = self._poll_share_task(share_task_id)
# ⑦ 设置密码
share_url_new, passcode = self._set_password(share_id, share_password)
logger.info(
"[UcTransfer] Transfer complete: %d files, new_share=%s",
len(new_file_ids),
share_url_new,
)
return {
"success": True,
"new_file_ids": new_file_ids,
"file_name": title,
"share_url": share_url_new,
"passcode": passcode,
}
@staticmethod
def parse_share_url(url: str) -> Optional[str]:
"""从 UC 分享 URL 中提取 pwd_id。
Args:
url: UC 分享链接。
Returns:
pwd_id 字符串,解析失败返回 None。
"""
match = SHARE_URL_PATTERN.search(url)
return match.group(1) if match else None
def close(self) -> None:
"""关闭 HTTP 会话。"""
self.session.close()
def __enter__(self) -> "UcTransfer":
return self
def __exit__(self, *args: Any) -> None:
self.close()