Files
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

217 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
阿里云盘凭证管理器 v1.0.0
refresh_token → access_token 刷新 + 自动缓存 + 过期前自动刷新
"""
import time
import logging
import threading
from typing import Dict, Optional
from dataclasses import dataclass, field
import requests
logger = logging.getLogger(__name__)

# ─── Constants ─────────────────────────────────────────────
# Base URL of the Aliyun Drive API and the token-refresh endpoint under it.
API_HOST = "https://api.aliyundrive.com"
TOKEN_REFRESH_URL = API_HOST + "/token/refresh"

# Default HTTP headers: a desktop-Chrome User-Agent plus JSON accept/content
# types. The User-Agent is assembled from its three space-separated products.
DEFAULT_HEADERS = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/135.0.0.0 Safari/537.36",
        )
    ),
    "Accept": "application/json, text/plain, */*",
    "Content-Type": "application/json",
}
@dataclass
class TokenInfo:
    """Cached token data for one account."""

    access_token: str = ""
    refresh_token: str = ""
    expires_at: float = 0.0  # Unix timestamp at which access_token expires
    drive_id: str = ""
    user_id: str = ""
    nick_name: str = ""
    default_sbox_drive_id: str = ""

    @property
    def is_expired(self) -> bool:
        """Return True once the access_token is within 60s of ``expires_at``.

        The 60-second margin makes the token count as expired slightly early.
        """
        cutoff = self.expires_at - 60
        return time.time() >= cutoff

    @property
    def is_valid(self) -> bool:
        """Return True when an access_token is present and not yet expired."""
        if not self.access_token:
            return False
        return not self.is_expired
class AliyunCredentialManager:
"""
阿里云盘凭证管理器
职责:
- 使用 refresh_token 换取 access_token
- 缓存 access_token / expires_at / drive_id
- 过期前自动刷新(提前 60s
- 线程安全
用法:
mgr = AliyunCredentialManager(refresh_token="xxx")
mgr.refresh() # 强制刷新
headers = mgr.get_headers() # 获取带 Auth 的请求头
is_ok = mgr.validate() # 验证 refresh_token 有效性
"""
def __init__(self, refresh_token: str = ""):
self._refresh_token = refresh_token.strip()
self._token: Optional[TokenInfo] = None
self._lock = threading.Lock()
self._session = requests.Session()
self._session.headers.update(DEFAULT_HEADERS)
# ─── 公开 API ──────────────────────────────────────────
def refresh(self) -> bool:
"""
使用 refresh_token 换取 access_token。
返回 True 表示成功False 表示失败。
"""
with self._lock:
return self._do_refresh()
def get_headers(self) -> Dict[str, str]:
"""
获取带 Authorization 的请求头。
自动检查 token 有效性,必要时自动刷新。
Returns:
{"Authorization": "Bearer <access_token>", ...}
"""
self._ensure_token_valid()
headers = {}
if self._token and self._token.access_token:
headers["Authorization"] = f"Bearer {self._token.access_token}"
return headers
def get_access_token(self) -> str:
"""获取当前有效的 access_token必要时自动刷新"""
self._ensure_token_valid()
return self._token.access_token if self._token else ""
def get_drive_id(self) -> str:
"""获取默认 drive_id"""
self._ensure_token_valid()
return self._token.drive_id if self._token else ""
def get_sbox_drive_id(self) -> str:
"""获取保险箱 drive_id"""
self._ensure_token_valid()
return self._token.default_sbox_drive_id if self._token else ""
def validate(self) -> bool:
"""
验证 refresh_token 是否有效。
要求 refresh_token 长度 >= 20且能成功换取 access_token。
"""
if not self._refresh_token or len(self._refresh_token) < 20:
logger.warning("[AliyunCredential] refresh_token 长度不足 20验证失败")
return False
return self.refresh()
@property
def refresh_token(self) -> str:
return self._refresh_token
@refresh_token.setter
def refresh_token(self, value: str):
"""更新 refresh_token通常在 API 返回新 refresh_token 后调用)"""
self._refresh_token = value.strip()
# 清除旧缓存,下次请求自动刷新
with self._lock:
self._token = None
# ─── 内部方法 ──────────────────────────────────────────
def _ensure_token_valid(self):
"""确保 token 有效(过期则自动刷新)"""
if self._token is None or self._token.is_expired:
self.refresh()
def _do_refresh(self) -> bool:
"""实际执行 token 刷新"""
if not self._refresh_token:
logger.error("[AliyunCredential] 没有 refresh_token无法刷新")
return False
try:
resp = self._session.post(
TOKEN_REFRESH_URL,
json={"refresh_token": self._refresh_token},
timeout=30,
)
data = resp.json()
if resp.status_code != 200 or "access_token" not in data:
code = data.get("code", "Unknown")
message = data.get("message", "")
logger.error(
f"[AliyunCredential] 刷新 token 失败: "
f"HTTP {resp.status_code} code={code} msg={message}"
)
return False
# 解析响应
access_token = data.get("access_token", "")
expires_in = int(data.get("expires_in", 7200))
new_refresh = data.get("refresh_token", self._refresh_token)
self._token = TokenInfo(
access_token=access_token,
refresh_token=new_refresh,
expires_at=time.time() + expires_in,
drive_id=str(data.get("default_drive_id", "")),
user_id=str(data.get("user_id", "")),
nick_name=str(data.get("nick_name", "")),
default_sbox_drive_id=str(data.get("default_sbox_drive_id", "")),
)
# 更新 refresh_token服务端可能下发新的
if new_refresh != self._refresh_token:
logger.info(
"[AliyunCredential] refresh_token 已轮换,新旧前缀: "
f"{self._refresh_token[:8]}... → {new_refresh[:8]}..."
)
self._refresh_token = new_refresh
logger.info(
f"[AliyunCredential] Token 刷新成功 "
f"(user={self._token.nick_name}, "
f"expires_in={expires_in}s, "
f"drive_id={self._token.drive_id[:8]}...)"
)
return True
except requests.RequestException as e:
logger.error(f"[AliyunCredential] 刷新 token 网络异常: {e}")
return False
except Exception as e:
logger.exception(f"[AliyunCredential] 刷新 token 未知异常: {e}")
return False
def to_dict(self) -> dict:
"""导出当前状态(用于持久化)"""
self._ensure_token_valid()
return {
"refresh_token": self._refresh_token,
"access_token": self._token.access_token if self._token else "",
"expires_at": self._token.expires_at if self._token else 0,
"drive_id": self._token.drive_id if self._token else "",
"user_id": self._token.user_id if self._token else "",
"nick_name": self._token.nick_name if self._token else "",
}