Files
CloudSearch/cloudsearch_transfer/adapter/xunlei/credential.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

340 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch Transfer — 迅雷网盘凭证管理器 v1.0.0
迅雷网盘使用 refresh_token + captcha_token 双重认证机制:
1. refresh_token → access_token (OAuth)
POST https://xluser-ssl.xunlei.com/v1/auth/token
Body: {"grant_type": "refresh_token", "refresh_token": "...", "client_id": "..."}
2. captcha_token 获取(某些操作需要)
POST /v1/shield/captcha/init
Body: {"client_id": "...", "action": "...", "device_id": "...", "meta": {"captcha_sign": "..."}}
3. get_headers() 返回所有需要的认证头:
Authorization: Bearer <access_token>
x-captcha-token: <captcha_token>
x-client-id: <client_id>
x-device-id: <device_id>
"""
from __future__ import annotations
import logging
import time
import threading
from typing import Dict, Optional
import requests
logger = logging.getLogger(__name__)
# ─── 常量 ───────────────────────────────────────────────────────────
# 迅雷网盘 OAuth 认证端点
XUNLEI_AUTH_API = "https://xluser-ssl.xunlei.com"
# 迅雷网盘客户端标识(固定值)
CLIENT_ID = "Xqp0kJBXWhwaTpB6"
DEVICE_ID = "925b7631473a13716b791d7f28289cad"
# ─── 默认请求头 ─────────────────────────────────────────────────────
DEFAULT_HEADERS: Dict[str, str] = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json",
}
class XunleiCredentialManager:
"""迅雷网盘凭证管理器。
职责:
- 使用 refresh_token 换取 access_token
- 获取 captcha_token特定 action 需要)
- 构建包含所有认证头的请求头字典
- 访问令牌过期前自动刷新(提前 60s
用法:
mgr = XunleiCredentialManager(refresh_token="xxx")
mgr.refresh_access_token() # 刷新 access_token
captcha = mgr.get_captcha_token("restore") # 获取验证码令牌
headers = mgr.get_headers() # 获取完整的认证请求头
is_ok = mgr.validate() # 验证凭证有效性
Attributes:
CLIENT_ID: 迅雷客户端 ID。
DEVICE_ID: 设备标识。
"""
# ─── 类常量 ────────────────────────────────────────────────
CLIENT_ID: str = CLIENT_ID
DEVICE_ID: str = DEVICE_ID
def __init__(self, refresh_token: str = "") -> None:
"""初始化迅雷凭证管理器。
Args:
refresh_token: 迅雷网盘的 refresh_token。
"""
self._refresh_token: str = refresh_token.strip()
self._access_token: str = ""
self._expires_at: float = 0.0
self._captcha_tokens: Dict[str, str] = {} # action → captcha_token
self._lock: threading.Lock = threading.Lock()
self._session: requests.Session = requests.Session()
self._session.headers.update(DEFAULT_HEADERS)
# ─── 公开 API ──────────────────────────────────────────────
def validate(self) -> bool:
"""验证 refresh_token 是否有效。
要求 refresh_token 长度 >= 20且能成功换取 access_token。
Returns:
True 表示凭证有效。
"""
if not self._refresh_token or len(self._refresh_token) < 20:
logger.warning(
"[XunleiCredential] refresh_token 长度不足 20验证失败"
)
return False
return self.refresh_access_token()
def is_valid(self) -> bool:
"""validate() 的别名。"""
return self.validate()
def refresh_access_token(self) -> bool:
"""使用 refresh_token 换取 access_token。
POST /v1/auth/token
Body: {"grant_type": "refresh_token", "refresh_token": "...", "client_id": "..."}
返回 True 表示成功False 表示失败。
"""
with self._lock:
return self._do_refresh()
def get_captcha_token(self, action: str) -> str:
"""获取指定 action 的 captcha_token。
POST /v1/shield/captcha/init
Body: {
"client_id": "...",
"action": "...",
"device_id": "...",
"meta": {"captcha_sign": "..."}
}
captcha_token 会按 action 缓存,避免重复获取。
Args:
action: 操作类型,如 "restore""share" 等。
Returns:
captcha_token 字符串,获取失败返回空字符串。
"""
with self._lock:
# 检查缓存
if action in self._captcha_tokens:
return self._captcha_tokens[action]
return self._do_get_captcha(action)
def get_headers(self) -> Dict[str, str]:
"""构建包含所有认证头的请求头字典。
返回:
- Authorization: Bearer <access_token>
- x-captcha-token: <captcha_token> (如有)
- x-client-id: <client_id>
- x-device-id: <device_id>
Returns:
认证请求头字典。
"""
self._ensure_token_valid()
headers: Dict[str, str] = {
"x-client-id": self.CLIENT_ID,
"x-device-id": self.DEVICE_ID,
}
if self._access_token:
headers["Authorization"] = f"Bearer {self._access_token}"
return headers
def get_headers_with_captcha(self, action: str = "") -> Dict[str, str]:
"""获取带 captcha_token 的完整认证头。
Args:
action: captcha 操作类型,空字符串表示不需要 captcha。
Returns:
包含 Authorization + x-captcha-token 的请求头字典。
"""
headers = self.get_headers()
if action:
captcha = self.get_captcha_token(action)
if captcha:
headers["x-captcha-token"] = captcha
return headers
def get_access_token(self) -> str:
"""获取当前有效的 access_token必要时自动刷新"""
self._ensure_token_valid()
return self._access_token
@property
def refresh_token(self) -> str:
"""返回当前 refresh_token。"""
return self._refresh_token
@refresh_token.setter
def refresh_token(self, value: str) -> None:
"""更新 refresh_token。"""
self._refresh_token = value.strip()
with self._lock:
self._access_token = ""
self._expires_at = 0.0
self._captcha_tokens.clear()
# ─── 内部方法 ──────────────────────────────────────────────
def _ensure_token_valid(self) -> None:
"""确保 access_token 有效(过期则自动刷新)。"""
if not self._access_token or time.time() >= (self._expires_at - 60):
self.refresh_access_token()
def _do_refresh(self) -> bool:
"""实际执行 token 刷新。
POST https://xluser-ssl.xunlei.com/v1/auth/token
"""
if not self._refresh_token:
logger.error("[XunleiCredential] 没有 refresh_token无法刷新")
return False
url = f"{XUNLEI_AUTH_API}/v1/auth/token"
body: Dict[str, str] = {
"grant_type": "refresh_token",
"refresh_token": self._refresh_token,
"client_id": self.CLIENT_ID,
}
try:
resp = self._session.post(url, json=body, timeout=30)
data = resp.json()
if resp.status_code != 200:
logger.error(
"[XunleiCredential] 刷新 token 失败: HTTP %d, %s",
resp.status_code,
data,
)
return False
access_token = data.get("access_token", "")
if not access_token:
logger.error(
"[XunleiCredential] 响应中缺少 access_token: %s", data
)
return False
expires_in = int(data.get("expires_in", 7200))
new_refresh = data.get("refresh_token", self._refresh_token)
self._access_token = access_token
self._expires_at = time.time() + expires_in
# 更新 refresh_token服务端可能下发新的
if new_refresh != self._refresh_token:
logger.info(
"[XunleiCredential] refresh_token 已轮换: "
f"{self._refresh_token[:8]}... → {new_refresh[:8]}..."
)
self._refresh_token = new_refresh
# 清除 captcha 缓存token 变了captcha 可能也失效了)
self._captcha_tokens.clear()
logger.info(
"[XunleiCredential] Token 刷新成功 (expires_in=%ds)", expires_in
)
return True
except requests.RequestException as e:
logger.error(f"[XunleiCredential] 刷新 token 网络异常: {e}")
return False
except Exception as e:
logger.exception(f"[XunleiCredential] 刷新 token 未知异常: {e}")
return False
def _do_get_captcha(self, action: str) -> str:
"""获取 captcha_token。
POST /v1/shield/captcha/init
"""
url = f"{XUNLEI_AUTH_API}/v1/shield/captcha/init"
body: Dict[str, Any] = {
"client_id": self.CLIENT_ID,
"action": action,
"device_id": self.DEVICE_ID,
"meta": {
"captcha_sign": "",
},
}
# 需要 Authorization 头
if not self._access_token:
if not self._do_refresh():
logger.error("[XunleiCredential] 无法获取 access_token跳过 captcha")
return ""
headers: Dict[str, str] = {
"Authorization": f"Bearer {self._access_token}",
"Content-Type": "application/json",
}
try:
resp = self._session.post(url, json=body, headers=headers, timeout=15)
data = resp.json()
captcha_token = data.get("captcha_token", "")
if captcha_token:
self._captcha_tokens[action] = captcha_token
logger.info(
"[XunleiCredential] captcha_token 获取成功 for action=%s",
action,
)
else:
logger.warning(
"[XunleiCredential] captcha_token 为空 for action=%s: %s",
action,
data,
)
return captcha_token
except requests.RequestException as e:
logger.error(f"[XunleiCredential] 获取 captcha_token 网络异常: {e}")
return ""
except Exception as e:
logger.exception(f"[XunleiCredential] 获取 captcha_token 异常: {e}")
return ""
def __repr__(self) -> str:
return (
f"XunleiCredentialManager("
f"refresh_token={'***' if self._refresh_token else 'None'}, "
f"has_access_token={bool(self._access_token)}, "
f"captcha_actions={list(self._captcha_tokens.keys())})"
)