- 修复Redis认证 (配置密码) - 启动Python管理后台 (端口9531, 15个功能开关) - 统一版本号 0.2.7 - 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
331 lines
12 KiB
Python
331 lines
12 KiB
Python
"""
|
||
CloudSearch Transfer — 适配器抽象基类 v1.0.0
|
||
参考 cloud-auto-save 的 BaseCloudDriveAdapter + netdisk 的 Pan 接口
|
||
"""
|
||
|
||
import time
|
||
import re
|
||
import logging
|
||
from abc import ABC, abstractmethod
|
||
from dataclasses import dataclass
|
||
from typing import Optional, List, Tuple, Dict, Any
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
import requests
|
||
|
||
from ..config import PlatformConfig, TransferConfig
|
||
from ..errors import TransferError, TransferErrorCode
|
||
|
||
# Module-level logger, named after this module so handlers can filter on it.
logger: logging.Logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class FileInfo:
    """One file or directory entry on a cloud drive."""

    fid: str                # platform-specific file ID
    name: str               # file name
    size: int = 0           # file size (unit not shown here — presumably bytes)
    is_dir: bool = False    # True when the entry is a directory
    ext: str = ""           # file extension
|
||
|
||
|
||
@dataclass
class TransferResult:
    """Outcome of a transfer: saving a share into our drive and re-sharing it."""

    success: bool
    platform: str
    new_file_id: str = ""       # ID(s) of the saved files (comma-joined by transfer())
    file_name: str = ""         # file / share title
    share_url: str = ""         # the newly created share link
    share_password: str = ""    # password of the new share
    original_url: str = ""      # the share link we started from
    elapsed_ms: int = 0         # elapsed time in milliseconds
    error: Optional[TransferError] = None
|
||
|
||
|
||
@dataclass
class VerifyResult:
    """Result of validating a share link.

    ``files`` defaults to ``None`` in the generated ``__init__`` but is
    normalised to a fresh empty list in ``__post_init__``. Every instance
    therefore owns its own list, and callers can always iterate it.
    """

    valid: bool
    platform: str
    title: str = ""
    file_count: int = 0
    # Declared Optional because the default is None (a mutable [] default
    # is rejected by @dataclass); __post_init__ replaces None with [].
    files: Optional[List[FileInfo]] = None
    error: Optional[TransferError] = None

    def __post_init__(self):
        # Swap the None sentinel for a per-instance list.
        if self.files is None:
            self.files = []
|
||
|
||
|
||
class BaseCloudDriveAdapter(ABC):
    """
    Abstract base class for cloud-drive adapters.

    Each cloud-drive platform subclasses this and gets a uniform interface:
    - transfer(): save a share into our own drive, then create a new share
    - verify(): check whether a share link is valid
    - get_files(): list the files in a directory
    - delete(): delete files

    Subclasses set PLATFORM_NAME / PLATFORM_KEY / URL_PATTERNS and implement
    get_files, delete, _get_share_detail, _save_files and _create_share.
    """

    # Must be overridden by subclasses.
    PLATFORM_NAME: str = ""
    PLATFORM_KEY: str = ""  # quark/baidu/aliyun/uc/xunlei/pan123/cloud189

    # Regexes recognising this platform's share URLs (override in subclasses).
    # Capture group 1 of the first matching pattern is used as pwd_id.
    URL_PATTERNS: List[str] = []

    # Default request headers, copied into each adapter's session.
    DEFAULT_HEADERS: Dict[str, str] = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/135.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }

    def __init__(self, config: PlatformConfig, transfer_config: TransferConfig):
        self.config = config
        self.transfer_config = transfer_config
        self.session = requests.Session()
        self.session.headers.update(self.DEFAULT_HEADERS)
        self._setup_session()

    def _setup_session(self):
        """Hook for subclasses to add platform-specific headers/cookies to the session."""
        pass

    # ─── Public interface ─────────────────────────────────────

    def transfer(self, share_url: str, save_dir: str = "",
                 share_password: str = "") -> TransferResult:
        """
        Save a share into our own drive, then create a new share of it.

        Args:
            share_url: the original share link.
            save_dir: directory to save into; empty means ``config.save_dir``,
                falling back to "/".
            share_password: password for the new share; empty means
                ``config.share_password``.

        Returns:
            A successful TransferResult. Failures are raised, not returned.

        Raises:
            TransferError: URL_INVALID, SHARE_NOT_EXIST or RESOURCE_EMPTY from
                the steps below, or whatever subclass hooks raise. Any other
                unexpected exception is wrapped as NETWORK_ERROR.
        """
        # monotonic: elapsed-time measurement must not jump with wall-clock changes.
        start = time.monotonic()
        try:
            # 1. Parse the URL to extract pwd_id (and a passcode, if present).
            pwd_id, passcode = self._parse_share_url(share_url)

            # 2. Fetch share details; an empty result means the share is gone.
            detail = self._get_share_detail(pwd_id, passcode)
            if not detail:
                raise TransferError(TransferErrorCode.SHARE_NOT_EXIST,
                                    platform=self.PLATFORM_KEY)
            title = detail.get("title", "")

            # 3. Save the shared files into our drive.
            save_dir = save_dir or self.config.save_dir or "/"
            new_fids = self._save_files(pwd_id, detail, save_dir)
            if not new_fids:
                raise TransferError(TransferErrorCode.RESOURCE_EMPTY,
                                    platform=self.PLATFORM_KEY)

            # 4. Ad filtering; it may remove every file.
            if self.transfer_config.ad_filter_enabled:
                new_fids = self._filter_ads(new_fids)
                if not new_fids:
                    raise TransferError(TransferErrorCode.RESOURCE_EMPTY,
                                        platform=self.PLATFORM_KEY)

            # 5. Create the new share.
            pwd = share_password or self.config.share_password
            share_url_new, share_pwd = self._create_share(new_fids, title, pwd)

            return TransferResult(
                success=True,
                platform=self.PLATFORM_KEY,
                new_file_id=",".join(new_fids),
                file_name=title,
                share_url=share_url_new,
                share_password=share_pwd,
                original_url=share_url,
                elapsed_ms=int((time.monotonic() - start) * 1000),
            )

        except TransferError:
            raise
        except Exception as e:
            logger.exception("[%s] transfer failed: %s", self.PLATFORM_KEY, share_url)
            raise TransferError(TransferErrorCode.NETWORK_ERROR,
                                message=str(e), platform=self.PLATFORM_KEY) from e

    def verify(self, share_url: str) -> VerifyResult:
        """
        Check whether a share link is valid.

        Never raises for ordinary exceptions. Failures are returned as
        ``VerifyResult(valid=False, error=...)``. An empty share detail is
        reported as SHARE_NOT_EXIST, matching transfer().
        """
        try:
            pwd_id, passcode = self._parse_share_url(share_url)
            detail = self._get_share_detail(pwd_id, passcode)
            if not detail:
                # Without this check an empty dict would be reported as a
                # valid share, and None would surface as a network error.
                raise TransferError(TransferErrorCode.SHARE_NOT_EXIST,
                                    platform=self.PLATFORM_KEY)
            files = self._extract_file_list(detail) or []
            return VerifyResult(
                valid=True,
                platform=self.PLATFORM_KEY,
                title=detail.get("title", ""),
                file_count=len(files),
                files=files,
            )
        except TransferError as e:
            return VerifyResult(valid=False, platform=self.PLATFORM_KEY, error=e)
        except Exception as e:
            logger.warning("[%s] verify failed: %s (%s)", self.PLATFORM_KEY, share_url, e)
            return VerifyResult(
                valid=False,
                platform=self.PLATFORM_KEY,
                error=TransferError(TransferErrorCode.NETWORK_ERROR,
                                    message=str(e), platform=self.PLATFORM_KEY),
            )

    @abstractmethod
    def get_files(self, parent_fid: str = "0") -> List[FileInfo]:
        """List the files under the directory ``parent_fid``."""
        ...

    @abstractmethod
    def delete(self, file_ids: List[str]) -> bool:
        """Delete the given files."""
        ...

    # ─── URL parsing ──────────────────────────────────────────

    def _parse_share_url(self, url: str) -> Tuple[str, str]:
        """
        Parse a share URL into ``(pwd_id, passcode)``. Subclasses may override.

        The first pattern in URL_PATTERNS that matches supplies pwd_id via its
        first capture group. The passcode comes from the ``pwd`` query
        parameter, falling back to ``code``, or "" if neither is present.

        Raises:
            TransferError(URL_INVALID): no pattern matches.
        """
        for pattern in self.URL_PATTERNS:
            m = re.search(pattern, url)
            if m:
                params = parse_qs(urlparse(url).query)
                passcode = params.get("pwd", params.get("code", [""]))[0]
                return m.group(1), passcode

        raise TransferError(TransferErrorCode.URL_INVALID,
                            message=f"无法解析{self.PLATFORM_NAME}链接: {url}",
                            platform=self.PLATFORM_KEY)

    # ─── Core abstract methods (must be implemented) ──────────

    @abstractmethod
    def _get_share_detail(self, pwd_id: str, passcode: str = "") -> dict:
        """Fetch share details → {title, fid/fs_id, ...}; falsy if the share does not exist."""
        ...

    @abstractmethod
    def _save_files(self, pwd_id: str, detail: dict, save_dir: str) -> List[str]:
        """Save the shared files into ``save_dir`` → list of new file IDs."""
        ...

    @abstractmethod
    def _create_share(self, file_ids: List[str], title: str,
                      password: str = "") -> Tuple[str, str]:
        """Create a share of ``file_ids`` → (share_url, share_password)."""
        ...

    def _extract_file_list(self, detail: dict) -> List[FileInfo]:
        """Extract a file list from share details (default: empty; subclasses may override)."""
        return []

    def _filter_ads(self, file_ids: List[str]) -> List[str]:
        """Ad filtering hook (default: keep everything; subclasses may override)."""
        return file_ids

    # ─── HTTP helpers ─────────────────────────────────────────

    def _get(self, url: str, params: Optional[dict] = None,
             headers: Optional[dict] = None,
             retry: Optional[int] = None) -> requests.Response:
        """GET via _request (same retry semantics)."""
        return self._request("GET", url, params=params, headers=headers, retry=retry)

    def _post(self, url: str, json_data: Optional[dict] = None,
              data: Optional[dict] = None, params: Optional[dict] = None,
              headers: Optional[dict] = None,
              retry: Optional[int] = None) -> requests.Response:
        """POST via _request; ``json_data`` is sent as the JSON body, ``data`` as form data."""
        return self._request("POST", url, json=json_data, data=data,
                             params=params, headers=headers, retry=retry)

    def _request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Send an HTTP request on the shared session, retrying network failures.

        ``retry`` (popped from kwargs) overrides ``transfer_config.max_retries``;
        None means use the config value. Only ``requests.RequestException``
        triggers a retry, with exponential backoff of
        ``retry_delay * 2**attempt`` seconds. HTTP error statuses are returned
        unchanged for the caller to interpret.

        Raises:
            TransferError(NETWORK_ERROR): every attempt failed.
        """
        retry = kwargs.pop("retry", None)
        max_retries = retry if retry is not None else self.transfer_config.max_retries
        # Always make at least one attempt. A negative value would otherwise
        # skip the loop entirely and raise with message "None".
        max_retries = max(0, max_retries)

        last_exc = None
        for attempt in range(max_retries + 1):
            try:
                return self.session.request(
                    method, url,
                    timeout=self.transfer_config.request_timeout,
                    **kwargs,
                )
            except requests.RequestException as e:
                last_exc = e
                if attempt < max_retries:
                    delay = self.transfer_config.retry_delay * (2 ** attempt)
                    logger.warning("[%s] HTTP retry %d/%d after %.1fs: %s",
                                   self.PLATFORM_KEY, attempt + 1, max_retries, delay, url)
                    time.sleep(delay)

        raise TransferError(TransferErrorCode.NETWORK_ERROR,
                            message=str(last_exc), platform=self.PLATFORM_KEY) from last_exc

    def _fetch_task_data(self, task_url: str, params: dict) -> Optional[dict]:
        """Perform one task-status poll; return its data dict, or None if this poll failed transiently."""
        try:
            resp = self._get(task_url, params=params, retry=1)
            body = resp.json()
        except TransferError as e:
            # _get already retried once; let the polling loop try again later
            # instead of aborting the whole task on a single network hiccup.
            logger.warning("[%s] task poll request failed: %s", self.PLATFORM_KEY, e)
            return None
        except (requests.RequestException, ValueError):
            # Non-JSON response body; treat as transient.
            return None

        if not isinstance(body, dict):
            return None
        data = body.get("data", body)
        # "data": null (or a non-object) carries no status; treat as transient.
        return data if isinstance(data, dict) else None

    def _poll_task(self, task_url: str, task_id: str,
                   status_field: str = "status",
                   success_value: Any = 2,
                   result_path: Optional[str] = None,
                   query_params: Optional[dict] = None) -> dict:
        """
        Poll an asynchronous task until it completes (modelled on netdisk's task polling).

        Each poll GETs ``task_url`` with ``query_params`` plus ``task_id`` and
        reads the response's ``data`` object, or the top-level JSON object if
        there is no ``data`` key. Transient failures (network errors, non-JSON
        bodies, missing data) do not abort polling.

        Args:
            task_url: task-status endpoint.
            task_id: sent as the ``task_id`` query parameter.
            status_field: key in the data object holding the task status.
            success_value: status value meaning the task succeeded.
            result_path: optional dotted path (e.g. "save_as.save_as_top_fids")
                to extract from the data object on success.
            query_params: extra query parameters. Never modified.

        Returns:
            The data object, or the value at ``result_path``, on success.

        Raises:
            TransferError(NETWORK_ERROR): the task reported failure (status False or -1).
            TransferError(TIMEOUT): ``task_poll_max_wait`` seconds elapsed, or
                ``task_poll_max_attempts`` polls were made without success.
        """
        interval = self.transfer_config.task_poll_interval
        max_attempts = self.transfer_config.task_poll_max_attempts
        max_wait = self.transfer_config.task_poll_max_wait

        # Copy so inserting task_id never mutates the caller's dict.
        params = dict(query_params or {})
        params["task_id"] = task_id
        started = time.monotonic()

        for attempt in range(max_attempts):
            if time.monotonic() - started > max_wait:
                raise TransferError(TransferErrorCode.TIMEOUT,
                                    platform=self.PLATFORM_KEY,
                                    details={"task_id": task_id})

            data = self._fetch_task_data(task_url, params)
            if data is not None:
                current_status = data.get(status_field)
                if current_status == success_value:
                    if result_path:
                        # Walk the dotted path; a missing key yields {}, and
                        # a non-dict intermediate value is returned as-is.
                        for key in result_path.split("."):
                            data = data.get(key, {}) if isinstance(data, dict) else data
                    return data

                if current_status is False or current_status == -1:
                    raise TransferError(TransferErrorCode.NETWORK_ERROR,
                                        message=f"任务失败: {data}",
                                        platform=self.PLATFORM_KEY)

            # No point sleeping after the final attempt; we are about to time out.
            if attempt < max_attempts - 1:
                time.sleep(interval)

        raise TransferError(TransferErrorCode.TIMEOUT,
                            platform=self.PLATFORM_KEY,
                            details={"task_id": task_id, "attempts": max_attempts})
|
||
|
||
|
||
# ─── 工厂函数(adapter/factory.py 使用)───────────────────
|
||
|
||
def match_url(url: str, adapter_cls: type) -> bool:
    """Return True if any of ``adapter_cls.URL_PATTERNS`` matches somewhere in ``url``."""
    return any(re.search(rx, url) for rx in adapter_cls.URL_PATTERNS)
|