Files
CloudSearch/cloudsearch_transfer/adapter/quark/transfer.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

555 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch Transfer — 夸克网盘转存核心 v1.0.0
夸克网盘 7 步转存流程:
① POST .../share/sharepage/token → stoken
② GET .../share/sharepage/detail → fid, share_fid_token, title
③ POST .../share/sharepage/save → task_id (转存任务)
④ 轮询 GET .../task → save_as_top_fids (status==2 完成)
⑤ POST .../share → task_id (创建分享任务)
⑥ 轮询 GET .../task → share_id
⑦ POST .../share/password → share_url, passcode
参考 cloud-auto-save 的 quark.py 实现。
"""
from __future__ import annotations
import logging
import re
import time
from typing import Any, Dict, List, Optional, Tuple
import requests
from .credential import QuarkCredentialManager
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# ─── Quark API base addresses ───────────────────────────────────────
# Host used for every request made by this module.
QUARK_API_BASE = "https://drive-pc.quark.cn"
# Prefix shared by the share-related endpoints (token, detail, save, password).
QUARK_SHARE_API = f"{QUARK_API_BASE}/1/clouddrive/share"
# ─── Share-URL parsing regex ────────────────────────────────────────
# Matches pan.quark.cn/s/<share_id>; capture group 1 is the share id.
SHARE_URL_PATTERN = re.compile(r"pan\.quark\.cn/s/(\w+)")
class QuarkTransfer:
    """Quark cloud-drive transfer engine.

    Wraps the seven-step HTTP flow: obtain stoken -> fetch share detail ->
    start the save -> poll the save task -> create a share -> poll the share
    task -> fetch the final share URL / passcode.

    Attributes:
        credential: Quark credential manager; only ``get_headers()`` is used.
        session: Reused ``requests.Session``.
        timeout: Per-request timeout in seconds.
        poll_interval: Delay between two task polls, in seconds.
        poll_max_attempts: Maximum number of polls per asynchronous task.
    """

    def __init__(
        self,
        credential: "QuarkCredentialManager",
        timeout: int = 30,
        poll_interval: float = 0.5,
        poll_max_attempts: int = 50,
    ) -> None:
        """Initialise the transfer engine.

        Args:
            credential: Credential manager that supplies the request headers.
            timeout: HTTP request timeout in seconds.
            poll_interval: Seconds to wait between asynchronous task polls.
            poll_max_attempts: Maximum number of polls per asynchronous task.
        """
        self.credential: QuarkCredentialManager = credential
        self.timeout: int = timeout
        self.poll_interval: float = poll_interval
        self.poll_max_attempts: int = poll_max_attempts
        self.session: requests.Session = requests.Session()

    # ─── Shared helpers ────────────────────────────────────────────

    @staticmethod
    def _payload(data: Any) -> Dict[str, Any]:
        """Return ``data["data"]`` as a dict, or ``{}`` when missing or null.

        A plain ``data.get("data", {})`` returns ``None`` when the key is
        present with a null value, which would break chained ``.get`` calls.
        """
        payload = data.get("data") if isinstance(data, dict) else None
        return payload if isinstance(payload, dict) else {}

    @staticmethod
    def _is_api_error(data: Dict[str, Any]) -> bool:
        """Tell whether a response envelope reports a failure.

        A response is an error only when ``status`` is non-zero AND ``code``
        is present and non-zero. Every step uses this single rule.
        """
        return data.get("status", -1) != 0 and data.get("code") not in (0, None)

    @staticmethod
    def _collect_ids(detail: Dict[str, Any], list_key: str, single_key: str) -> List[str]:
        """Read ids from ``detail`` as a flat list.

        ``list_key`` is preferred; ``single_key`` is the fallback. A scalar is
        wrapped in a list, a list is copied as-is, and a missing or empty
        value yields ``[]`` (never ``[[]]``).
        """
        value = detail.get(list_key) or detail.get(single_key)
        if not value:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _extract_passcode(url: str) -> str:
        """Return the ``pwd=`` query value of a share URL, or ``""``."""
        match = re.search(r"[?&]pwd=(\w+)", url)
        return match.group(1) if match else ""

    def _headers(self, json_body: bool = False) -> Dict[str, str]:
        """Build request headers from the credential manager.

        The returned dict is a copy, so adding ``Content-Type`` never mutates
        whatever object the credential manager hands out.
        """
        headers: Dict[str, str] = dict(self.credential.get_headers() or {})
        if json_body:
            headers.setdefault("Content-Type", "application/json")
        return headers

    def _request_json(self, send: Any, url: str, error_label: str,
                      **kwargs: Any) -> Dict[str, Any]:
        """Send one request and return the decoded JSON object.

        Args:
            send: Bound session method to call (``session.get`` / ``session.post``).
            url: Target URL.
            error_label: Prefix of the ``RuntimeError`` message on failure.
            **kwargs: Forwarded to ``send`` (``params``, ``json``, ``headers``).

        Raises:
            RuntimeError: Transport error, HTTP error status, or a body that
                is not a JSON object.
        """
        try:
            resp = send(url, timeout=self.timeout, **kwargs)
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers JSON decode failures on every requests version.
            raise RuntimeError(f"{error_label}: {exc}") from exc
        if not isinstance(data, dict):
            raise RuntimeError(f"{error_label}: 响应不是 JSON 对象: {data!r}")
        return data

    def _poll_task(self, task_id: str, step: str, label: str) -> Dict[str, Any]:
        """Poll the task endpoint until the task succeeds, fails or times out.

        Task status 2 means success and -1 means failure; a response without
        a status is treated as -1. Failed HTTP calls and undecodable bodies
        are logged and retried.

        Args:
            task_id: Task to poll.
            step: Step marker used in log lines ("④" or "⑥").
            label: Task name used in error messages ("转存" or "分享").

        Returns:
            The full response dict of the successful poll.

        Raises:
            RuntimeError: The task failed, or did not finish within
                ``poll_max_attempts`` polls.
        """
        url = f"{QUARK_API_BASE}/1/clouddrive/task"
        headers = self._headers()
        params: Dict[str, str] = {"task_id": task_id, "retry_index": "0"}
        for attempt in range(1, self.poll_max_attempts + 1):
            try:
                resp = self.session.get(url, params=params, headers=headers,
                                        timeout=self.timeout)
                resp.raise_for_status()
                data = resp.json()
            except (requests.RequestException, ValueError):
                logger.warning("[QuarkTransfer] %s Poll attempt %d/%d failed, retrying...",
                               step, attempt, self.poll_max_attempts)
            else:
                task_status = self._payload(data).get("status", -1)
                logger.debug("[QuarkTransfer] %s Poll %d/%d: status=%s",
                             step, attempt, self.poll_max_attempts, task_status)
                if task_status == 2:
                    return data
                if task_status == -1:
                    raise RuntimeError(f"{label}任务失败: task_id={task_id}, response={data}")
            # No point sleeping after the final attempt; fail immediately.
            if attempt < self.poll_max_attempts:
                time.sleep(self.poll_interval)
        raise RuntimeError(
            f"{label}任务超时: task_id={task_id}, 已轮询 {self.poll_max_attempts}"
        )

    # ─── Step 1: obtain stoken ─────────────────────────────────────

    def _get_stoken(self, pwd_id: str, passcode: str = "") -> str:
        """Step 1: exchange the share id for an stoken.

        POST /1/clouddrive/share/sharepage/token
        Body: {"passcode": "<passcode>", "pwd_id": "<share_id>"}

        Args:
            pwd_id: Share id parsed from the share URL.
            passcode: Extraction code of the share; empty when there is none.

        Returns:
            The stoken string.

        Raises:
            RuntimeError: Request failed or the response carries no stoken.
        """
        url = f"{QUARK_SHARE_API}/sharepage/token"
        body: Dict[str, str] = {"passcode": passcode, "pwd_id": pwd_id}
        logger.info("[QuarkTransfer] ① Getting stoken for pwd_id=%s", pwd_id)
        data = self._request_json(self.session.post, url, "获取 stoken 失败",
                                  json=body, headers=self._headers(json_body=True))
        stoken: Optional[str] = self._payload(data).get("stoken")
        if not stoken:
            raise RuntimeError(f"stoken 缺失, response: {data}")
        logger.info("[QuarkTransfer] ① stoken obtained")
        return stoken

    # ─── Step 2: fetch share detail ────────────────────────────────

    def _get_detail(self, pwd_id: str, stoken: str) -> Dict[str, Any]:
        """Step 2: fetch the share detail.

        GET /1/clouddrive/share/sharepage/detail?pwd_id=..&stoken=..&_fetch_share=1

        Args:
            pwd_id: Share id.
            stoken: Token obtained in step 1.

        Returns:
            The ``data`` object of the response. Later steps read ``title``,
            ``fid`` / ``fid_list`` and ``share_fid_token`` / ``fid_token_list``
            from it.

        Raises:
            RuntimeError: Request failed, the API reported an error, or the
                detail is empty.
        """
        url = f"{QUARK_SHARE_API}/sharepage/detail"
        params: Dict[str, str] = {
            "pwd_id": pwd_id,
            "stoken": stoken,
            "_fetch_share": "1",
        }
        logger.info("[QuarkTransfer] ② Fetching share detail for pwd_id=%s", pwd_id)
        data = self._request_json(self.session.get, url, "获取分享详情失败",
                                  params=params, headers=self._headers())
        if self._is_api_error(data):
            raise RuntimeError(
                f"分享详情API返回错误: status={data.get('status', -1)}, "
                f"message={data.get('message')}"
            )
        detail: Optional[Dict[str, Any]] = data.get("data")
        if not detail:
            raise RuntimeError(f"分享详情数据为空, response: {data}")
        logger.info(
            "[QuarkTransfer] ② Detail: title=%s, fid=%s",
            detail.get("title"),
            detail.get("fid"),
        )
        return detail

    # ─── Step 3: start the save ────────────────────────────────────

    def _init_save(self, pwd_id: str, stoken: str, detail: Dict[str, Any],
                   to_pdir_fid: str = "0") -> str:
        """Step 3: ask the server to save the shared files.

        POST /1/clouddrive/share/sharepage/save
        Body: fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken,
        pdir_fid="0", scene="link".

        Args:
            pwd_id: Share id.
            stoken: Token obtained in step 1.
            detail: Share detail returned by step 2.
            to_pdir_fid: Target directory id; "0" is the root directory.

        Returns:
            The task id to poll in step 4.

        Raises:
            RuntimeError: No file id could be found in ``detail``, the request
                failed, the API reported an error, or no task id came back.
        """
        url = f"{QUARK_SHARE_API}/sharepage/save"
        fid_list = self._collect_ids(detail, "fid_list", "fid")
        fid_token_list = self._collect_ids(detail, "fid_token_list", "share_fid_token")
        if not fid_list:
            # Fail fast instead of posting an empty / malformed id list.
            raise RuntimeError(f"分享详情中未找到可转存的文件 fid, detail: {detail}")
        if len(fid_list) != len(fid_token_list):
            logger.warning(
                "[QuarkTransfer] ③ fid/token count mismatch: %d fids, %d tokens",
                len(fid_list), len(fid_token_list),
            )
        body: Dict[str, Any] = {
            "fid_list": fid_list,
            "fid_token_list": fid_token_list,
            "to_pdir_fid": to_pdir_fid,
            "pwd_id": pwd_id,
            "stoken": stoken,
            "pdir_fid": "0",
            "scene": "link",
        }
        logger.info("[QuarkTransfer] ③ Initiating save: %d files to dir=%s",
                    len(fid_list), to_pdir_fid)
        data = self._request_json(self.session.post, url, "发起转存失败",
                                  json=body, headers=self._headers(json_body=True))
        # Same success rule as the other steps (status alone is not decisive).
        if self._is_api_error(data):
            raise RuntimeError(
                f"转存请求失败: status={data.get('status', -1)}, message={data.get('message')}"
            )
        task_id: Optional[str] = self._payload(data).get("task_id")
        if not task_id:
            raise RuntimeError(f"转存 task_id 缺失, response: {data}")
        logger.info("[QuarkTransfer] ③ Save task created: task_id=%s", task_id)
        return task_id

    # ─── Step 4: poll the save task ────────────────────────────────

    def _poll_save_task(self, task_id: str) -> List[str]:
        """Step 4: poll the save task until it completes.

        GET /1/clouddrive/task?task_id=<task_id>&retry_index=0

        Args:
            task_id: Task id returned by step 3.

        Returns:
            ``save_as.save_as_top_fids`` of the finished task (ids of the
            saved files); empty list when the field is absent.

        Raises:
            RuntimeError: The task failed or timed out.
        """
        data = self._poll_task(task_id, "④", "转存")
        save_as = self._payload(data).get("save_as")
        save_as_top_fids: List[str] = []
        if isinstance(save_as, dict):
            save_as_top_fids = save_as.get("save_as_top_fids") or []
        logger.info("[QuarkTransfer] ④ Save completed: %d files saved", len(save_as_top_fids))
        return save_as_top_fids

    # ─── Step 5: create the share ──────────────────────────────────

    def _init_share(self, fid_list: List[str], title: str,
                    expired_type: int = 1) -> str:
        """Step 5: create a share for the saved files.

        POST /1/clouddrive/share
        Body: {"fid_list": [...], "title": "<title>", "expired_type": 1}

        Args:
            fid_list: Ids of the files to share.
            title: Share title; an empty title is replaced by "分享".
            expired_type: Expiry type sent to the API (default 1).

        Returns:
            The task id to poll in step 6.

        Raises:
            RuntimeError: Request failed, the API reported an error, or no
                task id came back.
        """
        url = f"{QUARK_SHARE_API}"
        body: Dict[str, Any] = {
            "fid_list": fid_list,
            "title": title or "分享",
            "expired_type": expired_type,
        }
        logger.info("[QuarkTransfer] ⑤ Creating share: %d files, title='%s'",
                    len(fid_list), title)
        data = self._request_json(self.session.post, url, "创建分享失败",
                                  json=body, headers=self._headers(json_body=True))
        if self._is_api_error(data):
            raise RuntimeError(
                f"创建分享请求失败: status={data.get('status', -1)}, "
                f"message={data.get('message')}"
            )
        task_id: Optional[str] = self._payload(data).get("task_id")
        if not task_id:
            raise RuntimeError(f"分享 task_id 缺失, response: {data}")
        logger.info("[QuarkTransfer] ⑤ Share task created: task_id=%s", task_id)
        return task_id

    # ─── Step 6: poll the share task ───────────────────────────────

    def _poll_share_task(self, task_id: str) -> str:
        """Step 6: poll the share task until it completes.

        GET /1/clouddrive/task?task_id=<task_id>&retry_index=0

        Args:
            task_id: Task id returned by step 5.

        Returns:
            The share id, read from ``data.share_id`` or, failing that, from
            ``data.result.share_id``.

        Raises:
            RuntimeError: The task failed, timed out, or finished without a
                share id.
        """
        data = self._poll_task(task_id, "⑥", "分享")
        payload = self._payload(data)
        share_id: Optional[str] = payload.get("share_id")
        if not share_id:
            # The id is sometimes nested one level deeper.
            nested = payload.get("result")
            share_id = nested.get("share_id", "") if isinstance(nested, dict) else ""
        if not share_id:
            raise RuntimeError(f"分享完成但 share_id 缺失: {data}")
        logger.info("[QuarkTransfer] ⑥ Share completed: share_id=%s", share_id)
        return share_id

    # ─── Step 7: fetch share URL / passcode ────────────────────────

    def _set_password(self, share_id: str, password: str = "") -> Tuple[str, str]:
        """Step 7: call the password endpoint and collect the share link.

        POST /1/clouddrive/share/password
        Body: {"share_id": "<share_id>"}

        The call is made even without a password because its response is the
        source of the final ``share_url``.

        NOTE(review): ``password`` is not sent to the server here; it is only
        the fallback for the returned passcode when the response has none.
        Confirm where the passcode is actually meant to be applied.

        Args:
            share_id: Share id returned by step 6.
            password: Requested share password; empty means none.

        Returns:
            ``(share_url, passcode)``. When the response has no ``share_url``
            a default ``https://pan.quark.cn/s/<share_id>`` link is built.

        Raises:
            RuntimeError: Request failed or the API reported an error.
        """
        url = f"{QUARK_SHARE_API}/password"
        body: Dict[str, str] = {"share_id": share_id}
        logger.info("[QuarkTransfer] ⑦ Setting password for share_id=%s", share_id)
        data = self._request_json(self.session.post, url, "设置分享密码失败",
                                  json=body, headers=self._headers(json_body=True))
        if self._is_api_error(data):
            raise RuntimeError(
                f"设置密码失败: status={data.get('status', -1)}, message={data.get('message')}"
            )
        payload = self._payload(data)
        share_url: str = payload.get("share_url", "")
        passcode: str = payload.get("passcode", password)
        if not share_url:
            # Build the default share link from the share id.
            share_url = f"https://pan.quark.cn/s/{share_id}"
        logger.info("[QuarkTransfer] ⑦ Password set: share_url=%s, passcode=%s",
                    share_url, passcode)
        return share_url, passcode

    # ─── Public entry point ────────────────────────────────────────

    def transfer(
        self,
        share_url: str,
        save_dir: str = "0",
        share_password: str = "",
    ) -> Dict[str, Any]:
        """Run the complete seven-step transfer.

        Starting from an original Quark share link, save the files into the
        own drive and create a new share for them.

        Args:
            share_url: Original share link, e.g. https://pan.quark.cn/s/xxxxx.
                An optional ``?pwd=<code>`` query value is used as the
                extraction code of the source share.
            save_dir: Target directory id; "0" is the root directory.
            share_password: Password for the new share; empty means none.

        Returns:
            Dict with the keys ``success`` (bool), ``new_file_ids`` (ids of the
            saved files), ``file_name`` (share title), ``share_url`` (new share
            link) and ``passcode`` (new share passcode).

        Raises:
            RuntimeError: Any step failed.
            ValueError: The share id could not be parsed from ``share_url``.
        """
        # 0. Parse the URL: share id plus optional extraction code.
        pwd_id = self.parse_share_url(share_url)
        if not pwd_id:
            raise ValueError(f"无法从URL中提取夸克分享ID: {share_url}")
        source_passcode = self._extract_passcode(share_url)
        logger.info("[QuarkTransfer] Starting 7-step transfer for pwd_id=%s", pwd_id)

        # Steps 1-2: token and share detail.
        stoken: str = self._get_stoken(pwd_id, source_passcode)
        detail: Dict[str, Any] = self._get_detail(pwd_id, stoken)

        # Steps 3-4: save the files and wait for the task.
        save_task_id: str = self._init_save(pwd_id, stoken, detail, to_pdir_fid=save_dir)
        new_fids: List[str] = self._poll_save_task(save_task_id)
        if not new_fids:
            raise RuntimeError("转存完成但未获取到文件ID")

        # Steps 5-6: create the new share and wait for the task.
        title: str = detail.get("title") or "分享"
        share_task_id: str = self._init_share(new_fids, title)
        share_id: str = self._poll_share_task(share_task_id)

        # Step 7: fetch the final link and passcode.
        new_share_url, passcode = self._set_password(share_id, share_password)
        result: Dict[str, Any] = {
            "success": True,
            "new_file_ids": new_fids,
            "file_name": title,
            "share_url": new_share_url,
            "passcode": passcode,
        }
        logger.info("[QuarkTransfer] 7-step transfer complete: %s", result)
        return result

    @staticmethod
    def parse_share_url(url: str) -> Optional[str]:
        """Extract the share id (pwd_id) from a Quark share link.

        Args:
            url: Quark share link.

        Returns:
            The share id, or ``None`` when the URL does not match.
        """
        match = SHARE_URL_PATTERN.search(url)
        return match.group(1) if match else None

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "QuarkTransfer":
        """Return the engine itself for use in a ``with`` block."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Close the session when leaving the ``with`` block."""
        self.close()