""" CloudSearch Transfer — 转存编排器 v1.0.0 参考 search-ucmao 的 pan_operator.create_share + cloud-auto-save 的任务调度 """ import time import logging import threading from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from typing import Optional, List, Dict, Any, Callable from ..adapter.base import TransferResult, VerifyResult, BaseCloudDriveAdapter from ..adapter.factory import AdapterFactory from ..config import ConfigManager from ..credential.manager import CredentialManager from ..errors import TransferError, TransferErrorCode logger = logging.getLogger(__name__) @dataclass class TransferTask: """转存任务""" task_id: str share_url: str platform: str = "" status: str = "pending" # pending/running/completed/failed result: Optional[TransferResult] = None error: Optional[str] = None created_at: float = field(default_factory=time.time) completed_at: Optional[float] = None callback: Optional[Callable] = None class TransferOrchestrator: """ 转存编排器 - 统一入口:接受分享链接 → 自动识别平台 → 转存 - 并发控制:ThreadPoolExecutor - 任务追踪:内存队列 + 回调通知 - 凭证健康检测 - 重试机制 """ def __init__(self, config_manager: ConfigManager = None): self.config = config_manager or ConfigManager() self.credential_mgr = CredentialManager() self.factory = AdapterFactory(self.config) self._executor = ThreadPoolExecutor( max_workers=self.config.transfer.max_concurrent_transfers, thread_name_prefix="transfer-", ) self._tasks: Dict[str, TransferTask] = {} self._task_lock = threading.Lock() self._seq = 0 def transfer(self, share_url: str, save_dir: str = "", share_password: str = "", callback: Callable = None) -> TransferResult: """ 转存单个分享链接(同步) Args: share_url: 分享链接 save_dir: 目标目录 share_password: 新分享密码 callback: 完成回调 callback(TransferResult) Returns: TransferResult """ start = time.time() try: adapter = self.factory.get_adapter_for_url(share_url) if not adapter: raise TransferError(TransferErrorCode.URL_INVALID) result = adapter.transfer( share_url=share_url, save_dir=save_dir, share_password=share_password, ) if callback: callback(result) return result except TransferError: raise except Exception as e: logger.exception(f"Transfer failed: {share_url}") raise TransferError(TransferErrorCode.NETWORK_ERROR, message=str(e)) def transfer_async(self, share_url: str, save_dir: str = "", share_password: str = "", callback: Callable = None) -> str: """ 异步转存 → 返回task_id Returns: task_id (str) """ with self._task_lock: self._seq += 1 task_id = f"transfer_{int(time.time())}_{self._seq}" task = TransferTask( task_id=task_id, share_url=share_url, status="pending", callback=callback, ) self._tasks[task_id] = task future = self._executor.submit( self._run_transfer, task, save_dir, share_password ) future.add_done_callback(lambda f: self._on_task_done(task, f)) return task_id def _run_transfer(self, task: TransferTask, save_dir: str, share_password: str): """在线程池中执行转存""" with self._task_lock: task.status = "running" try: result = self.transfer(task.share_url, save_dir, share_password) with self._task_lock: task.result = result task.status = "completed" task.completed_at = time.time() except TransferError as e: with self._task_lock: task.error = str(e) task.status = "failed" task.completed_at = time.time() raise def _on_task_done(self, task: TransferTask, future): """任务完成回调""" try: future.result() # 触发异常传播 except Exception: pass if task.callback: try: task.callback(task.result) except Exception: logger.exception("Callback error") def verify(self, share_url: str) -> VerifyResult: """验证分享链接有效性""" try: adapter = self.factory.get_adapter_for_url(share_url) return adapter.verify(share_url) except TransferError as e: return VerifyResult(valid=False, platform="", error=e) def get_task(self, task_id: str) -> Optional[TransferTask]: """获取任务状态""" return self._tasks.get(task_id) def list_tasks(self, status: str = None, limit: int = 50) -> List[TransferTask]: """列出任务""" tasks = list(self._tasks.values()) if status: tasks = [t for t in tasks if t.status == status] tasks.sort(key=lambda t: t.created_at, reverse=True) return tasks[:limit] def get_stats(self) -> Dict[str, Any]: """获取统计信息""" enabled = self.config.get_enabled_platforms() credentials = {} for p in enabled: status = self.credential_mgr.get_status(p) credentials[p] = { "valid": status.valid if status else False, "last_check": status.last_check if status else 0, "fail_count": status.fail_count if status else 0, } if status else {} tasks = self._tasks.values() return { "enabled_platforms": enabled, "credentials": credentials, "total_tasks": len(tasks), "pending": sum(1 for t in tasks if t.status == "pending"), "running": sum(1 for t in tasks if t.status == "running"), "completed": sum(1 for t in tasks if t.status == "completed"), "failed": sum(1 for t in tasks if t.status == "failed"), } def check_health(self) -> Dict[str, Any]: """健康检查""" results = {} for platform in self.config.get_enabled_platforms(): try: adapter = self.factory.get_adapter(platform) if adapter: results[platform] = "ok" else: results[platform] = "no_adapter" except Exception as e: results[platform] = f"error: {e}" return results def shutdown(self): """关闭编排器""" self._executor.shutdown(wait=True, cancel_futures=False) logger.info("TransferOrchestrator shutdown complete")