- 修复Redis认证 (配置密码) - 启动Python管理后台 (端口9531, 15个功能开关) - 统一版本号 0.2.7 - 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
217 lines
7.6 KiB
Python
217 lines
7.6 KiB
Python
"""
|
||
阿里云盘凭证管理器 v1.0.0
|
||
refresh_token → access_token 刷新 + 自动缓存 + 过期前自动刷新
|
||
"""
|
||
|
||
import time
|
||
import logging
|
||
import threading
|
||
from typing import Dict, Optional
|
||
from dataclasses import dataclass, field
|
||
|
||
import requests
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ─── 常量 ──────────────────────────────────────────────────
|
||
|
||
API_HOST = "https://api.aliyundrive.com"
|
||
TOKEN_REFRESH_URL = f"{API_HOST}/token/refresh"
|
||
|
||
DEFAULT_HEADERS = {
|
||
"User-Agent": (
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||
"Chrome/135.0.0.0 Safari/537.36"
|
||
),
|
||
"Accept": "application/json, text/plain, */*",
|
||
"Content-Type": "application/json",
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class TokenInfo:
|
||
"""缓存的 Token 信息"""
|
||
access_token: str = ""
|
||
refresh_token: str = ""
|
||
expires_at: float = 0.0 # Unix 时间戳
|
||
drive_id: str = ""
|
||
user_id: str = ""
|
||
nick_name: str = ""
|
||
default_sbox_drive_id: str = ""
|
||
|
||
@property
|
||
def is_expired(self) -> bool:
|
||
"""检查 access_token 是否已过期(提前 60s 视为过期)"""
|
||
return time.time() >= (self.expires_at - 60)
|
||
|
||
@property
|
||
def is_valid(self) -> bool:
|
||
return bool(self.access_token) and not self.is_expired
|
||
|
||
|
||
class AliyunCredentialManager:
|
||
"""
|
||
阿里云盘凭证管理器
|
||
|
||
职责:
|
||
- 使用 refresh_token 换取 access_token
|
||
- 缓存 access_token / expires_at / drive_id
|
||
- 过期前自动刷新(提前 60s)
|
||
- 线程安全
|
||
|
||
用法:
|
||
mgr = AliyunCredentialManager(refresh_token="xxx")
|
||
mgr.refresh() # 强制刷新
|
||
headers = mgr.get_headers() # 获取带 Auth 的请求头
|
||
is_ok = mgr.validate() # 验证 refresh_token 有效性
|
||
"""
|
||
|
||
def __init__(self, refresh_token: str = ""):
|
||
self._refresh_token = refresh_token.strip()
|
||
self._token: Optional[TokenInfo] = None
|
||
self._lock = threading.Lock()
|
||
self._session = requests.Session()
|
||
self._session.headers.update(DEFAULT_HEADERS)
|
||
|
||
# ─── 公开 API ──────────────────────────────────────────
|
||
|
||
def refresh(self) -> bool:
|
||
"""
|
||
使用 refresh_token 换取 access_token。
|
||
返回 True 表示成功,False 表示失败。
|
||
"""
|
||
with self._lock:
|
||
return self._do_refresh()
|
||
|
||
def get_headers(self) -> Dict[str, str]:
|
||
"""
|
||
获取带 Authorization 的请求头。
|
||
自动检查 token 有效性,必要时自动刷新。
|
||
|
||
Returns:
|
||
{"Authorization": "Bearer <access_token>", ...}
|
||
"""
|
||
self._ensure_token_valid()
|
||
headers = {}
|
||
if self._token and self._token.access_token:
|
||
headers["Authorization"] = f"Bearer {self._token.access_token}"
|
||
return headers
|
||
|
||
def get_access_token(self) -> str:
|
||
"""获取当前有效的 access_token(必要时自动刷新)"""
|
||
self._ensure_token_valid()
|
||
return self._token.access_token if self._token else ""
|
||
|
||
def get_drive_id(self) -> str:
|
||
"""获取默认 drive_id"""
|
||
self._ensure_token_valid()
|
||
return self._token.drive_id if self._token else ""
|
||
|
||
def get_sbox_drive_id(self) -> str:
|
||
"""获取保险箱 drive_id"""
|
||
self._ensure_token_valid()
|
||
return self._token.default_sbox_drive_id if self._token else ""
|
||
|
||
def validate(self) -> bool:
|
||
"""
|
||
验证 refresh_token 是否有效。
|
||
要求 refresh_token 长度 >= 20,且能成功换取 access_token。
|
||
"""
|
||
if not self._refresh_token or len(self._refresh_token) < 20:
|
||
logger.warning("[AliyunCredential] refresh_token 长度不足 20,验证失败")
|
||
return False
|
||
return self.refresh()
|
||
|
||
@property
|
||
def refresh_token(self) -> str:
|
||
return self._refresh_token
|
||
|
||
@refresh_token.setter
|
||
def refresh_token(self, value: str):
|
||
"""更新 refresh_token(通常在 API 返回新 refresh_token 后调用)"""
|
||
self._refresh_token = value.strip()
|
||
# 清除旧缓存,下次请求自动刷新
|
||
with self._lock:
|
||
self._token = None
|
||
|
||
# ─── 内部方法 ──────────────────────────────────────────
|
||
|
||
def _ensure_token_valid(self):
|
||
"""确保 token 有效(过期则自动刷新)"""
|
||
if self._token is None or self._token.is_expired:
|
||
self.refresh()
|
||
|
||
def _do_refresh(self) -> bool:
|
||
"""实际执行 token 刷新"""
|
||
if not self._refresh_token:
|
||
logger.error("[AliyunCredential] 没有 refresh_token,无法刷新")
|
||
return False
|
||
|
||
try:
|
||
resp = self._session.post(
|
||
TOKEN_REFRESH_URL,
|
||
json={"refresh_token": self._refresh_token},
|
||
timeout=30,
|
||
)
|
||
data = resp.json()
|
||
|
||
if resp.status_code != 200 or "access_token" not in data:
|
||
code = data.get("code", "Unknown")
|
||
message = data.get("message", "")
|
||
logger.error(
|
||
f"[AliyunCredential] 刷新 token 失败: "
|
||
f"HTTP {resp.status_code} code={code} msg={message}"
|
||
)
|
||
return False
|
||
|
||
# 解析响应
|
||
access_token = data.get("access_token", "")
|
||
expires_in = int(data.get("expires_in", 7200))
|
||
new_refresh = data.get("refresh_token", self._refresh_token)
|
||
|
||
self._token = TokenInfo(
|
||
access_token=access_token,
|
||
refresh_token=new_refresh,
|
||
expires_at=time.time() + expires_in,
|
||
drive_id=str(data.get("default_drive_id", "")),
|
||
user_id=str(data.get("user_id", "")),
|
||
nick_name=str(data.get("nick_name", "")),
|
||
default_sbox_drive_id=str(data.get("default_sbox_drive_id", "")),
|
||
)
|
||
|
||
# 更新 refresh_token(服务端可能下发新的)
|
||
if new_refresh != self._refresh_token:
|
||
logger.info(
|
||
"[AliyunCredential] refresh_token 已轮换,新旧前缀: "
|
||
f"{self._refresh_token[:8]}... → {new_refresh[:8]}..."
|
||
)
|
||
self._refresh_token = new_refresh
|
||
|
||
logger.info(
|
||
f"[AliyunCredential] Token 刷新成功 "
|
||
f"(user={self._token.nick_name}, "
|
||
f"expires_in={expires_in}s, "
|
||
f"drive_id={self._token.drive_id[:8]}...)"
|
||
)
|
||
return True
|
||
|
||
except requests.RequestException as e:
|
||
logger.error(f"[AliyunCredential] 刷新 token 网络异常: {e}")
|
||
return False
|
||
except Exception as e:
|
||
logger.exception(f"[AliyunCredential] 刷新 token 未知异常: {e}")
|
||
return False
|
||
|
||
def to_dict(self) -> dict:
|
||
"""导出当前状态(用于持久化)"""
|
||
self._ensure_token_valid()
|
||
return {
|
||
"refresh_token": self._refresh_token,
|
||
"access_token": self._token.access_token if self._token else "",
|
||
"expires_at": self._token.expires_at if self._token else 0,
|
||
"drive_id": self._token.drive_id if self._token else "",
|
||
"user_id": self._token.user_id if self._token else "",
|
||
"nick_name": self._token.nick_name if self._token else "",
|
||
}
|