- 修复Redis认证 (配置密码) - 启动Python管理后台 (端口9531, 15个功能开关) - 统一版本号 0.2.7 - 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
131 lines
4.1 KiB
Python
131 lines
4.1 KiB
Python
"""
CloudSearch Transfer — credential manager v1.0.0

Modeled on search-ucmao's get_and_validate_credential and cloud-auto-save's token write-back.
"""
|
||
|
||
import time
|
||
import logging
|
||
from typing import Optional, Dict, Any
|
||
from dataclasses import dataclass, field
|
||
|
||
from ..config import PlatformConfig
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class CredentialStatus:
    """Health record kept for one platform's credential.

    Attributes:
        valid: Whether the credential is currently considered valid.
        platform: Platform key this record belongs to (e.g. ``"quark"``).
        last_check: ``time.time()`` timestamp of the most recent check or
            invalidation; ``0.0`` until one has been recorded.
        last_error: Message describing the most recent failure; empty when
            no failure has been recorded.
        checks_count: Number of validation checks recorded.
        fail_count: Number of recorded failures.
    """

    valid: bool
    platform: str
    last_check: float = 0.0
    last_error: str = ""
    checks_count: int = 0
    fail_count: int = 0
|
||
|
||
|
||
class CredentialManager:
    """Validate, hand out, and track the health of per-platform credentials.

    What this class does:

    - Length-based credential validation (the minimum length differs per
      platform).
    - Caching of access tokens for refresh-token platforms; tokens are
      supplied by callers through ``update_access_token``.
    - Per-platform health bookkeeping via ``CredentialStatus``.
    """

    # Minimum credential length per platform.
    MIN_LENGTH_MAP = {
        "quark": 50,  # cookie, at least 50 characters
        "baidu": 50,  # cookie, at least 50 characters
        "uc": 50,  # cookie, at least 50 characters
        "aliyun": 20,  # refresh_token, at least 20 characters
        "xunlei": 30,  # refresh_token, at least 30 characters
        "pan123": 30,
        "cloud189": 30,
    }

    # Credential kind per platform: "cookie" or "refresh_token".
    CREDENTIAL_TYPE = {
        "quark": "cookie",
        "baidu": "cookie",
        "uc": "cookie",
        "aliyun": "refresh_token",
        "xunlei": "refresh_token",
        "pan123": "cookie",
        "cloud189": "cookie",
    }

    def __init__(self):
        """Create a manager with no recorded status and an empty token cache."""
        # platform -> latest health record
        self._status: Dict[str, CredentialStatus] = {}
        # platform -> {"access_token": str, "expires_at": float}
        self._token_cache: Dict[str, Dict[str, Any]] = {}

    def validate(self, platform: str, config: PlatformConfig) -> bool:
        """Check that the platform's credential is present and long enough.

        Modeled on search-ucmao's ``get_and_validate_credential`` logic.

        Args:
            platform: Platform key. Unknown platforms are treated as cookie
                platforms with a minimum length of 20.
            config: Object exposing ``cookie`` and ``refresh_token``.

        Returns:
            True when the relevant credential is non-empty and meets the
            platform's minimum length, otherwise False.

        Side effects:
            Updates the platform's ``CredentialStatus``: timestamp, check
            counter, validity flag, and on failure the failure counter and
            error message.
        """
        min_len = self.MIN_LENGTH_MAP.get(platform, 20)

        if self.CREDENTIAL_TYPE.get(platform) == "refresh_token":
            credential = config.refresh_token
        else:
            credential = config.cookie
        valid = bool(credential and len(credential) >= min_len)

        status = self._status.get(platform)
        if status is None:
            status = CredentialStatus(valid=False, platform=platform)
        status.last_check = time.time()
        status.checks_count += 1
        # Always record the latest outcome; otherwise a platform that once
        # validated would keep reporting valid=True after a later failure.
        status.valid = valid
        if not valid:
            status.fail_count += 1
            status.last_error = f"凭证长度不足 (需要≥{min_len})"
        self._status[platform] = status

        return valid

    def get_credential(self, platform: str, config: PlatformConfig) -> str:
        """Return the credential to use for ``platform``.

        Args:
            platform: Platform key.
            config: Object exposing ``cookie`` and ``refresh_token``.

        Returns:
            An empty string when validation fails. For cookie platforms, the
            cookie. For refresh-token platforms, the cached access token if
            one exists and has more than 60 seconds left before expiry,
            otherwise the refresh token itself.
        """
        if not self.validate(platform, config):
            return ""

        if self.CREDENTIAL_TYPE.get(platform, "cookie") != "refresh_token":
            return config.cookie

        # Prefer a cached access token that is not about to expire.
        cached = self._token_cache.get(platform, {})
        if cached.get("access_token") and cached.get("expires_at", 0) > time.time() + 60:
            return cached["access_token"]
        return config.refresh_token

    def update_access_token(self, platform: str, access_token: str,
                            expires_in: int = 3600) -> None:
        """Cache an access token for ``platform``.

        Args:
            platform: Platform key.
            access_token: Token value to cache.
            expires_in: Lifetime in seconds, counted from now.
        """
        self._token_cache[platform] = {
            "access_token": access_token,
            "expires_at": time.time() + expires_in,
        }

    def get_status(self, platform: str) -> Optional[CredentialStatus]:
        """Return the recorded status for ``platform``, or None if none exists."""
        return self._status.get(platform)

    def get_all_status(self) -> Dict[str, CredentialStatus]:
        """Return a new dict mapping each known platform to its status record.

        The dict is a shallow copy; the ``CredentialStatus`` values are the
        same objects the manager holds.
        """
        return dict(self._status)

    def mark_invalid(self, platform: str, reason: str = "") -> None:
        """Mark the platform's credential as invalid and log a warning.

        Args:
            platform: Platform key.
            reason: Failure description stored as ``last_error``.
        """
        status = self._status.get(platform)
        if status is None:
            status = CredentialStatus(valid=False, platform=platform)
        status.valid = False
        status.last_error = reason
        status.fail_count += 1
        status.last_check = time.time()
        self._status[platform] = status
        logger.warning("[Credential] %s marked invalid: %s", platform, reason)
|