Files
CloudSearch/cloudsearch_transfer/credential/manager.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

131 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch Transfer — 凭证管理器 v1.0.0
参考 search-ucmao 的 get_and_validate_credential + cloud-auto-save 的 Token回写
"""
import time
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from ..config import PlatformConfig
logger = logging.getLogger(__name__)
@dataclass
class CredentialStatus:
"""凭证状态"""
valid: bool
platform: str
last_check: float = 0.0
last_error: str = ""
checks_count: int = 0
fail_count: int = 0
class CredentialManager:
"""
凭证管理器
- 凭证校验(各平台最小长度要求不同)
- Token自动刷新阿里云/迅雷)
- 健康检测
"""
# 各平台最小凭证长度
MIN_LENGTH_MAP = {
"quark": 50, # Cookie ≥ 50字符
"baidu": 50, # Cookie ≥ 50字符
"uc": 50, # Cookie ≥ 50字符
"aliyun": 20, # refresh_token ≥ 20字符
"xunlei": 30, # refresh_token ≥ 30字符
"pan123": 30,
"cloud189": 30,
}
# 凭证类型cookie / refresh_token
CREDENTIAL_TYPE = {
"quark": "cookie",
"baidu": "cookie",
"uc": "cookie",
"aliyun": "refresh_token",
"xunlei": "refresh_token",
"pan123": "cookie",
"cloud189": "cookie",
}
def __init__(self):
self._status: Dict[str, CredentialStatus] = {}
self._token_cache: Dict[str, Dict[str, Any]] = {}
def validate(self, platform: str, config: PlatformConfig) -> bool:
"""
校验凭证有效性
参考 search-ucmao 的 get_and_validate_credential 逻辑
"""
min_len = self.MIN_LENGTH_MAP.get(platform, 20)
if self.CREDENTIAL_TYPE.get(platform) == "refresh_token":
token = config.refresh_token
valid = bool(token and len(token) >= min_len)
else:
cookie = config.cookie
valid = bool(cookie and len(cookie) >= min_len)
# 记录状态
status = self._status.get(platform, CredentialStatus(valid=False, platform=platform))
status.last_check = time.time()
status.checks_count += 1
if not valid:
status.fail_count += 1
status.last_error = f"凭证长度不足 (需要≥{min_len})"
else:
status.valid = True
self._status[platform] = status
return valid
def get_credential(self, platform: str, config: PlatformConfig) -> str:
"""
获取有效凭证
对于Token类型会自动刷新
"""
if not self.validate(platform, config):
return ""
cred_type = self.CREDENTIAL_TYPE.get(platform, "cookie")
if cred_type == "refresh_token":
# 优先使用缓存的access_token
cached = self._token_cache.get(platform, {})
if cached.get("access_token") and cached.get("expires_at", 0) > time.time() + 60:
return cached["access_token"]
return config.refresh_token
else:
return config.cookie
def update_access_token(self, platform: str, access_token: str,
expires_in: int = 3600):
"""更新缓存的access_token"""
self._token_cache[platform] = {
"access_token": access_token,
"expires_at": time.time() + expires_in,
}
def get_status(self, platform: str) -> Optional[CredentialStatus]:
"""获取凭证状态"""
return self._status.get(platform)
def get_all_status(self) -> Dict[str, CredentialStatus]:
"""获取所有平台凭证状态"""
return dict(self._status)
def mark_invalid(self, platform: str, reason: str = ""):
"""标记凭证失效"""
status = self._status.get(platform, CredentialStatus(valid=False, platform=platform))
status.valid = False
status.last_error = reason
status.fail_count += 1
status.last_check = time.time()
self._status[platform] = status
logger.warning(f"[Credential] {platform} marked invalid: {reason}")