""" 阿里云盘转存模块 v1.0.0 实现 4 步批量转存流程:获取分享详情 → 获取分享令牌 → 批量复制文件 → 创建新分享 """ import re import time import logging from typing import List, Dict, Tuple, Optional import requests from .credential import AliyunCredentialManager, API_HOST logger = logging.getLogger(__name__) # ─── API 端点 ────────────────────────────────────────────── # ① 获取分享详情(匿名) SHARE_INFO_URL = f"{API_HOST}/adrive/v3/share_link/get_share_by_anonymous" # ② 获取分享令牌(需 Auth) SHARE_TOKEN_URL = f"{API_HOST}/v2/share_link/get_share_token" # ③ 批量操作(复制文件) BATCH_URL = f"{API_HOST}/adrive/v4/batch" # ④ 创建分享 CREATE_SHARE_URL = f"{API_HOST}/adrive/v2/share_link/create" # ─── URL 模式 ────────────────────────────────────────────── # 匹配 aliyundrive.com/s/ URL_PATTERN = re.compile(r'aliyundrive\.com/s/([a-zA-Z0-9]+)') # ─── 默认请求头 ──────────────────────────────────────────── DEFAULT_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/135.0.0.0 Safari/537.36" ), "Accept": "application/json, text/plain, */*", "Content-Type": "application/json", "Referer": "https://aliyundrive.com", } class AliyunTransfer: """ 阿里云盘批量转存 四步流程: ① 获取分享详情(匿名):POST /adrive/v3/share_link/get_share_by_anonymous ② 获取分享令牌(Auth):POST /v2/share_link/get_share_token ③ 批量复制文件:POST /adrive/v4/batch (X-Share-Token 头) ④ 创建新分享:POST /adrive/v2/share_link/create 用法: credential = AliyunCredentialManager(refresh_token="xxx") transfer = AliyunTransfer(credential, drive_id="12345") result = transfer.transfer( share_url="https://www.aliyundrive.com/s/abc123", share_password="", to_parent_file_id="root", ) """ def __init__( self, credential: AliyunCredentialManager, drive_id: str = "", to_parent_file_id: str = "root", request_timeout: int = 30, ): self.credential = credential self.drive_id = drive_id or credential.get_drive_id() self.to_parent_file_id = to_parent_file_id self.request_timeout = request_timeout self._session = requests.Session() self._session.headers.update(DEFAULT_HEADERS) # ─── 公开 API ────────────────────────────────────────── def transfer( self, share_url: str, share_password: str = "", to_parent_file_id: str = None, new_share_password: str = "", expiration: str = "", ) -> Dict: """ 执行完整的转存流程。 Args: share_url: 阿里云盘分享链接(如 https://www.aliyundrive.com/s/abc123) share_password: 分享提取码(如有) to_parent_file_id: 转存目标目录 file_id,默认用初始化时的值 new_share_password: 新分享的密码(空=无密码) expiration: 分享有效期,空=永久 Returns: { "success": True/False, "share_name": "...", "new_file_ids": ["id1", "id2"], "new_share_url": "https://...", "new_share_password": "...", "error": None or "...", } """ parent_id = to_parent_file_id or self.to_parent_file_id try: # ① 获取分享详情 share_id = self._extract_share_id(share_url) if not share_id: return self._error("无法从 URL 提取分享 ID") share_info = self._get_share_info(share_id) if not share_info: return self._error("分享不存在或已失效") share_name = share_info.get("share_name", "") file_infos = share_info.get("file_infos", []) if not file_infos: return self._error("分享内容为空") logger.info( f"[AliyunTransfer] 分享详情获取成功: " f"name={share_name}, files={len(file_infos)}" ) # ② 获取分享令牌 share_token = self._get_share_token(share_id, share_password) if not share_token: return self._error("获取分享令牌失败(可能需要提取码)") logger.info(f"[AliyunTransfer] 分享令牌获取成功") # ③ 批量复制文件 file_ids = [fi.get("file_id", "") for fi in file_infos if fi.get("file_id")] if not file_ids: return self._error("无法提取文件 ID") new_file_ids = self._batch_copy(share_id, share_token, file_ids, parent_id) if not new_file_ids: return self._error("批量转存失败,请检查权限或容量") logger.info(f"[AliyunTransfer] 批量转存成功: {len(new_file_ids)} 个文件") # ④ 创建新分享 share_result = self._create_share( new_file_ids, share_password=new_share_password, expiration=expiration, ) new_share_url = share_result.get("share_url", "") new_share_pwd = share_result.get("share_pwd", new_share_password) logger.info(f"[AliyunTransfer] 新分享创建成功: {new_share_url}") return { "success": True, "share_name": share_name, "share_id": share_id, "new_file_ids": new_file_ids, "new_share_url": new_share_url, "new_share_password": new_share_pwd, "error": None, } except Exception as e: logger.exception(f"[AliyunTransfer] 转存异常: {e}") return self._error(str(e)) def get_share_info(self, share_url: str) -> Optional[Dict]: """ 仅获取分享详情(不转存)。 Returns: {"share_name": "...", "file_infos": [...]} or None """ share_id = self._extract_share_id(share_url) if not share_id: logger.error(f"[AliyunTransfer] 无法从 URL 提取 share_id: {share_url}") return None return self._get_share_info(share_id) # ─── 步骤 ①:获取分享详情 ─────────────────────────────── def _get_share_info(self, share_id: str) -> Optional[Dict]: """ POST /adrive/v3/share_link/get_share_by_anonymous 请求体: {"share_id": "..."} 响应: {"share_name": "...", "file_infos": [{"file_id": "...", "name": "...", ...}]} """ try: resp = self._session.post( SHARE_INFO_URL, json={"share_id": share_id}, timeout=self.request_timeout, ) data = resp.json() if resp.status_code != 200: logger.error( f"[AliyunTransfer] 获取分享详情失败: " f"HTTP {resp.status_code}, {data}" ) return None # 检查业务错误码 code = data.get("code", "") if code: logger.error( f"[AliyunTransfer] 获取分享详情 API 错误: " f"code={code}, message={data.get('message', '')}" ) return None return { "share_name": data.get("share_name", ""), "share_title": data.get("share_title", data.get("share_name", "")), "file_infos": data.get("file_infos", []), "expiration": data.get("expiration", ""), "creator_name": data.get("creator_name", ""), "creator_id": data.get("creator_id", ""), } except requests.RequestException as e: logger.error(f"[AliyunTransfer] 获取分享详情网络异常: {e}") return None except Exception as e: logger.exception(f"[AliyunTransfer] 获取分享详情异常: {e}") return None # ─── 步骤 ②:获取分享令牌 ──────────────────────────────── def _get_share_token(self, share_id: str, share_password: str = "") -> Optional[str]: """ POST /v2/share_link/get_share_token 请求体: {"share_id": "..."} 需要 Auth 头 响应: {"share_token": "..."} """ try: headers = self.credential.get_headers() resp = self._session.post( SHARE_TOKEN_URL, json={ "share_id": share_id, "share_pwd": share_password, }, headers=headers, timeout=self.request_timeout, ) data = resp.json() if resp.status_code != 200: logger.error( f"[AliyunTransfer] 获取分享令牌失败: " f"HTTP {resp.status_code}, {data}" ) return None code = data.get("code", "") if code: logger.error( f"[AliyunTransfer] 获取分享令牌 API 错误: " f"code={code}, message={data.get('message', '')}" ) return None share_token = data.get("share_token", "") if not share_token: logger.error("[AliyunTransfer] 响应中缺少 share_token") return None return share_token except requests.RequestException as e: logger.error(f"[AliyunTransfer] 获取分享令牌网络异常: {e}") return None except Exception as e: logger.exception(f"[AliyunTransfer] 获取分享令牌异常: {e}") return None # ─── 步骤 ③:批量复制文件 ──────────────────────────────── def _batch_copy( self, share_id: str, share_token: str, file_ids: List[str], to_parent_file_id: str = "root", ) -> List[str]: """ POST /adrive/v4/batch 头: X-Share-Token: 请求体: { "requests": [ { "url": "/file/copy", "body": { "file_id": "...", "share_id": "...", "to_drive_id": "...", "to_parent_file_id": "..." } } ] } 响应: {"responses": [{"status": 200, "body": {"file_id": "new_id"}}, ...]} 返回新的 file_id 列表 """ drive_id = self.drive_id if not drive_id: drive_id = self.credential.get_drive_id() if not drive_id: logger.error("[AliyunTransfer] 缺少 drive_id,无法转存") return [] # 构建批量请求体 requests_list = [] for fid in file_ids: requests_list.append({ "url": "/file/copy", "body": { "file_id": fid, "share_id": share_id, "to_drive_id": drive_id, "to_parent_file_id": to_parent_file_id, }, "headers": {"Content-Type": "application/json"}, "id": fid, "method": "POST", }) try: headers = self.credential.get_headers() headers["X-Share-Token"] = share_token resp = self._session.post( BATCH_URL, json={"requests": requests_list, "resource": "file"}, headers=headers, timeout=self.request_timeout * 2, # 批量操作可能较慢 ) data = resp.json() if resp.status_code != 200: logger.error( f"[AliyunTransfer] 批量复制失败: " f"HTTP {resp.status_code}, {data}" ) return [] code = data.get("code", "") if code: logger.error( f"[AliyunTransfer] 批量复制 API 错误: " f"code={code}, message={data.get('message', '')}" ) return [] # 提取新 file_id new_ids = [] responses = data.get("responses", []) for item in responses: status = item.get("status", 0) body = item.get("body", {}) if status in (200, 201, 202): new_fid = body.get("file_id", "") if new_fid: new_ids.append(new_fid) else: logger.warning( f"[AliyunTransfer] 单个文件复制失败: " f"id={item.get('id')}, status={status}, body={body}" ) if not new_ids: logger.error("[AliyunTransfer] 所有文件复制均失败") elif len(new_ids) < len(file_ids): logger.warning( f"[AliyunTransfer] 部分文件复制成功: " f"{len(new_ids)}/{len(file_ids)}" ) return new_ids except requests.RequestException as e: logger.error(f"[AliyunTransfer] 批量复制网络异常: {e}") return [] except Exception as e: logger.exception(f"[AliyunTransfer] 批量复制异常: {e}") return [] # ─── 步骤 ④:创建新分享 ────────────────────────────────── def _create_share( self, file_ids: List[str], share_password: str = "", expiration: str = "", ) -> Dict: """ POST /adrive/v2/share_link/create 请求体: {"drive_id": "...", "file_id_list": [...], "share_pwd": "...", "expiration": "..."} 响应: {"share_url": "...", "share_id": "..."} """ drive_id = self.drive_id or self.credential.get_drive_id() if not drive_id: logger.error("[AliyunTransfer] 缺少 drive_id,无法创建分享") return {"share_url": "", "share_pwd": ""} body = { "drive_id": drive_id, "file_id_list": file_ids, "share_pwd": share_password or "", "expiration": expiration or "", } try: headers = self.credential.get_headers() resp = self._session.post( CREATE_SHARE_URL, json=body, headers=headers, timeout=self.request_timeout, ) data = resp.json() if resp.status_code != 200: logger.error( f"[AliyunTransfer] 创建分享失败: " f"HTTP {resp.status_code}, {data}" ) return {"share_url": "", "share_pwd": share_password} code = data.get("code", "") if code: logger.error( f"[AliyunTransfer] 创建分享 API 错误: " f"code={code}, message={data.get('message', '')}" ) return {"share_url": "", "share_pwd": share_password} share_url = data.get("share_url", "") share_pwd = data.get("share_pwd", share_password) return {"share_url": share_url, "share_pwd": share_pwd} except requests.RequestException as e: logger.error(f"[AliyunTransfer] 创建分享网络异常: {e}") return {"share_url": "", "share_pwd": share_password} except Exception as e: logger.exception(f"[AliyunTransfer] 创建分享异常: {e}") return {"share_url": "", "share_pwd": share_password} # ─── URL 解析 ────────────────────────────────────────── @staticmethod def _extract_share_id(url: str) -> Optional[str]: """从阿里云盘分享 URL 中提取 share_id""" m = URL_PATTERN.search(url) if m: return m.group(1) return None @staticmethod def extract_share_id_static(url: str) -> Optional[str]: """静态方法:提取 share_id""" return AliyunTransfer._extract_share_id(url) # ─── 工具方法 ────────────────────────────────────────── def _error(self, message: str) -> Dict: """构造错误返回""" return { "success": False, "share_name": "", "share_id": "", "new_file_ids": [], "new_share_url": "", "new_share_password": "", "error": message, }