Files
CloudSearch/cloudsearch_enrich/feishu_bot.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

320 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CloudSearch 飞书 Bot v1.0.0
替代 Telegram Bot支持 /search /subscribe 命令 + Webhook 推送
通过飞书开放平台事件订阅接收消息
"""
import os
import json
import time
import hmac
import hashlib
import logging
import sqlite3
from typing import Optional
from flask import Flask, request, jsonify
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("feishubot")
# ── 飞书配置 ──────────────────────────────────
APP_ID = os.environ.get("FEISHU_APP_ID", "")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "")
VERIFY_TOKEN = os.environ.get("FEISHU_VERIFY_TOKEN", "")
WEBHOOK_URL = os.environ.get("FEISHU_WEBHOOK_URL", "")
CLOUDSEARCH_API = os.environ.get("CLOUDSEARCH_API", "http://app:9527")
DB_PATH = os.environ.get("BOT_DB_PATH", "/data/bot.db")
# ── 飞书API ───────────────────────────────────
FEISHU_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
FEISHU_SEND_URL = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id"
_tenant_token = None
_token_expire = 0
def get_tenant_token() -> str:
"""获取飞书 tenant_access_token缓存2h"""
global _tenant_token, _token_expire
if _tenant_token and time.time() < _token_expire:
return _tenant_token
resp = requests.post(FEISHU_TOKEN_URL, json={
"app_id": APP_ID, "app_secret": APP_SECRET
}, timeout=10)
data = resp.json()
if data.get("code") != 0:
raise Exception(f"获取飞书Token失败: {data}")
_tenant_token = data["tenant_access_token"]
_token_expire = time.time() + data.get("expire", 7200) - 300
logger.info("飞书 tenant_token 已刷新")
return _tenant_token
def send_feishu_msg(open_id: str, content: str, msg_type: str = "text"):
"""发送飞书消息"""
body = {
"receive_id": open_id,
"msg_type": msg_type,
"content": json.dumps({"text": content}) if msg_type == "text" else content
}
resp = requests.post(
FEISHU_SEND_URL,
headers={"Authorization": f"Bearer {get_tenant_token()}"},
json=body, timeout=10
)
data = resp.json()
if data.get("code") != 0:
logger.error(f"发送飞书消息失败: {data}")
return data.get("code") == 0
def send_feishu_card(open_id: str, card: dict):
"""发送飞书卡片消息"""
body = {
"receive_id": open_id,
"msg_type": "interactive",
"content": json.dumps(card)
}
resp = requests.post(
FEISHU_SEND_URL,
headers={"Authorization": f"Bearer {get_tenant_token()}"},
json=body, timeout=10
)
return resp.json().get("code") == 0
def send_webhook(text: str):
"""通过 Webhook 推送通知(用于订阅变更)"""
if not WEBHOOK_URL:
return
try:
requests.post(WEBHOOK_URL, json={
"msg_type": "text",
"content": {"text": text}
}, timeout=10)
except Exception as e:
logger.error(f"Webhook推送失败: {e}")
# ── Bot 核心逻辑 ────────────────────────────────
class FeishuBot:
def __init__(self):
self.db = sqlite3.connect(DB_PATH, check_same_thread=False)
self._init_db()
def _init_db(self):
self.db.execute("""
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
open_id TEXT NOT NULL,
keyword TEXT NOT NULL,
last_check TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
UNIQUE(open_id, keyword)
)
""")
self.db.commit()
logger.info("订阅数据库就绪")
def handle_text(self, open_id: str, text: str):
"""处理文本消息"""
text = text.strip()
if text.startswith("/search"):
keyword = text.replace("/search", "", 1).strip()
return self._cmd_search(open_id, keyword)
elif text.startswith("/subscribe"):
keyword = text.replace("/subscribe", "", 1).strip()
return self._cmd_subscribe(open_id, keyword)
elif text.startswith("/unsub"):
keyword = text.replace("/unsub", "", 1).strip()
return self._cmd_unsub(open_id, keyword)
elif text.startswith("/mysubs"):
return self._cmd_mysubs(open_id)
elif text.startswith("/help") or text.lower() == "help":
return self._cmd_help(open_id)
else:
return self._cmd_search(open_id, text) # 默认搜索
def _cmd_help(self, open_id: str):
help_text = (
"🔍 CloudSearch Bot\n\n"
"命令:\n"
"/search 关键词 — 搜索网盘资源\n"
"直接输入关键词也可以搜索\n"
"/subscribe 关键词 — 订阅关键词\n"
"/unsub 关键词 — 取消订阅\n"
"/mysubs — 查看我的订阅\n"
"/help — 帮助"
)
send_feishu_msg(open_id, help_text)
def _cmd_search(self, open_id: str, keyword: str):
if not keyword:
send_feishu_msg(open_id, "用法: /search 流浪地球2\n或直接输入关键词")
return
try:
resp = requests.post(
f"{CLOUDSEARCH_API}/api/query",
json={"q": keyword}, timeout=15
)
results = []
for line in resp.text.strip().split("\n"):
try:
d = json.loads(line)
if d.get("type") == "result":
results.append(d)
except json.JSONDecodeError:
continue
if not results:
send_feishu_msg(open_id, f"😞 未找到「{keyword}」的相关资源")
return
# 构建飞书卡片
elements = []
for i, r in enumerate(results[:5]):
title = (r.get("title") or r.get("content", ""))[:50]
cloud = r.get("cloud_type", "?").upper()
pwd = r.get("password", "")
pwd_str = f" 🔑{pwd}" if pwd else ""
elements.append({
"tag": "div",
"text": {"tag": "lark_md", "content": f"**{i+1}.** [{cloud}] {title}{pwd_str}"}
})
card = {
"header": {
"title": {"tag": "plain_text", "content": f"🔎 {keyword}{len(results)}个结果"},
"template": "blue"
},
"elements": elements + [{
"tag": "action",
"actions": [{
"tag": "button",
"text": {"tag": "plain_text", "content": "🌐 查看更多"},
"type": "primary",
"url": f"{CLOUDSEARCH_API}/?q={keyword}"
}]
}]
}
send_feishu_card(open_id, card)
except Exception as e:
send_feishu_msg(open_id, f"❌ 搜索失败: {e}")
def _cmd_subscribe(self, open_id: str, keyword: str):
if not keyword:
send_feishu_msg(open_id, "用法: /subscribe 流浪地球")
return
try:
self.db.execute(
"INSERT OR IGNORE INTO subscriptions (open_id, keyword) VALUES (?, ?)",
(open_id, keyword)
)
self.db.commit()
send_feishu_msg(open_id, f"✅ 已订阅「{keyword}」,有新结果会通知你")
except Exception as e:
send_feishu_msg(open_id, f"❌ 订阅失败: {e}")
def _cmd_unsub(self, open_id: str, keyword: str):
if not keyword:
send_feishu_msg(open_id, "用法: /unsub 流浪地球")
return
cur = self.db.execute(
"DELETE FROM subscriptions WHERE open_id=? AND keyword=?",
(open_id, keyword)
)
self.db.commit()
if cur.rowcount > 0:
send_feishu_msg(open_id, f"✅ 已取消订阅「{keyword}")
else:
send_feishu_msg(open_id, f"未找到「{keyword}」的订阅")
def _cmd_mysubs(self, open_id: str):
rows = self.db.execute(
"SELECT keyword, created_at FROM subscriptions WHERE open_id=? ORDER BY created_at DESC",
(open_id,)
).fetchall()
if not rows:
send_feishu_msg(open_id, "你还没有订阅任何关键词")
return
text = "📋 我的订阅:\n"
for kw, dt in rows:
text += f"{kw} ({dt[:10]})\n"
send_feishu_msg(open_id, text)
def check_subscriptions(self):
"""检查所有订阅,有新结果时推送通知"""
subs = self.db.execute("SELECT DISTINCT keyword FROM subscriptions").fetchall()
for (kw,) in subs:
try:
resp = requests.post(
f"{CLOUDSEARCH_API}/api/query",
json={"q": kw}, timeout=10
)
count = sum(1 for line in resp.text.split("\n")
if '"type":"result"' in line)
if count > 0:
# 通知所有订阅此关键词的用户
users = self.db.execute(
"SELECT open_id FROM subscriptions WHERE keyword=?",
(kw,)
).fetchall()
for (uid,) in users:
send_feishu_msg(uid, f"🔔「{kw}」有新资源({count}个)\n/search {kw}")
# Webhook 也推送
send_webhook(f"🔔 关键词「{kw}」发现 {count} 个新资源")
except Exception as e:
logger.error(f"检查订阅[{kw}]失败: {e}")
# ── Flask Web 服务 ─────────────────────────────
bot = FeishuBot()
app = Flask(__name__)
@app.route("/health")
def health():
return jsonify({"status": "ok", "bot": "feishu"})
@app.route("/feishu/event", methods=["POST"])
def feishu_event():
"""飞书事件订阅回调"""
body = request.get_json()
logger.info(f"飞书事件: {json.dumps(body, ensure_ascii=False)[:300]}")
# Token 验证首次配置URL时
if body.get("type") == "url_verification":
token = body.get("token", "")
if token == VERIFY_TOKEN:
return jsonify({"challenge": body.get("challenge", "")})
return jsonify({"error": "invalid token"}), 403
# 事件回调验证
if "header" in body:
# 收到消息事件
event = body.get("event", {})
msg_type = event.get("message", {}).get("message_type", "")
if msg_type == "text":
content = event["message"].get("content", "{}")
try:
text = json.loads(content).get("text", "")
except json.JSONDecodeError:
text = content
open_id = event.get("sender", {}).get("sender_id", {}).get("open_id", "")
if text and open_id:
bot.handle_text(open_id, text)
return jsonify({"code": 0})
@app.route("/feishu/check", methods=["POST"])
def trigger_check():
"""手动触发订阅检查"""
bot.check_subscriptions()
return jsonify({"ok": True})
# ── 启动入口 ───────────────────────────────────
def main():
if not APP_ID:
logger.warning("FEISHU_APP_ID 未设置Bot 无法接收消息(仅 Webhook 可用)")
logger.info("飞书 Bot 启动端口9531")
app.run(host="0.0.0.0", port=9532)
if __name__ == "__main__":
main()