""" CloudSearch Telegram Bot v1.0.0 提供: /search /subscribe /hot /help """ import os import json import time import logging import sqlite3 from typing import Optional import requests from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ( Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, filters ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger("tgbot") class CloudSearchBot: def __init__(self, token: str, api_base: str, db_path: str = "/data/bot.db"): self.token = token self.api_base = api_base.rstrip("/") self.db = sqlite3.connect(db_path, check_same_thread=False) self._init_db() def _init_db(self): self.db.execute(""" CREATE TABLE IF NOT EXISTS subscriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, keyword TEXT NOT NULL, last_check TEXT, created_at TEXT DEFAULT (datetime('now', 'localtime')), UNIQUE(chat_id, keyword) ) """) self.db.commit() async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text( "🔍 *CloudSearch Bot* v1.0\n\n" "命令:\n" "/search 关键词 — 搜索网盘资源\n" "/hot — 热门搜索\n" "/subscribe 关键词 — 订阅关键词\n" "/unsub 关键词 — 取消订阅\n" "/mysubs — 我的订阅\n" "/help — 帮助", parse_mode="Markdown" ) async def search(self, update: Update, context: ContextTypes.DEFAULT_TYPE): keyword = " ".join(context.args) if context.args else "" if not keyword: await update.message.reply_text("用法: /search 流浪地球2") return msg = await update.message.reply_text(f"🔎 搜索中: *{keyword}*...", parse_mode="Markdown") try: resp = requests.post( f"{self.api_base}/api/query", json={"q": keyword}, timeout=15 ) # Parse NDJSON response results = [] content_info = None for line in resp.text.strip().split("\n"): try: data = json.loads(line) if data.get("type") == "result": results.append(data) elif data.get("type") == "stats": content_info = data.get("content_info") except json.JSONDecodeError: continue if not results: await msg.edit_text(f"😞 未找到「{keyword}」的相关资源") return # Format top 5 results text = f"🔎 *{keyword}* — {len(results)} 个结果\n\n" for i, r in enumerate(results[:5]): title = (r.get("title") or r.get("content", ""))[:40] cloud = r.get("cloud_type", "?").upper() url = r.get("share_url", "") pwd = r.get("password", "") pwd_str = f" 🔑`{pwd}`" if pwd else "" text += f"{i+1}. [{cloud}] [{title}]({url}){pwd_str}\n" keyboard = [[ InlineKeyboardButton("🌐 查看更多", url=f"{self.api_base}/?q={keyword}") ]] await msg.edit_text( text, parse_mode="Markdown", disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup(keyboard) ) except Exception as e: await msg.edit_text(f"❌ 搜索失败: {e}") async def subscribe(self, update: Update, context: ContextTypes.DEFAULT_TYPE): keyword = " ".join(context.args) if context.args else "" if not keyword: await update.message.reply_text("用法: /subscribe 流浪地球") return try: self.db.execute( "INSERT OR IGNORE INTO subscriptions (chat_id, keyword) VALUES (?, ?)", (update.effective_chat.id, keyword) ) self.db.commit() await update.message.reply_text(f"✅ 已订阅: *{keyword}*", parse_mode="Markdown") except Exception as e: await update.message.reply_text(f"❌ 订阅失败: {e}") async def unsub(self, update: Update, context: ContextTypes.DEFAULT_TYPE): keyword = " ".join(context.args) if context.args else "" self.db.execute( "DELETE FROM subscriptions WHERE chat_id=? AND keyword=?", (update.effective_chat.id, keyword) ) self.db.commit() await update.message.reply_text(f"🗑 已取消: *{keyword}*", parse_mode="Markdown") async def mysubs(self, update: Update, context: ContextTypes.DEFAULT_TYPE): subs = self.db.execute( "SELECT keyword, created_at FROM subscriptions WHERE chat_id=? ORDER BY created_at DESC LIMIT 20", (update.effective_chat.id,) ).fetchall() if not subs: await update.message.reply_text("📭 暂无订阅") return text = "📋 *我的订阅*\n" + "\n".join(f"• {s[0]}" for s in subs) await update.message.reply_text(text, parse_mode="Markdown") async def hot(self, update: Update, context: ContextTypes.DEFAULT_TYPE): try: resp = requests.get(f"{self.api_base}/api/rankings/hot?limit=10", timeout=10) data = resp.json() keywords = data if isinstance(data, list) else data.get("keywords", []) text = "🔥 *热门搜索*\n" + "\n".join( f"{i+1}. {kw.get('keyword', str(kw))}" for i, kw in enumerate(keywords[:10]) ) except: text = "🔥 获取热门失败,请稍后重试" await update.message.reply_text(text, parse_mode="Markdown") async def help_cmd(self, update: Update, context: ContextTypes.DEFAULT_TYPE): await self.start(update, context) def run(self): app = Application.builder().token(self.token).build() app.add_handler(CommandHandler("start", self.start)) app.add_handler(CommandHandler("search", self.search)) app.add_handler(CommandHandler("s", self.search)) app.add_handler(CommandHandler("hot", self.hot)) app.add_handler(CommandHandler("subscribe", self.subscribe)) app.add_handler(CommandHandler("sub", self.subscribe)) app.add_handler(CommandHandler("unsub", self.unsub)) app.add_handler(CommandHandler("mysubs", self.mysubs)) app.add_handler(CommandHandler("help", self.help_cmd)) logger.info("Bot starting...") app.run_polling() if __name__ == "__main__": token = os.getenv("TG_BOT_TOKEN", "") api = os.getenv("CLOUDSEARCH_API", "http://127.0.0.1:9527") bot = CloudSearchBot(token, api) bot.run()