Files
CloudSearch/cloudsearch_transfer/adapter/xunlei/transfer.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

519 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch Transfer — 迅雷网盘转存核心 v1.0.0
迅雷网盘 4 步转存流程:
① GET .../drive/v1/share?share_id=xx → pass_code_token, files[], title
② POST .../share/restore → restore_task_id (转存)
③ 轮询 GET .../tasks/{task_id} → progress==100, trace_file_ids → oldId→newId映射
④ POST .../share → share_url + pass_code
迅雷网盘需要 refresh_token + captcha_token 双重认证。
"""
from __future__ import annotations
import json
import logging
import re
import time
from typing import Any, Dict, List, Optional, Tuple
import requests
from .credential import XunleiCredentialManager
logger = logging.getLogger(__name__)
# ─── 迅雷 API 基础地址 ──────────────────────────────────────────────
XUNLEI_PAN_API = "https://api-pan.xunlei.com"
# ─── URL 解析正则 ───────────────────────────────────────────────────
# 匹配 pan.xunlei.com/s/<share_id>
SHARE_URL_PATTERN = re.compile(r"pan\.xunlei\.com/s/([A-Za-z0-9]+)")
class XunleiTransfer:
"""迅雷网盘转存引擎。
封装完整的 4 步 API 流程:获取分享详情 → 转存文件 →
轮询转存任务 → 创建新分享。
Attributes:
credential: 迅雷凭证管理器实例。
session: 复用的 requests.Session。
timeout: 请求超时(秒)。
poll_interval: 轮询间隔(秒)。
poll_max_attempts: 最大轮询次数。
"""
def __init__(
self,
credential: XunleiCredentialManager,
timeout: int = 30,
poll_interval: float = 1.0,
poll_max_attempts: int = 60,
) -> None:
"""初始化转存引擎。
Args:
credential: 有效的迅雷凭证管理器。
timeout: HTTP 请求超时秒数。
poll_interval: 异步任务轮询间隔秒数。
poll_max_attempts: 异步任务最大轮询次数。
"""
self.credential: XunleiCredentialManager = credential
self.timeout: int = timeout
self.poll_interval: float = poll_interval
self.poll_max_attempts: int = poll_max_attempts
self.session: requests.Session = requests.Session()
# ─── 步骤 ①:获取分享详情 ─────────────────────────────────────
def _get_share_info(self, share_id: str) -> Dict[str, Any]:
"""步骤①:获取分享详情。
GET /drive/v1/share?share_id=<share_id>
返回字段包含pass_code_token, files[], title 等。
Args:
share_id: 分享 ID从 URL 解析)。
Returns:
分享信息字典,包含 files, title, pass_code_token。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{XUNLEI_PAN_API}/drive/v1/share"
params: Dict[str, str] = {"share_id": share_id}
headers = self.credential.get_headers()
logger.info("[XunleiTransfer] ① Fetching share info for share_id=%s", share_id)
try:
resp = self.session.get(
url, params=params, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"获取分享详情失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
# 检查业务错误
errcode = data.get("errcode", data.get("error_code", 0))
if errcode != 0:
raise RuntimeError(
f"分享详情API返回错误: errcode={errcode}, message={data.get('message', data.get('error', ''))}"
)
# 提取关键字段
pass_code_token: str = data.get("pass_code_token", "")
files: List[Dict[str, Any]] = data.get("files", [])
title: str = data.get("title", data.get("share_name", ""))
if not files:
raise RuntimeError("分享内容为空")
logger.info(
"[XunleiTransfer] ① Share info: title=%s, files=%d, has_pass_code_token=%s",
title,
len(files),
bool(pass_code_token),
)
return {
"pass_code_token": pass_code_token,
"files": files,
"title": title,
"share_id": share_id,
}
# ─── 步骤 ②:转存文件 ─────────────────────────────────────────
def _restore_files(
self,
share_id: str,
pass_code_token: str,
file_ids: List[str],
parent_id: str = "",
) -> str:
"""步骤②:转存文件到自己的迅雷网盘。
POST /drive/v1/share/restore
Body: {
"file_ids": ["<fid1>", ...],
"pass_code_token": "<token>",
"share_id": "<share_id>",
"parent_id": "",
"specify_parent_id": true
}
Args:
share_id: 分享 ID。
pass_code_token: 步骤①获取的 pass_code_token。
file_ids: 要转存的文件 ID 列表。
parent_id: 目标父目录 ID空字符串表示根目录。
Returns:
restore_task_id 字符串,用于步骤③轮询。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{XUNLEI_PAN_API}/drive/v1/share/restore"
body: Dict[str, Any] = {
"file_ids": file_ids,
"pass_code_token": pass_code_token,
"share_id": share_id,
"parent_id": parent_id or "",
"specify_parent_id": True,
}
# restore 操作可能需要 captcha_token
headers = self.credential.get_headers_with_captcha(action="restore")
headers.setdefault("Content-Type", "application/json")
logger.info(
"[XunleiTransfer] ② Restoring %d files from share_id=%s",
len(file_ids),
share_id,
)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"转存请求失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
errcode = data.get("errcode", data.get("error_code", 0))
if errcode != 0:
raise RuntimeError(
f"转存请求失败: errcode={errcode}, message={data.get('message', data.get('error', ''))}"
)
task_id: Optional[str] = data.get("restore_task_id", data.get("task_id"))
if not task_id:
raise RuntimeError(f"转存 task_id 缺失, response: {data}")
logger.info("[XunleiTransfer] ② Restore task created: task_id=%s", task_id)
return task_id
# ─── 步骤 ③:轮询转存任务 ─────────────────────────────────────
def _poll_restore_task(self, task_id: str) -> Dict[str, str]:
"""步骤③:轮询转存任务直到完成。
GET /drive/v1/tasks/{task_id}
当 progress==100 时表示完成,返回 oldId→newId 映射。
从 params.trace_file_ids 解析 JSON 字符串获取映射关系。
Args:
task_id: 步骤②返回的 restore_task_id。
Returns:
{"oldId": "newId", ...} 文件 ID 映射字典。
Raises:
RuntimeError: 任务失败或超时。
"""
url = f"{XUNLEI_PAN_API}/drive/v1/tasks/{task_id}"
headers = self.credential.get_headers()
for attempt in range(1, self.poll_max_attempts + 1):
try:
resp = self.session.get(url, headers=headers, timeout=self.timeout)
resp.raise_for_status()
except requests.RequestException:
logger.warning(
"[XunleiTransfer] ③ Poll attempt %d/%d failed, retrying...",
attempt,
self.poll_max_attempts,
)
time.sleep(self.poll_interval)
continue
data: Dict[str, Any] = resp.json()
progress: int = data.get("progress", 0)
status: str = data.get("status", "")
logger.debug(
"[XunleiTransfer] ③ Poll %d/%d: progress=%d, status=%s",
attempt,
self.poll_max_attempts,
progress,
status,
)
if status == "failed" or status == "error":
raise RuntimeError(
f"转存任务失败: task_id={task_id}, status={status}"
)
if progress == 100:
# 从 params.trace_file_ids 解析 oldId→newId 映射
params: Dict[str, Any] = data.get("params", {})
trace_file_ids: str = params.get("trace_file_ids", "")
if trace_file_ids:
try:
id_mapping: Dict[str, str] = json.loads(trace_file_ids)
logger.info(
"[XunleiTransfer] ③ Restore completed: %d files mapped",
len(id_mapping),
)
return id_mapping
except json.JSONDecodeError:
logger.warning(
"[XunleiTransfer] ③ Failed to parse trace_file_ids: %s",
trace_file_ids,
)
# fallback: 检查 result 字段
result = data.get("result", {})
if result:
logger.info("[XunleiTransfer] ③ Restore completed via result field")
return result
# 最后的 fallback: 返回空映射
logger.warning(
"[XunleiTransfer] ③ Restore completed but no file mapping found"
)
return {}
if progress < 0:
raise RuntimeError(
f"转存任务异常: task_id={task_id}, progress={progress}"
)
time.sleep(self.poll_interval)
raise RuntimeError(
f"转存任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts}"
)
# ─── 步骤 ④:创建新分享 ─────────────────────────────────────
def _create_share(
self,
file_ids: List[str],
expiration_days: str = "-1",
) -> Tuple[str, str]:
"""步骤④:创建新分享链接。
POST /drive/v1/share
Body: {
"file_ids": ["<fid1>", ...],
"expiration_days": "-1"
}
expiration_days: "-1" 表示永久有效。
Args:
file_ids: 要分享的文件 ID 列表。
expiration_days: 过期天数,"-1" 表示永久。
Returns:
(share_url, pass_code) 元组。
Raises:
RuntimeError: API 返回错误。
"""
url = f"{XUNLEI_PAN_API}/drive/v1/share"
body: Dict[str, Any] = {
"file_ids": file_ids,
"expiration_days": expiration_days,
}
# share 操作可能需要 captcha_token
headers = self.credential.get_headers_with_captcha(action="share")
headers.setdefault("Content-Type", "application/json")
logger.info(
"[XunleiTransfer] ④ Creating share: %d files", len(file_ids)
)
try:
resp = self.session.post(
url, json=body, headers=headers, timeout=self.timeout
)
resp.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(f"创建分享失败: {exc}") from exc
data: Dict[str, Any] = resp.json()
errcode = data.get("errcode", data.get("error_code", 0))
if errcode != 0:
raise RuntimeError(
f"创建分享失败: errcode={errcode}, message={data.get('message', data.get('error', ''))}"
)
share_url: str = data.get("share_url", data.get("link", ""))
pass_code: str = data.get("pass_code", data.get("code", ""))
if not share_url:
share_id = data.get("share_id", "")
if share_id:
share_url = f"https://pan.xunlei.com/s/{share_id}"
logger.info(
"[XunleiTransfer] ④ Share created: url=%s, pass_code=%s",
share_url,
pass_code,
)
return share_url, pass_code
# ─── 公开入口 ─────────────────────────────────────────────────
def transfer(
self,
share_url: str,
save_dir: str = "",
share_password: str = "",
) -> Dict[str, Any]:
"""执行完整的 4 步转存流程。
从原始迅雷分享链接开始,将文件转存到自己网盘,再创建新分享。
Args:
share_url: 原始迅雷分享链接,如 https://pan.xunlei.com/s/xxxxx。
save_dir: 转存目标目录 ID空字符串表示根目录。
share_password: 新分享的密码(迅雷使用 pass_code
Returns:
包含以下字段的字典:
- success: bool
- new_file_ids: List[str] — 转存后的文件ID列表newId
- file_name: str — 分享标题
- share_url: str — 新分享链接
- passcode: str — 新分享 pass_code
Raises:
RuntimeError: 任一步骤失败。
ValueError: URL 解析失败。
"""
# 0. 解析 URL 提取 share_id
match = SHARE_URL_PATTERN.search(share_url)
if not match:
raise ValueError(f"无法从URL中提取迅雷分享ID: {share_url}")
share_id: str = match.group(1)
logger.info(
"[XunleiTransfer] Starting 4-step transfer for share_id=%s", share_id
)
# ① 获取分享详情
share_info: Dict[str, Any] = self._get_share_info(share_id)
files: List[Dict[str, Any]] = share_info.get("files", [])
title: str = share_info.get("title", "分享")
pass_code_token: str = share_info.get("pass_code_token", "")
# 提取原始文件 ID
file_ids: List[str] = [
f.get("file_id", f.get("fid", f.get("id", "")))
for f in files
if f.get("file_id") or f.get("fid") or f.get("id")
]
if not file_ids:
raise RuntimeError("无法从分享中提取文件ID")
# ② 发起转存
task_id: str = self._restore_files(
share_id, pass_code_token, file_ids, parent_id=save_dir
)
# ③ 轮询转存任务 → 获取 oldId→newId 映射
id_mapping: Dict[str, str] = self._poll_restore_task(task_id)
# 从映射中提取新的文件 ID
new_file_ids: List[str] = []
for old_fid in file_ids:
new_fid = id_mapping.get(old_fid, "")
if new_fid:
new_file_ids.append(new_fid)
else:
logger.warning(
"[XunleiTransfer] No newId mapped for old_fid=%s", old_fid
)
if not new_file_ids:
raise RuntimeError("转存完成但未获取到新文件ID")
# ④ 创建新分享
share_url_new, pass_code = self._create_share(new_file_ids)
logger.info(
"[XunleiTransfer] Transfer complete: %d files, new_share=%s",
len(new_file_ids),
share_url_new,
)
return {
"success": True,
"new_file_ids": new_file_ids,
"file_name": title,
"share_url": share_url_new,
"passcode": pass_code or share_password,
}
@staticmethod
def parse_share_url(url: str) -> Optional[str]:
"""从迅雷分享 URL 中提取 share_id。
Args:
url: 迅雷分享链接。
Returns:
share_id 字符串,解析失败返回 None。
"""
match = SHARE_URL_PATTERN.search(url)
return match.group(1) if match else None
@staticmethod
def extract_file_ids(files: List[Dict[str, Any]]) -> List[str]:
"""从文件列表中提取 file_id。
Args:
files: 文件信息字典列表。
Returns:
file_id 字符串列表。
"""
return [
f.get("file_id", f.get("fid", f.get("id", "")))
for f in files
if f.get("file_id") or f.get("fid") or f.get("id")
]
@staticmethod
def parse_trace_file_ids(trace: str) -> Dict[str, str]:
"""解析 trace_file_ids JSON 字符串为 oldId→newId 映射。
Args:
trace: trace_file_ids JSON 字符串,如 '{"oldId":"newId"}'.
Returns:
{"oldId": "newId", ...} 映射字典。
"""
try:
return json.loads(trace)
except (json.JSONDecodeError, TypeError):
return {}
def close(self) -> None:
"""关闭 HTTP 会话。"""
self.session.close()
def __enter__(self) -> "XunleiTransfer":
return self
def __exit__(self, *args: Any) -> None:
self.close()