""" CloudSearch Transfer — UC网盘清理模块 v1.0.0 提供文件删除和广告过滤功能。API 与夸克相同,仅域名不同。 """ from __future__ import annotations import logging from typing import Any, Dict, List import requests from .credential import UcCredentialManager logger = logging.getLogger(__name__) # ─── UC API ───────────────────────────────────────────────────────── UC_API_BASE = "https://pc-api.uc.cn" UC_FILE_API = f"{UC_API_BASE}/1/clouddrive/file" class UcCleanup: """UC 网盘文件清理器。 提供批量删除文件和广告文件过滤功能。 Attributes: credential: UC 凭证管理器。 session: 复用的 requests.Session。 timeout: HTTP 请求超时秒数。 """ def __init__( self, credential: UcCredentialManager, timeout: int = 30, ) -> None: """初始化清理器。 Args: credential: 有效的 UC 凭证管理器。 timeout: HTTP 请求超时秒数。 """ self.credential: UcCredentialManager = credential self.timeout: int = timeout self.session: requests.Session = requests.Session() def delete_files(self, file_ids: List[str]) -> bool: """批量删除文件(回收站方式)。 POST /1/clouddrive/file/delete Body: { "action_type": 2, "filelist": ["", "", ...] } action_type=1 表示彻底删除,action_type=2 表示移入回收站。 Args: file_ids: 要删除的文件 ID 列表。 Returns: True 表示删除请求已提交成功,False 表示失败。 Raises: RuntimeError: HTTP 请求错误。 """ if not file_ids: logger.warning("[UcCleanup] delete_files called with empty list") return True url: str = f"{UC_FILE_API}/delete" body: Dict[str, Any] = { "action_type": 2, # 2=回收站, 1=彻底删除 "filelist": file_ids, } headers = self.credential.get_headers() headers.setdefault("Content-Type", "application/json") logger.info("[UcCleanup] Deleting %d files: %s", len(file_ids), file_ids) try: resp = self.session.post( url, json=body, headers=headers, timeout=self.timeout ) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"删除文件失败: {exc}") from exc data: Dict[str, Any] = resp.json() status: int = data.get("status", -1) if status != 0 and data.get("code") not in (0, None): logger.error( "[UcCleanup] Delete returned error: status=%s, message=%s", status, data.get("message"), ) return False logger.info("[UcCleanup] Delete succeeded for %d files", len(file_ids)) return True def delete_files_permanent(self, file_ids: List[str]) -> bool: """彻底删除文件(不从回收站恢复)。 与 delete_files 类似,但 action_type=1。 Args: file_ids: 要彻底删除的文件 ID 列表。 Returns: True 表示删除请求已提交成功。 """ if not file_ids: return True url: str = f"{UC_FILE_API}/delete" body: Dict[str, Any] = { "action_type": 1, # 1=彻底删除 "filelist": file_ids, } headers = self.credential.get_headers() headers.setdefault("Content-Type", "application/json") logger.info("[UcCleanup] Permanently deleting %d files", len(file_ids)) try: resp = self.session.post( url, json=body, headers=headers, timeout=self.timeout ) resp.raise_for_status() except requests.RequestException as exc: raise RuntimeError(f"彻底删除失败: {exc}") from exc data: Dict[str, Any] = resp.json() return data.get("status") == 0 or data.get("code") in (0, None) @staticmethod def filter_ads( files: List[Dict[str, Any]], banned_keywords: List[str], ) -> List[Dict[str, Any]]: """按关键词过滤文件列表中的广告文件。 遍历文件列表,剔除文件名中包含任一 banned_keywords 的文件。 匹配方式:不区分大小写的子串匹配。 Args: files: 文件信息字典列表,每个字典需包含 "name" 字段。 banned_keywords: 被禁关键词列表(匹配不区分大小写)。 Returns: 过滤后的文件信息列表。 """ if not banned_keywords: return files filtered: List[Dict[str, Any]] = [] removed_count: int = 0 for f in files: name: str = f.get("name", "") name_lower: str = str(name).lower() if any(keyword.lower() in name_lower for keyword in banned_keywords): logger.info("[UcCleanup] Filtered ad file: '%s'", name) removed_count += 1 continue filtered.append(f) if removed_count > 0: logger.info( "[UcCleanup] Ad filter removed %d/%d files", removed_count, len(files) ) return filtered @staticmethod def filter_ad_ids( file_ids: List[str], file_names: List[str], banned_keywords: List[str], ) -> List[str]: """按关键词过滤文件 ID 列表。 根据 file_names 判断是否为广告,返回对应的 file_ids。 Args: file_ids: 文件 ID 列表。 file_names: 与 file_ids 一一对应的文件名列表。 banned_keywords: 被禁关键词列表。 Returns: 过滤后的 file_ids 列表。 """ if not banned_keywords or len(file_ids) != len(file_names): return file_ids filtered_ids: List[str] = [] for fid, name in zip(file_ids, file_names): name_lower: str = str(name).lower() if any(kw.lower() in name_lower for kw in banned_keywords): logger.info("[UcCleanup] Filtered ad file: '%s' (id=%s)", name, fid) continue filtered_ids.append(fid) return filtered_ids def close(self) -> None: """关闭 HTTP 会话。""" self.session.close() def __enter__(self) -> "UcCleanup": return self def __exit__(self, *args: Any) -> None: self.close()