Files
CloudSearch/cloudsearch_transfer/adapter/quark/__init__.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

510 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch Transfer — 夸克网盘适配器 v1.0.0
将 QuarkCredentialManager、QuarkTransfer、QuarkCleanup 组合为
BaseCloudDriveAdapter 的完整实现。
夸克网盘 7 步 API 转存流程:
① POST .../share/sharepage/token → stoken
② GET .../share/sharepage/detail → fid, share_fid_token, title
③ POST .../share/sharepage/save → task_id (转存)
④ 轮询 GET .../task → save_as_top_fids
⑤ POST .../share → task_id (创建分享)
⑥ 轮询 GET .../task → share_id
⑦ POST .../share/password → share_url, passcode
参考 cloud-auto-save 的 quark 实现 + netdisk 的 Pan 接口约定。
"""
from __future__ import annotations
import logging
import time
from typing import Any, Dict, List, Optional, Tuple
from ..base import BaseCloudDriveAdapter, FileInfo, TransferResult, VerifyResult
from ...config import PlatformConfig, TransferConfig
from ...errors import TransferError, TransferErrorCode
from .credential import QuarkCredentialManager
from .transfer import QuarkTransfer, SHARE_URL_PATTERN
from .cleanup import QuarkCleanup
logger = logging.getLogger(__name__)
class QuarkAdapter(BaseCloudDriveAdapter):
"""夸克网盘适配器。
组合 credential / transfer / cleanup 三个模块,
实现 BaseCloudDriveAdapter 定义的所有抽象方法。
Attributes:
PLATFORM_NAME: 展示用平台名称。
PLATFORM_KEY: 内部平台标识。
URL_PATTERNS: 夸克分享链接匹配正则列表。
"""
# ─── 平台标识 ──────────────────────────────────────────────
PLATFORM_NAME: str = "夸克网盘"
PLATFORM_KEY: str = "quark"
# ─── URL 匹配 ──────────────────────────────────────────────
# 支持 pan.quark.cn/s/<share_id>
URL_PATTERNS: List[str] = [
r"pan\.quark\.cn/s/(\w+)",
]
def __init__(self, config: PlatformConfig, transfer_config: TransferConfig) -> None:
"""初始化夸克适配器。
Args:
config: 平台配置(含 Cookie 等)。
transfer_config: 全局转存配置(超时、重试、轮询参数等)。
"""
super().__init__(config, transfer_config)
# 初始化三个子模块
self._credential: QuarkCredentialManager = QuarkCredentialManager(
cookie=config.cookie
)
self._transfer_engine: QuarkTransfer = QuarkTransfer(
credential=self._credential,
timeout=transfer_config.request_timeout,
poll_interval=transfer_config.task_poll_interval,
poll_max_attempts=transfer_config.task_poll_max_attempts,
)
self._cleanup: QuarkCleanup = QuarkCleanup(
credential=self._credential,
timeout=transfer_config.request_timeout,
)
# ═══════════════════════════════════════════════════════════════
# 公开接口实现
# ═══════════════════════════════════════════════════════════════
def _setup_session(self) -> None:
"""将夸克 Cookie 注入 session 的默认 headers。"""
headers = self._credential.get_headers()
if headers:
self.session.headers.update(headers)
logger.debug("[QuarkAdapter] Session headers updated with Cookie")
# ─── transfer() 使用基类模板,子类实现 _transfer ──────────
def _transfer(self, share_url: str, save_dir: str = "",
share_password: str = "") -> TransferResult:
"""执行转存的核心逻辑(被基类 transfer() 调用)。
通过 QuarkTransfer 引擎执行完整的 7 步流程。
Args:
share_url: 夸克分享链接。
save_dir: 目标目录,空则使用配置的默认目录。
share_password: 新分享的密码。
Returns:
TransferResult 包含转存结果。
"""
start: float = time.time()
# 凭证检查
if not self._credential.validate():
raise TransferError(
TransferErrorCode.NOT_LOGIN,
message="夸克 Cookie 无效或长度不足",
platform=self.PLATFORM_KEY,
)
# 目标目录:默认根目录 "0"
target_dir: str = save_dir or self.config.save_dir or "0"
# 分享密码
pwd: str = share_password or self.config.share_password or ""
try:
result: Dict[str, Any] = self._transfer_engine.transfer(
share_url=share_url,
save_dir=target_dir,
share_password=pwd,
)
except ValueError as exc:
raise TransferError(
TransferErrorCode.URL_INVALID,
message=str(exc),
platform=self.PLATFORM_KEY,
) from exc
except RuntimeError as exc:
msg: str = str(exc)
if "stoken" in msg or "status" in msg:
raise TransferError(
TransferErrorCode.SHARE_NOT_EXIST,
message=msg,
platform=self.PLATFORM_KEY,
) from exc
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=msg,
platform=self.PLATFORM_KEY,
) from exc
elapsed: int = int((time.time() - start) * 1000)
# 广告过滤:在转存完成后对 new_file_ids 进行过滤
new_fids: List[str] = result.get("new_file_ids", [])
if self.transfer_config.ad_filter_enabled and new_fids:
new_fids = self._filter_ads(new_fids)
if not new_fids:
raise TransferError(
TransferErrorCode.RESOURCE_EMPTY,
platform=self.PLATFORM_KEY,
)
return TransferResult(
success=True,
platform=self.PLATFORM_KEY,
new_file_id=",".join(new_fids),
file_name=result.get("file_name", ""),
share_url=result.get("share_url", ""),
share_password=result.get("passcode", pwd),
original_url=share_url,
elapsed_ms=elapsed,
)
# ─── verify() 使用基类模板,子类实现 _verify ───────────────
def _verify(self, share_url: str) -> VerifyResult:
"""验证夸克分享链接有效性。
通过获取 stoken → 获取详情来验证链接。
Args:
share_url: 夸克分享链接。
Returns:
VerifyResult 包含验证结果。
"""
try:
pwd_id, passcode = self._parse_share_url(share_url)
if not self._credential.validate():
return VerifyResult(
valid=False,
platform=self.PLATFORM_KEY,
error=TransferError(
TransferErrorCode.NOT_LOGIN,
platform=self.PLATFORM_KEY,
),
)
stoken: str = self._transfer_engine._get_stoken(pwd_id, passcode)
detail: Dict[str, Any] = self._transfer_engine._get_detail(pwd_id, stoken)
files: List[FileInfo] = self._extract_file_list(detail)
return VerifyResult(
valid=True,
platform=self.PLATFORM_KEY,
title=detail.get("title", ""),
file_count=len(files),
files=files,
)
except TransferError:
raise
except (ValueError, RuntimeError) as exc:
return VerifyResult(
valid=False,
platform=self.PLATFORM_KEY,
error=TransferError(
TransferErrorCode.SHARE_NOT_EXIST,
message=str(exc),
platform=self.PLATFORM_KEY,
),
)
except Exception as exc:
return VerifyResult(
valid=False,
platform=self.PLATFORM_KEY,
error=TransferError(
TransferErrorCode.NETWORK_ERROR,
message=str(exc),
platform=self.PLATFORM_KEY,
),
)
# ─── 核心抽象方法 ─────────────────────────────────────────
def _get_share_detail(self, pwd_id: str, passcode: str = "") -> dict:
"""获取夸克分享详情(基类 transfer() 流程中的步骤②)。
Args:
pwd_id: 分享 ID。
passcode: 提取码。
Returns:
分享详情字典,包含 title, fid, share_fid_token 等字段。
"""
stoken: str = self._transfer_engine._get_stoken(pwd_id, passcode)
return self._transfer_engine._get_detail(pwd_id, stoken)
def _save_files(self, pwd_id: str, detail: dict, save_dir: str) -> List[str]:
"""转存文件到自己的夸克网盘(基类 transfer() 流程中的步骤③④)。
Args:
pwd_id: 分享 ID。
detail: 分享详情(来自 _get_share_detail
save_dir: 目标目录 ID。
Returns:
转存后的新文件 ID 列表。
"""
# 需要 stoken从 detail 间接获取(重新请求)
stoken: str = self._transfer_engine._get_stoken(pwd_id)
task_id: str = self._transfer_engine._init_save(
pwd_id, stoken, detail, to_pdir_fid=save_dir
)
return self._transfer_engine._poll_save_task(task_id)
def _create_share(self, file_ids: List[str], title: str,
password: str = "") -> Tuple[str, str]:
"""创建夸克分享链接(基类 transfer() 流程中的步骤⑤⑥⑦)。
Args:
file_ids: 要分享的文件 ID 列表。
title: 分享标题。
password: 分享密码。
Returns:
(share_url, share_password) 元组。
"""
task_id: str = self._transfer_engine._init_share(file_ids, title)
share_id: str = self._transfer_engine._poll_share_task(task_id)
return self._transfer_engine._set_password(share_id, password)
def _extract_file_list(self, detail: dict) -> List[FileInfo]:
"""从夸克分享详情中提取文件列表。
夸克的 sharepage/detail 返回格式:
{
"files": [
{"fid": "...", "file_name": "...", "size": 123, "dir": false, ...},
]
}
Args:
detail: 分享详情字典。
Returns:
FileInfo 对象列表。
"""
files_data: List[Dict[str, Any]] = detail.get("files", [])
result: List[FileInfo] = []
for f in files_data:
file_info = FileInfo(
fid=str(f.get("fid", f.get("file_id", ""))),
name=str(f.get("file_name", f.get("name", ""))),
size=int(f.get("size", 0)),
is_dir=bool(f.get("dir", f.get("is_dir", False))),
ext=str(f.get("ext", f.get("file_extension", ""))),
)
result.append(file_info)
# 如果 files 为空,尝试用 detail 顶层字段构造单个文件信息
if not result and detail.get("fid"):
result.append(FileInfo(
fid=str(detail.get("fid", "")),
name=str(detail.get("title", detail.get("file_name", ""))),
size=0,
is_dir=False,
))
return result
def _filter_ads(self, file_ids: List[str]) -> List[str]:
"""过滤广告文件。
合并配置层和平台层的 banned_keywords调用 QuarkCleanup 执行过滤。
当前实现基于 file_ids 列表过滤(无文件名信息时保持原样)。
Args:
file_ids: 文件 ID 列表。
Returns:
过滤后的文件 ID 列表。
"""
keywords: List[str] = list(
set(self.config.banned_keywords)
| set(self.transfer_config.default_banned_keywords)
)
if not keywords:
return file_ids
# 获取文件信息以进行名称匹配
# 在基类 transfer() 流程中,此处 file_ids 已为转存后的新 IDs
try:
files: List[FileInfo] = self.get_files()
file_names: List[str] = [f.name for f in files]
return QuarkCleanup.filter_ad_ids(file_ids, file_names, keywords)
except Exception:
# 如果无法获取文件名列表,跳过广告过滤
logger.warning("[QuarkAdapter] Cannot fetch file list for ad filtering, skipping")
return file_ids
# ─── get_files / delete ────────────────────────────────────
def get_files(self, parent_fid: str = "0") -> List[FileInfo]:
"""列出夸克网盘指定目录下的文件。
GET /1/clouddrive/file/sort?pdir_fid=<parent_fid>&_page=1&_size=100&_sort=updated_at:desc
Args:
parent_fid: 父目录 ID默认 "0" 即根目录。
Returns:
FileInfo 列表。
"""
url: str = "https://drive-pc.quark.cn/1/clouddrive/file/sort"
params: Dict[str, str] = {
"pdir_fid": parent_fid,
"_page": "1",
"_size": "100",
"_sort": "updated_at:desc",
}
headers: Dict[str, str] = self._credential.get_headers()
try:
resp = self._get(url, params=params, headers=headers)
except Exception as exc:
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=f"获取文件列表失败: {exc}",
platform=self.PLATFORM_KEY,
) from exc
data: Dict[str, Any] = resp.json()
status: int = data.get("status", -1)
if status != 0 and data.get("code") not in (0, None):
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=f"获取文件列表失败: {data.get('message')}",
platform=self.PLATFORM_KEY,
)
files_data: List[Dict[str, Any]] = data.get("data", {}).get("list", [])
result: List[FileInfo] = []
for f in files_data:
result.append(FileInfo(
fid=str(f.get("fid", "")),
name=str(f.get("file_name", f.get("name", ""))),
size=int(f.get("size", 0)),
is_dir=bool(f.get("dir", f.get("is_dir", False))),
ext=str(f.get("file_extension", f.get("ext", ""))),
))
logger.debug("[QuarkAdapter] Listed %d files in dir=%s", len(result), parent_fid)
return result
def delete(self, file_ids: List[str]) -> bool:
"""删除夸克网盘文件(移到回收站)。
Args:
file_ids: 要删除的文件 ID 列表。
Returns:
True 表示删除成功。
"""
if not self._credential.validate():
raise TransferError(
TransferErrorCode.NOT_LOGIN,
platform=self.PLATFORM_KEY,
)
try:
return self._cleanup.delete_files(file_ids)
except RuntimeError as exc:
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=str(exc),
platform=self.PLATFORM_KEY,
) from exc
def delete_permanent(self, file_ids: List[str]) -> bool:
"""彻底删除夸克网盘文件(不可恢复)。
Args:
file_ids: 要彻底删除的文件 ID 列表。
Returns:
True 表示删除成功。
"""
if not self._credential.validate():
raise TransferError(
TransferErrorCode.NOT_LOGIN,
platform=self.PLATFORM_KEY,
)
try:
return self._cleanup.delete_files_permanent(file_ids)
except RuntimeError as exc:
raise TransferError(
TransferErrorCode.NETWORK_ERROR,
message=str(exc),
platform=self.PLATFORM_KEY,
) from exc
# ─── 工具方法 ─────────────────────────────────────────────
def _parse_share_url(self, url: str) -> Tuple[str, str]:
"""解析夸克分享 URL 提取 (pwd_id, passcode)。
夸克链接格式https://pan.quark.cn/s/<pwd_id> 或带 ?pwd=xxxx
Args:
url: 夸克分享链接。
Returns:
(pwd_id, passcode) 元组。
Raises:
TransferError: URL 格式无法识别。
"""
pwd_id: Optional[str] = QuarkTransfer.parse_share_url(url)
if not pwd_id:
raise TransferError(
TransferErrorCode.URL_INVALID,
message=f"无法解析夸克链接: {url}",
platform=self.PLATFORM_KEY,
)
# 提取密码参数
from urllib.parse import urlparse, parse_qs
parsed = urlparse(url)
params = parse_qs(parsed.query)
passcode: str = params.get("pwd", params.get("code", [""]))[0]
return pwd_id, passcode
def update_cookie(self, cookie: str) -> None:
"""动态更新 Cookie 并同步到 session headers。
Args:
cookie: 新的 Cookie 字符串。
"""
self._credential.update_cookie(cookie)
self._setup_session()
logger.info("[QuarkAdapter] Cookie updated, new length=%d", len(cookie))
def close(self) -> None:
"""关闭所有子模块的 HTTP 会话。"""
self._transfer_engine.close()
self._cleanup.close()
self.session.close()
def __repr__(self) -> str:
return (
f"QuarkAdapter(name={self.PLATFORM_NAME}, "
f"account={self.config.account_name}, "
f"credential_valid={self._credential.validate()})"
)