"""
Aliyun Drive credential manager v1.0.0

Exchanges a refresh_token for an access_token, caches the result, and
refreshes it automatically shortly before it expires.
"""

import time
import logging
import threading
from typing import Dict, Optional
from dataclasses import dataclass, field

import requests

logger = logging.getLogger(__name__)

# ─── Constants ─────────────────────────────────────────────

API_HOST = "https://api.aliyundrive.com"
TOKEN_REFRESH_URL = f"{API_HOST}/token/refresh"

DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/135.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/plain, */*",
    "Content-Type": "application/json",
}

# An access_token is treated as expired this many seconds before expires_at.
_EXPIRY_MARGIN_SECONDS = 60
# Lifetime assumed when the response carries no "expires_in" field.
_DEFAULT_EXPIRES_IN = 7200
# Timeout (seconds) for the token refresh HTTP request.
_REQUEST_TIMEOUT = 30
# validate() rejects refresh tokens shorter than this without a network call.
_MIN_REFRESH_TOKEN_LENGTH = 20


@dataclass
class TokenInfo:
    """Cached token data taken from one token-refresh response."""

    access_token: str = ""
    refresh_token: str = ""
    expires_at: float = 0.0  # Unix timestamp
    drive_id: str = ""
    user_id: str = ""
    nick_name: str = ""
    default_sbox_drive_id: str = ""

    @property
    def is_expired(self) -> bool:
        """Return True once the token is within 60s of ``expires_at``."""
        return time.time() >= (self.expires_at - _EXPIRY_MARGIN_SECONDS)

    @property
    def is_valid(self) -> bool:
        """Return True when an access_token is present and not expired."""
        return bool(self.access_token) and not self.is_expired


class AliyunCredentialManager:
    """
    Aliyun Drive credential manager.

    Responsibilities:
    - exchange a refresh_token for an access_token
    - cache access_token / expires_at / drive_id
    - refresh automatically before expiry (60s early)
    - serialize refreshes and cache updates behind a lock

    Usage:
        mgr = AliyunCredentialManager(refresh_token="xxx")
        mgr.refresh()                  # force a refresh
        headers = mgr.get_headers()    # headers carrying Authorization
        is_ok = mgr.validate()         # check the refresh_token works
    """

    def __init__(self, refresh_token: str = ""):
        """
        Create a manager around *refresh_token*.

        Args:
            refresh_token: The refresh token; surrounding whitespace is
                stripped. ``None`` is treated as an empty token.
        """
        self._refresh_token = (refresh_token or "").strip()
        self._token: Optional[TokenInfo] = None
        self._lock = threading.Lock()
        self._session = requests.Session()
        self._session.headers.update(DEFAULT_HEADERS)

    # ─── Public API ────────────────────────────────────────

    def refresh(self) -> bool:
        """
        Exchange the refresh_token for a new access_token unconditionally.

        Returns:
            True on success, False on failure.
        """
        with self._lock:
            return self._do_refresh()

    def get_headers(self) -> Dict[str, str]:
        """
        Return request headers carrying the Authorization bearer token.

        The cached token is refreshed first when it is missing or expired.

        Returns:
            ``{"Authorization": "Bearer <access_token>"}``, or an empty dict
            when no access_token is available.
        """
        token = self._ensure_token_valid()
        headers = {}
        if token and token.access_token:
            headers["Authorization"] = f"Bearer {token.access_token}"
        return headers

    def get_access_token(self) -> str:
        """Return the current access_token, refreshing first if needed."""
        token = self._ensure_token_valid()
        return token.access_token if token else ""

    def get_drive_id(self) -> str:
        """Return the default drive_id, refreshing first if needed."""
        token = self._ensure_token_valid()
        return token.drive_id if token else ""

    def get_sbox_drive_id(self) -> str:
        """Return the safe-box drive_id, refreshing first if needed."""
        token = self._ensure_token_valid()
        return token.default_sbox_drive_id if token else ""

    def validate(self) -> bool:
        """
        Check whether the refresh_token is usable.

        The token must be at least 20 characters long and must successfully
        exchange for an access_token (this performs a refresh).
        """
        current = self._refresh_token
        if not current or len(current) < _MIN_REFRESH_TOKEN_LENGTH:
            logger.warning("[AliyunCredential] refresh_token 长度不足 20,验证失败")
            return False
        return self.refresh()

    @property
    def refresh_token(self) -> str:
        """The refresh_token currently held by the manager."""
        return self._refresh_token

    @refresh_token.setter
    def refresh_token(self, value: str):
        """Replace the refresh_token and drop the cached access_token."""
        # Both writes happen under the lock so an in-flight refresh never
        # sees the new refresh_token paired with the old cache (or vice versa).
        with self._lock:
            self._refresh_token = (value or "").strip()
            # Clear the cache; the next access triggers a refresh.
            self._token = None

    # ─── Internal helpers ──────────────────────────────────

    def _ensure_token_valid(self) -> Optional[TokenInfo]:
        """
        Refresh the cached token if it is missing or expired.

        Returns:
            A snapshot of the cached token after the check (possibly stale or
            None when the refresh failed). Callers must use this snapshot
            rather than re-reading ``self._token``, which another thread may
            reset to None at any time.
        """
        token = self._token
        if token is not None and not token.is_expired:
            return token
        with self._lock:
            # Re-check under the lock: another thread may have refreshed while
            # this one was waiting, in which case no second request is needed.
            token = self._token
            if token is None or token.is_expired:
                self._do_refresh()
                token = self._token
            return token

    def _do_refresh(self) -> bool:
        """
        Perform the token refresh request. The caller must hold ``_lock``.

        Returns:
            True when a non-empty access_token was obtained and cached,
            False otherwise (the failure is logged, never raised).
        """
        if not self._refresh_token:
            logger.error("[AliyunCredential] 没有 refresh_token,无法刷新")
            return False

        try:
            resp = self._session.post(
                TOKEN_REFRESH_URL,
                json={"refresh_token": self._refresh_token},
                timeout=_REQUEST_TIMEOUT,
            )

            # Error responses are not guaranteed to be JSON objects; fall back
            # to an empty dict so the HTTP status still reaches the log below.
            try:
                payload = resp.json()
            except ValueError:
                payload = None
            data = payload if isinstance(payload, dict) else {}

            access_token = data.get("access_token")
            if resp.status_code != 200 or not access_token:
                code = data.get("code", "Unknown")
                message = data.get("message", "")
                logger.error(
                    f"[AliyunCredential] 刷新 token 失败: "
                    f"HTTP {resp.status_code} code={code} msg={message}"
                )
                return False

            # Parse the response.
            expires_in = int(data.get("expires_in", _DEFAULT_EXPIRES_IN))
            # Keep the current refresh_token when the server sends none (or an
            # empty/null one) so later refreshes remain possible.
            new_refresh = data.get("refresh_token") or self._refresh_token

            token = TokenInfo(
                access_token=access_token,
                refresh_token=new_refresh,
                expires_at=time.time() + expires_in,
                # "or ''" keeps a null field from being stored as "None".
                drive_id=str(data.get("default_drive_id") or ""),
                user_id=str(data.get("user_id") or ""),
                nick_name=str(data.get("nick_name") or ""),
                default_sbox_drive_id=str(data.get("default_sbox_drive_id") or ""),
            )
            self._token = token

            # Adopt a rotated refresh_token if the server issued one.
            if new_refresh != self._refresh_token:
                logger.info(
                    "[AliyunCredential] refresh_token 已轮换,新旧前缀: "
                    f"{self._refresh_token[:8]}... → {new_refresh[:8]}..."
                )
                self._refresh_token = new_refresh

            logger.info(
                f"[AliyunCredential] Token 刷新成功 "
                f"(user={token.nick_name}, "
                f"expires_in={expires_in}s, "
                f"drive_id={token.drive_id[:8]}...)"
            )
            return True

        except requests.RequestException as e:
            logger.error(f"[AliyunCredential] 刷新 token 网络异常: {e}")
            return False
        except Exception as e:
            # Last-resort boundary: a refresh failure must not propagate to
            # callers that only asked for headers or an id.
            logger.exception(f"[AliyunCredential] 刷新 token 未知异常: {e}")
            return False

    def to_dict(self) -> dict:
        """
        Export the current state for persistence.

        Refreshes the token first if it is missing or expired. Token fields
        are empty (``expires_at`` is 0) when no token is cached.
        """
        token = self._ensure_token_valid()
        return {
            "refresh_token": self._refresh_token,
            "access_token": token.access_token if token else "",
            "expires_at": token.expires_at if token else 0,
            "drive_id": token.drive_id if token else "",
            "user_id": token.user_id if token else "",
            "nick_name": token.nick_name if token else "",
        }