- 修复Redis认证 (配置密码) - 启动Python管理后台 (端口9531, 15个功能开关) - 统一版本号 0.2.7 - 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
215 lines
7.1 KiB
Python
215 lines
7.1 KiB
Python
"""
CloudSearch Transfer — transfer orchestrator v1.0.0

Modeled on search-ucmao's pan_operator.create_share and cloud-auto-save's task scheduling.
"""
|
||
|
||
import time
|
||
import logging
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional, List, Dict, Any, Callable
|
||
|
||
from ..adapter.base import TransferResult, VerifyResult, BaseCloudDriveAdapter
|
||
from ..adapter.factory import AdapterFactory
|
||
from ..config import ConfigManager
|
||
from ..credential.manager import CredentialManager
|
||
from ..errors import TransferError, TransferErrorCode
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class TransferTask:
    """Bookkeeping record for one transfer job submitted to the orchestrator."""

    # --- identity / input -------------------------------------------------
    # Identifier handed back to the caller of the asynchronous API.
    task_id: str
    # Share link that this task transfers.
    share_url: str
    # Platform name; empty unless a caller fills it in.
    platform: str = ""

    # --- lifecycle --------------------------------------------------------
    # One of: pending / running / completed / failed
    status: str = "pending"
    # Adapter result, populated once the transfer succeeds.
    result: Optional[TransferResult] = None
    # String form of the error, populated when the transfer fails.
    error: Optional[str] = None

    # --- timing -----------------------------------------------------------
    # Epoch seconds captured when the record is instantiated.
    created_at: float = field(default_factory=time.time)
    # Epoch seconds at which the task reached a final state, if it has.
    completed_at: Optional[float] = None

    # --- notification -----------------------------------------------------
    # Optional callable invoked with ``result`` after the task finishes.
    callback: Optional[Callable] = None
|
||
|
||
|
||
class TransferOrchestrator:
    """
    Coordinate share-link transfers across cloud-drive adapters.

    - Single entry point: take a share URL, let the adapter factory pick the
      adapter for it, and run the transfer.
    - Concurrency: asynchronous transfers run on a ThreadPoolExecutor.
    - Task tracking: an in-memory task registry plus completion callbacks.
    - Credential status reporting through the CredentialManager.

    NOTE(review): the original notes also list a retry mechanism, but nothing
    in this class retries a failed transfer — confirm whether retries live in
    the adapters.
    NOTE(review): finished tasks are never evicted from the registry, so it
    grows for the lifetime of the orchestrator.
    """

    def __init__(self, config_manager: Optional[ConfigManager] = None):
        """
        Build the orchestrator, its adapter factory and its worker pool.

        Args:
            config_manager: Configuration source. A default ``ConfigManager()``
                is created when this is omitted (or falsy).
        """
        self.config = config_manager or ConfigManager()
        self.credential_mgr = CredentialManager()
        self.factory = AdapterFactory(self.config)
        self._executor = ThreadPoolExecutor(
            max_workers=self.config.transfer.max_concurrent_transfers,
            thread_name_prefix="transfer-",
        )
        # Registry of every task created by transfer_async(), keyed by task_id.
        self._tasks: Dict[str, TransferTask] = {}
        # Guards _tasks, _seq and the status fields written by worker threads.
        self._task_lock = threading.Lock()
        # Monotonic counter that keeps task ids unique within one second.
        self._seq = 0

    def transfer(self, share_url: str, save_dir: str = "",
                 share_password: str = "",
                 callback: Optional[Callable] = None) -> TransferResult:
        """
        Transfer a single share link synchronously.

        Args:
            share_url: Share link to transfer.
            save_dir: Target directory, forwarded to the adapter.
            share_password: Password forwarded to the adapter (the original
                notes describe it as the password for the new share).
            callback: Optional completion hook, called as ``callback(result)``
                after a successful transfer. Exceptions raised by the hook are
                logged and do not affect the returned result.

        Returns:
            The TransferResult produced by the adapter.

        Raises:
            TransferError: ``URL_INVALID`` when no adapter matches the URL;
                any TransferError raised by the adapter is propagated as-is;
                every other exception is wrapped as ``NETWORK_ERROR``.
        """
        try:
            adapter = self.factory.get_adapter_for_url(share_url)
            if not adapter:
                raise TransferError(TransferErrorCode.URL_INVALID)

            result = adapter.transfer(
                share_url=share_url,
                save_dir=save_dir,
                share_password=share_password,
            )
        except TransferError:
            raise
        except Exception as e:
            logger.exception("Transfer failed: %s", share_url)
            # Chain the cause so the original traceback is not lost.
            raise TransferError(TransferErrorCode.NETWORK_ERROR, message=str(e)) from e

        # The callback runs outside the try block above: a faulty callback
        # must not make a successful transfer look like a network failure.
        if callback:
            try:
                callback(result)
            except Exception:
                logger.exception("Callback error")

        return result

    def transfer_async(self, share_url: str, save_dir: str = "",
                       share_password: str = "",
                       callback: Optional[Callable] = None) -> str:
        """
        Queue a transfer on the worker pool and return immediately.

        Args:
            share_url: Share link to transfer.
            save_dir: Target directory, forwarded to the adapter.
            share_password: Password forwarded to the adapter.
            callback: Optional hook called with ``task.result`` once the task
                finishes (``None`` is passed when the transfer failed).

        Returns:
            task_id (str) that can be passed to ``get_task``.

        Raises:
            RuntimeError: propagated from the executor when it no longer
                accepts work (for example after ``shutdown``); the task is
                removed from the registry in that case.
        """
        # Id allocation and registration happen under one lock acquisition so
        # concurrent callers neither share a sequence number nor mutate the
        # registry at the same time.
        with self._task_lock:
            self._seq += 1
            task_id = f"transfer_{int(time.time())}_{self._seq}"
            task = TransferTask(
                task_id=task_id,
                share_url=share_url,
                status="pending",
                callback=callback,
            )
            self._tasks[task_id] = task

        try:
            future = self._executor.submit(
                self._run_transfer, task, save_dir, share_password
            )
        except RuntimeError:
            # The executor rejected the job; do not leave a task that would
            # stay "pending" forever.
            with self._task_lock:
                self._tasks.pop(task_id, None)
            raise
        future.add_done_callback(lambda f: self._on_task_done(task, f))

        return task_id

    def _run_transfer(self, task: TransferTask, save_dir: str, share_password: str):
        """Run one queued task on a worker thread and record its outcome on the task."""
        with self._task_lock:
            task.status = "running"

        try:
            # No callback is passed here; _on_task_done invokes task.callback.
            result = self.transfer(task.share_url, save_dir, share_password)
            with self._task_lock:
                task.result = result
                task.status = "completed"
                task.completed_at = time.time()
        except TransferError as e:
            with self._task_lock:
                task.error = str(e)
                task.status = "failed"
                task.completed_at = time.time()
            raise

    def _on_task_done(self, task: TransferTask, future):
        """Done-callback for a queued task: drain the future, then notify the caller."""
        try:
            future.result()  # re-raises whatever the worker raised
        except Exception as exc:
            # _run_transfer has already recorded the failure on the task, so
            # the exception is only noted here rather than propagated.
            logger.debug("Task %s finished with error: %s", task.task_id, exc)
        if task.callback:
            try:
                task.callback(task.result)
            except Exception:
                logger.exception("Callback error")

    def verify(self, share_url: str) -> VerifyResult:
        """
        Check whether a share link is valid.

        Returns:
            The adapter's VerifyResult, or an invalid VerifyResult carrying the
            TransferError when no adapter matches the URL or the adapter
            raises a TransferError.
        """
        try:
            adapter = self.factory.get_adapter_for_url(share_url)
            if not adapter:
                # Same guard as transfer(): an unsupported URL is reported as
                # an invalid link instead of failing on a None adapter.
                raise TransferError(TransferErrorCode.URL_INVALID)
            return adapter.verify(share_url)
        except TransferError as e:
            return VerifyResult(valid=False, platform="", error=e)

    def get_task(self, task_id: str) -> Optional[TransferTask]:
        """Return the task registered under ``task_id``, or ``None`` if unknown."""
        with self._task_lock:
            return self._tasks.get(task_id)

    def list_tasks(self, status: Optional[str] = None, limit: int = 50) -> List[TransferTask]:
        """
        List tracked tasks, newest first.

        Args:
            status: When given (and non-empty), keep only tasks in this status.
            limit: Maximum number of tasks to return.
        """
        # Copy under the lock so concurrent registrations cannot disturb the
        # filtering and sorting below.
        with self._task_lock:
            tasks = list(self._tasks.values())
        if status:
            tasks = [t for t in tasks if t.status == status]
        tasks.sort(key=lambda t: t.created_at, reverse=True)
        return tasks[:limit]

    def get_stats(self) -> Dict[str, Any]:
        """
        Summarize enabled platforms, credential status and task counts.

        Returns:
            Dict with ``enabled_platforms``, ``credentials`` (per platform:
            ``valid`` / ``last_check`` / ``fail_count``, or an empty dict when
            no status is available), ``total_tasks`` and one count per task
            status (``pending``, ``running``, ``completed``, ``failed``).
        """
        enabled = self.config.get_enabled_platforms()
        credentials = {}
        for p in enabled:
            status = self.credential_mgr.get_status(p)
            credentials[p] = {
                "valid": status.valid,
                "last_check": status.last_check,
                "fail_count": status.fail_count,
            } if status else {}

        # Work on a snapshot: iterating the live dict view while another
        # thread registers a task can raise RuntimeError.
        with self._task_lock:
            tasks = list(self._tasks.values())

        # Single pass, so all counts describe the same moment.
        counts = dict.fromkeys(("pending", "running", "completed", "failed"), 0)
        for t in tasks:
            if t.status in counts:
                counts[t.status] += 1

        return {
            "enabled_platforms": enabled,
            "credentials": credentials,
            "total_tasks": len(tasks),
            "pending": counts["pending"],
            "running": counts["running"],
            "completed": counts["completed"],
            "failed": counts["failed"],
        }

    def check_health(self) -> Dict[str, Any]:
        """
        Report, per enabled platform, whether an adapter can be obtained.

        Returns:
            Mapping of platform name to ``"ok"``, ``"no_adapter"`` or
            ``"error: <message>"`` when the factory raised.
        """
        results = {}
        for platform in self.config.get_enabled_platforms():
            try:
                adapter = self.factory.get_adapter(platform)
                if adapter:
                    results[platform] = "ok"
                else:
                    results[platform] = "no_adapter"
            except Exception as e:
                results[platform] = f"error: {e}"
        return results

    def shutdown(self):
        """Stop the worker pool, waiting for already-submitted tasks to finish."""
        self._executor.shutdown(wait=True, cancel_futures=False)
        logger.info("TransferOrchestrator shutdown complete")
|