Files
CloudSearch/cloudsearch_enrich/tg_bot.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

184 lines
6.9 KiB
Python

"""
CloudSearch Telegram Bot v1.0.0
提供: /search /subscribe /hot /help
"""
import asyncio
import html
import json
import logging
import os
import sqlite3
import time
from typing import Optional
from urllib.parse import quote

import requests
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application, CommandHandler, MessageHandler,
    CallbackQueryHandler, ContextTypes, filters
)
# Configure the root logger at import time so INFO-level records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-wide logger; records are emitted under the name "tgbot".
logger = logging.getLogger("tgbot")
class CloudSearchBot:
    """Telegram bot exposing CloudSearch queries and keyword subscriptions.

    Search and hot-keyword commands are answered by calling the HTTP API at
    ``api_base``; subscriptions are stored per chat in a local SQLite file.

    Every reply that embeds user- or API-supplied text uses Telegram's HTML
    parse mode with that text passed through ``html.escape``, so characters
    such as ``_``, ``*`` or ``[`` cannot make Telegram reject the message.
    """

    # Number of search hits rendered in one reply.
    _MAX_RESULTS = 5
    # Maximum number of title characters shown per hit.
    _TITLE_LIMIT = 40
    # Number of hot keywords rendered in one reply.
    _HOT_LIMIT = 10
    # Maximum number of subscriptions listed by /mysubs.
    _SUBS_LIMIT = 20

    def __init__(self, token: str, api_base: str, db_path: str = "/data/bot.db"):
        """Store the configuration and open the subscription database.

        Args:
            token: Telegram bot token handed to the application builder.
            api_base: Base URL of the CloudSearch API; a trailing ``/`` is
                removed.
            db_path: Path of the SQLite file holding subscriptions.
        """
        self.token = token
        self.api_base = api_base.rstrip("/")
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    # ------------------------------------------------------------------
    # Storage helpers (synchronous, no Telegram types involved)
    # ------------------------------------------------------------------

    def _init_db(self):
        """Create the ``subscriptions`` table if it does not exist yet."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS subscriptions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id INTEGER NOT NULL,
                keyword TEXT NOT NULL,
                last_check TEXT,
                created_at TEXT DEFAULT (datetime('now', 'localtime')),
                UNIQUE(chat_id, keyword)
            )
        """)
        self.db.commit()

    def _add_subscription(self, chat_id: int, keyword: str) -> bool:
        """Insert a subscription; return True only if a new row was created."""
        # ``with self.db`` commits on success and rolls back on error.
        with self.db:
            cursor = self.db.execute(
                "INSERT OR IGNORE INTO subscriptions (chat_id, keyword) VALUES (?, ?)",
                (chat_id, keyword),
            )
        return cursor.rowcount > 0

    def _remove_subscription(self, chat_id: int, keyword: str) -> bool:
        """Delete a subscription; return True only if a row was removed."""
        with self.db:
            cursor = self.db.execute(
                "DELETE FROM subscriptions WHERE chat_id=? AND keyword=?",
                (chat_id, keyword),
            )
        return cursor.rowcount > 0

    def _list_subscriptions(self, chat_id: int) -> list:
        """Return the chat's subscribed keywords, newest first (capped)."""
        # ``id DESC`` breaks ties between rows created within the same second.
        rows = self.db.execute(
            "SELECT keyword FROM subscriptions WHERE chat_id=? "
            "ORDER BY created_at DESC, id DESC LIMIT ?",
            (chat_id, self._SUBS_LIMIT),
        ).fetchall()
        return [row[0] for row in rows]

    # ------------------------------------------------------------------
    # HTTP helpers (blocking; run them in an executor from handlers)
    # ------------------------------------------------------------------

    def _query_api(self, keyword: str) -> str:
        """POST the keyword to ``/api/query`` and return the raw response body.

        Raises:
            requests.RequestException: on connection errors, timeouts, or a
                non-2xx HTTP status.
        """
        resp = requests.post(
            f"{self.api_base}/api/query",
            json={"q": keyword},
            timeout=15,
        )
        # Without this an error page would be parsed as "no results".
        resp.raise_for_status()
        return resp.text

    def _fetch_hot(self):
        """GET ``/api/rankings/hot`` and return the decoded JSON payload.

        Raises:
            requests.RequestException: on connection errors, timeouts, or a
                non-2xx HTTP status.
            ValueError: if the body is not valid JSON.
        """
        resp = requests.get(f"{self.api_base}/api/rankings/hot?limit=10", timeout=10)
        resp.raise_for_status()
        return resp.json()

    # ------------------------------------------------------------------
    # Pure formatting helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_results(body: str) -> list:
        """Extract ``type == "result"`` objects from an NDJSON body.

        Blank lines, lines that are not valid JSON, and JSON values that are
        not objects are skipped.
        """
        results = []
        # Split on "\n" only: str.splitlines() would also break on characters
        # such as U+2028, which may appear unescaped inside JSON strings.
        for raw_line in body.split("\n"):
            line = raw_line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(data, dict) and data.get("type") == "result":
                results.append(data)
        return results

    @classmethod
    def _format_results(cls, keyword: str, results: list) -> str:
        """Render the top hits as a Telegram HTML message.

        The header reports the total number of hits; only the first
        ``_MAX_RESULTS`` are listed. Missing or null fields fall back to an
        empty title, ``?`` as cloud type, no link, and no password.
        """
        lines = []
        for index, item in enumerate(results[: cls._MAX_RESULTS], start=1):
            raw_title = item.get("title") or item.get("content") or ""
            # Truncate before escaping so an HTML entity is never cut in half.
            title = html.escape(str(raw_title)[: cls._TITLE_LIMIT])
            cloud = html.escape(str(item.get("cloud_type") or "?").upper())
            url = str(item.get("share_url") or "")
            label = f'<a href="{html.escape(url)}">{title}</a>' if url else title
            password = item.get("password")
            secret = f" 🔑<code>{html.escape(str(password))}</code>" if password else ""
            lines.append(f"{index}. [{cloud}] {label}{secret}\n")
        header = f"🔎 <b>{html.escape(keyword)}</b> — {len(results)} 个结果\n\n"
        return header + "".join(lines)

    @classmethod
    def _format_hot(cls, data) -> str:
        """Render the hot-keyword payload as a Telegram HTML message.

        Accepts either a list of entries or a dict with a ``keywords`` list.
        Each entry may be a dict (its ``keyword`` value is shown, falling
        back to the dict's string form) or any other value (shown via
        ``str``).
        """
        if isinstance(data, list):
            entries = data
        elif isinstance(data, dict):
            entries = data.get("keywords") or []
        else:
            entries = []
        lines = []
        for index, entry in enumerate(entries[: cls._HOT_LIMIT], start=1):
            name = entry.get("keyword", str(entry)) if isinstance(entry, dict) else entry
            lines.append(f"{index}. {html.escape(str(name))}")
        return "🔥 <b>热门搜索</b>\n" + "\n".join(lines)

    def _more_url(self, keyword: str) -> str:
        """Return the web-UI search URL with the keyword percent-encoded."""
        return f"{self.api_base}/?q={quote(keyword, safe='')}"

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------
    # Handlers use ``update.effective_message`` instead of ``update.message``
    # because command handlers also receive edited messages by default, and
    # ``update.message`` is None for those.

    async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the command overview."""
        await update.effective_message.reply_text(
            "🔍 *CloudSearch Bot* v1.0\n\n"
            "命令:\n"
            "/search 关键词 — 搜索网盘资源\n"
            "/hot — 热门搜索\n"
            "/subscribe 关键词 — 订阅关键词\n"
            "/unsub 关键词 — 取消订阅\n"
            "/mysubs — 我的订阅\n"
            "/help — 帮助",
            parse_mode="Markdown",
        )

    async def search(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Search for the keyword given as command arguments and show top hits.

        Sends a usage hint when no keyword is given. Any failure while
        querying or rendering is logged and reported in the placeholder
        message.
        """
        message = update.effective_message
        keyword = " ".join(context.args) if context.args else ""
        if not keyword:
            await message.reply_text("用法: /search 流浪地球2")
            return
        msg = await message.reply_text(
            f"🔎 搜索中: <b>{html.escape(keyword)}</b>...", parse_mode="HTML"
        )
        try:
            # requests is blocking; keep it off the event loop so other
            # updates are still processed while the API call is in flight.
            loop = asyncio.get_running_loop()
            body = await loop.run_in_executor(None, self._query_api, keyword)
            results = self._parse_results(body)
            if not results:
                await msg.edit_text(f"😞 未找到「{keyword}」的相关资源")
                return
            keyboard = [[
                InlineKeyboardButton("🌐 查看更多", url=self._more_url(keyword))
            ]]
            await msg.edit_text(
                self._format_results(keyword, results),
                parse_mode="HTML",
                disable_web_page_preview=True,
                reply_markup=InlineKeyboardMarkup(keyboard),
            )
        except Exception as e:
            logger.exception("Search failed for keyword %r", keyword)
            await msg.edit_text(f"❌ 搜索失败: {e}")

    async def subscribe(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Subscribe the current chat to the keyword given as arguments.

        Subscribing to an already-subscribed keyword is a no-op that still
        confirms the subscription.
        """
        message = update.effective_message
        keyword = " ".join(context.args) if context.args else ""
        if not keyword:
            await message.reply_text("用法: /subscribe 流浪地球")
            return
        try:
            self._add_subscription(update.effective_chat.id, keyword)
        except sqlite3.Error as e:
            logger.exception("Subscribe failed for keyword %r", keyword)
            await message.reply_text(f"❌ 订阅失败: {e}")
            return
        await message.reply_text(
            f"✅ 已订阅: <b>{html.escape(keyword)}</b>", parse_mode="HTML"
        )

    async def unsub(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Remove the current chat's subscription to the given keyword.

        Sends a usage hint when no keyword is given, and says so when the
        chat was not subscribed to it.
        """
        message = update.effective_message
        keyword = " ".join(context.args) if context.args else ""
        if not keyword:
            await message.reply_text("用法: /unsub 流浪地球")
            return
        try:
            removed = self._remove_subscription(update.effective_chat.id, keyword)
        except sqlite3.Error as e:
            logger.exception("Unsubscribe failed for keyword %r", keyword)
            await message.reply_text(f"❌ 取消失败: {e}")
            return
        prefix = "🗑 已取消" if removed else "📭 未订阅"
        await message.reply_text(
            f"{prefix}: <b>{html.escape(keyword)}</b>", parse_mode="HTML"
        )

    async def mysubs(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """List the current chat's subscriptions, newest first."""
        message = update.effective_message
        keywords = self._list_subscriptions(update.effective_chat.id)
        if not keywords:
            await message.reply_text("📭 暂无订阅")
            return
        text = "📋 <b>我的订阅</b>\n" + "\n".join(html.escape(kw) for kw in keywords)
        await message.reply_text(text, parse_mode="HTML")

    async def hot(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Show the hot-keyword ranking returned by the API."""
        message = update.effective_message
        try:
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(None, self._fetch_hot)
            text = self._format_hot(data)
        except Exception:
            # Best effort: log the cause and show a generic retry message.
            logger.exception("Fetching hot keywords failed")
            await message.reply_text("🔥 获取热门失败,请稍后重试")
            return
        await message.reply_text(text, parse_mode="HTML")

    async def help_cmd(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Show the same command overview as ``start``."""
        await self.start(update, context)

    def run(self):
        """Register all command handlers and poll Telegram until stopped."""
        app = Application.builder().token(self.token).build()
        app.add_handler(CommandHandler("start", self.start))
        app.add_handler(CommandHandler("search", self.search))
        app.add_handler(CommandHandler("s", self.search))
        app.add_handler(CommandHandler("hot", self.hot))
        app.add_handler(CommandHandler("subscribe", self.subscribe))
        app.add_handler(CommandHandler("sub", self.subscribe))
        app.add_handler(CommandHandler("unsub", self.unsub))
        app.add_handler(CommandHandler("mysubs", self.mysubs))
        app.add_handler(CommandHandler("help", self.help_cmd))
        logger.info("Bot starting...")
        app.run_polling()
def main() -> None:
    """Read configuration from the environment and run the bot.

    Environment variables:
        TG_BOT_TOKEN: Telegram bot token (required).
        CLOUDSEARCH_API: Base URL of the CloudSearch API; defaults to
            ``http://127.0.0.1:9527``.

    Raises:
        SystemExit: if ``TG_BOT_TOKEN`` is unset or blank.
    """
    token = os.getenv("TG_BOT_TOKEN", "").strip()
    if not token:
        # Fail early with an actionable message instead of passing an empty
        # token on to the bot.
        raise SystemExit("TG_BOT_TOKEN is not set")
    api = os.getenv("CLOUDSEARCH_API", "http://127.0.0.1:9527")
    bot = CloudSearchBot(token, api)
    bot.run()


if __name__ == "__main__":
    main()