Files
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

102 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
百度网盘凭证管理器 — bdstoken 获取与校验
参考 cloud-auto-save 的 BaiduNetDisk.cookie 机制
"""
import logging
import requests
from ...errors import TransferError, TransferErrorCode
logger = logging.getLogger(__name__)
# 百度网盘 API 基础 URL
BAIDU_PAN_API = "https://pan.baidu.com"
class BaiduCredentialManager:
"""百度网盘 Cookie 凭证 + bdstoken 管理
百度网盘的大多数受保护 API 都需要 bdstoken 参数,
该 token 通过 API 获取并缓存在实例中。
"""
def __init__(self, cookie: str, session: requests.Session):
"""
Args:
cookie: 完整的百度 Cookie 字符串
session: 共享的 requests.Session继承 User-Agent 等 headers
"""
self.cookie = cookie
self.session = session
self._bdstoken: str = ""
# ─── 公开方法 ──────────────────────────────────────────
def validate(self) -> bool:
"""校验 Cookie 是否有效:长度 >= 50 视为合格"""
return bool(self.cookie and len(self.cookie.strip()) >= 50)
def get_bdstoken(self, force_refresh: bool = False) -> str:
"""
获取 bdstoken首次调用会请求 API 获取并缓存。
API: GET /api/gettemplatevariable?fields=["bdstoken"]
Raises:
TransferError: 获取失败 (BAIDU_BDSTOKEN_FAIL)
"""
if self._bdstoken and not force_refresh:
return self._bdstoken
url = f"{BAIDU_PAN_API}/api/gettemplatevariable"
params = {"fields": '["bdstoken"]'}
headers = self.get_headers()
try:
resp = self.session.get(url, params=params, headers=headers, timeout=15)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.error(f"获取 bdstoken 网络异常: {e}")
raise TransferError(
TransferErrorCode.BAIDU_BDSTOKEN_FAIL,
message=f"百度 bdstoken 请求失败: {e}",
platform="baidu",
)
errno = data.get("errno", -1)
if errno != 0:
logger.error(f"获取 bdstoken API 返回 errno={errno}: {data}")
raise TransferError(
TransferErrorCode.BAIDU_BDSTOKEN_FAIL,
message=f"百度 bdstoken 获取失败 (errno={errno})",
platform="baidu",
details={"response": data},
)
self._bdstoken = data.get("result", {}).get("bdstoken", "")
if not self._bdstoken:
raise TransferError(
TransferErrorCode.BAIDU_BDSTOKEN_FAIL,
message="百度 bdstoken 为空",
platform="baidu",
)
logger.info("bdstoken 获取成功")
return self._bdstoken
def get_headers(self) -> dict:
"""构建携带 Cookie 的请求头(继承 session 默认 headers 外的额外字段)"""
headers = {
"Cookie": self.cookie,
"Referer": "https://pan.baidu.com/",
"Origin": "https://pan.baidu.com",
}
return headers
def invalidate_bdstoken(self):
"""使缓存失效,下次调用 get_bdstoken 会重新获取"""
self._bdstoken = ""
logger.info("bdstoken 缓存已失效")