"""CloudSearch Transfer — HTTP API service v1.0.0.

Runs as a Flask microservice and communicates with the main CloudSearch
application over HTTP.
"""

import logging
import os
import uuid

from flask import Flask, request, jsonify

from config import ConfigManager
from orchestration.transfer import TransferOrchestrator

# ─── Initialization ────────────────────────────────────

# Logging is configured before the service objects are built so that anything
# they log during construction already goes through this format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("transfer_api")

app = Flask(__name__)
config = ConfigManager()
orchestrator = TransferOrchestrator(config)

# Platform attributes that PUT /api/config/platforms/<name> may overwrite.
_UPDATABLE_PLATFORM_FIELDS = (
    "enabled",
    "cookie",
    "refresh_token",
    "save_dir",
    "share_password",
)


# ─── Request helpers ───────────────────────────────────


def _json_object():
    """Return the request's JSON payload as a dict.

    A falsy payload (for example JSON ``null``) yields ``{}``. ``None`` is
    returned when the payload is truthy but not a JSON object (a list, a
    string, a number), so the caller can answer 400 instead of failing on
    ``.get``.
    """
    payload = request.get_json() or {}
    return payload if isinstance(payload, dict) else None


def _bad_body():
    """Build the 400 response used when the JSON body is not an object."""
    return jsonify({"error": "request body must be a JSON object"}), 400


def _share_url(data):
    """Return the stripped ``share_url`` from *data*, or ``""`` if unusable.

    Non-string values (``null``, numbers, ...) are treated as missing so the
    handler reports "share_url is required" rather than raising.
    """
    raw = data.get("share_url")
    return raw.strip() if isinstance(raw, str) else ""


# ─── Health check ──────────────────────────────────────


@app.route("/health", methods=["GET"])
def health():
    """Report service status, version and the orchestrator's stats."""
    return jsonify({
        "status": "ok",
        "version": "1.0.0",
        "platforms": orchestrator.get_stats(),
    })


# ─── Transfer endpoint ─────────────────────────────────


@app.route("/api/transfer", methods=["POST"])
def transfer():
    """Transfer (save) a share link.

    JSON body fields: ``share_url`` (required), ``save_dir``,
    ``share_password`` and ``async``. When ``async`` is truthy the transfer
    is queued and a ``task_id`` is returned; otherwise the call waits for
    the orchestrator and returns the result fields.

    Returns 400 when the body is not a JSON object or ``share_url`` is
    missing, and 500 with the error text when the orchestrator raises.
    """
    data = _json_object()
    if data is None:
        return _bad_body()

    share_url = _share_url(data)
    if not share_url:
        return jsonify({"error": "share_url is required"}), 400

    save_dir = data.get("save_dir", "")
    share_password = data.get("share_password", "")
    async_mode = data.get("async", False)

    try:
        if async_mode:
            task_id = orchestrator.transfer_async(share_url, save_dir, share_password)
            return jsonify({"task_id": task_id, "status": "pending"})

        result = orchestrator.transfer(share_url, save_dir, share_password)
        return jsonify({
            "success": result.success,
            "platform": result.platform,
            "new_file_id": result.new_file_id,
            "file_name": result.file_name,
            "share_url": result.share_url,
            "share_password": result.share_password,
            "elapsed_ms": result.elapsed_ms,
        })
    except Exception as e:
        # Endpoint boundary: log the traceback and answer with JSON.
        logger.exception("Transfer failed")
        return jsonify({"error": str(e), "code": getattr(e, "code", 500)}), 500


# ─── Verify endpoint ───────────────────────────────────


@app.route("/api/verify", methods=["POST"])
def verify():
    """Check whether a share link is valid.

    JSON body field: ``share_url`` (required). Returns 400 when the body is
    not a JSON object or ``share_url`` is missing, and 500 with the error
    text when the orchestrator raises (same shape as ``/api/transfer``).
    """
    data = _json_object()
    if data is None:
        return _bad_body()

    share_url = _share_url(data)
    if not share_url:
        return jsonify({"error": "share_url is required"}), 400

    try:
        result = orchestrator.verify(share_url)
    except Exception as e:
        # Mirror the transfer endpoint so clients always get a JSON error.
        logger.exception("Verify failed")
        return jsonify({"error": str(e), "code": getattr(e, "code", 500)}), 500

    return jsonify({
        "valid": result.valid,
        "platform": result.platform,
        "title": result.title,
        "file_count": result.file_count,
        "files": [
            {"fid": f.fid, "name": f.name, "size": f.size}
            for f in (result.files or [])
        ],
        "error": result.error.to_dict() if result.error else None,
    })


# ─── Task queries ──────────────────────────────────────


# The URL variable is required: the view function takes ``task_id``.
@app.route("/api/task/<task_id>", methods=["GET"])
def get_task(task_id):
    """Return the state of an asynchronous task, or 404 if it is unknown."""
    task = orchestrator.get_task(task_id)
    if not task:
        return jsonify({"error": "task not found"}), 404

    result = {
        "task_id": task.task_id,
        "status": task.status,
        "share_url": task.share_url,
        "platform": task.platform,
        "created_at": task.created_at,
        "completed_at": task.completed_at,
    }
    if task.result:
        result["result"] = {
            "success": task.result.success,
            "share_url": task.result.share_url,
            "file_name": task.result.file_name,
            "elapsed_ms": task.result.elapsed_ms,
        }
    if task.error:
        result["error"] = task.error
    return jsonify(result)


@app.route("/api/tasks", methods=["GET"])
def list_tasks():
    """List tasks, optionally filtered by ``status`` and capped by ``limit``.

    ``limit`` defaults to 50; a value that is not an integer falls back to
    that default instead of failing the request.
    """
    status = request.args.get("status")
    limit = request.args.get("limit", default=50, type=int)
    tasks = orchestrator.list_tasks(status=status, limit=limit)
    return jsonify({
        "tasks": [
            {
                "task_id": t.task_id,
                "status": t.status,
                # Only the first 80 characters of the link are listed.
                "share_url": t.share_url[:80],
                "platform": t.platform,
                "created_at": t.created_at,
            }
            for t in tasks
        ],
        "total": len(tasks),
    })


# ─── Statistics ────────────────────────────────────────


@app.route("/api/stats", methods=["GET"])
def stats():
    """Return the orchestrator's statistics."""
    return jsonify(orchestrator.get_stats())


# ─── Configuration management ──────────────────────────


@app.route("/api/config/platforms", methods=["GET"])
def get_platforms():
    """Return the configured platforms.

    The cookie and refresh token are reported only as ``has_*`` booleans;
    their values are not included in the response.
    """
    platforms = {
        name: {
            "enabled": cfg.enabled,
            "account_name": cfg.account_name,
            "save_dir": cfg.save_dir,
            "has_cookie": bool(cfg.cookie),
            "has_refresh_token": bool(cfg.refresh_token),
        }
        for name, cfg in config.platforms.items()
    }
    return jsonify({"platforms": platforms})


# The URL variable is required: the view function takes ``name``.
@app.route("/api/config/platforms/<name>", methods=["PUT"])
def update_platform(name):
    """Update one platform's configuration, creating the entry if missing.

    Only the keys present in the JSON body are applied. The configuration
    is then saved and the orchestrator factory's cache for that platform is
    invalidated. Returns 400 when the body is not a JSON object.
    """
    data = _json_object()
    if data is None:
        return _bad_body()

    if name not in config.platforms:
        from config import PlatformConfig

        config.platforms[name] = PlatformConfig()

    cfg = config.platforms[name]
    for field in _UPDATABLE_PLATFORM_FIELDS:
        if field in data:
            setattr(cfg, field, data[field])

    config.save()
    orchestrator.factory.invalidate_cache(name)
    return jsonify({"status": "ok", "platform": name})


# ─── Startup ───────────────────────────────────────────

if __name__ == "__main__":
    port = int(os.getenv("PORT", 9528))
    debug = os.getenv("FLASK_DEBUG", "0") == "1"
    logger.info("Starting transfer service on port %s", port)
    # NOTE(review): the service binds to all interfaces; FLASK_DEBUG=1 also
    # enables the interactive debugger, so avoid it outside local development.
    app.run(host="0.0.0.0", port=port, debug=debug)