Files
CloudSearch/cloudsearch_enrich/subscription_monitor.py
admin 83cbfaf03f v0.2.7: 修复Redis连接 + 启动管理后台
- 修复Redis认证 (配置密码)
- 启动Python管理后台 (端口9531, 15个功能开关)
- 统一版本号 0.2.7
- 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
2026-05-17 02:22:18 +08:00

205 lines
7.2 KiB
Python

"""
CloudSearch Subscription Monitor v1.0.0
关键词订阅 + 新资源检测 + 多渠道通知
"""
import os
import json
import time
import sqlite3
import logging
import requests
from typing import List, Dict, Optional
from dataclasses import dataclass
# Module-level logger used by SubscriptionMonitor. It uses the fixed name
# "subscription" rather than __name__, so logging configuration must target
# that name.
logger = logging.getLogger("subscription")
@dataclass
class Notification:
    """A pending notification about new search results for one subscription."""
    chat_id: int  # destination chat; sent as Telegram's `chat_id` by notify_telegram
    keyword: str  # subscribed keyword whose search produced the results
    new_count: int  # total number of new results (may exceed len(results))
    results: List[dict]  # new results to display; check_all passes at most 5
    # NOTE(review): SubscriptionMonitor.run_loop always delivers via Telegram
    # and never reads this field — confirm whether per-channel dispatch is planned.
    channel: str = "telegram" # telegram / feishu / dingtalk
class SubscriptionMonitor:
    """Subscription monitor: periodically searches subscribed keywords and
    reports results that have not been notified before.

    State is kept in SQLite:

    * ``subscriptions`` -- one row per (chat_id, keyword) holding the hash of
      the last result set and the time of the last check;
    * ``sent_notifications`` -- per-subscription digests of result URLs that
      have already been reported.
    """

    def __init__(self, api_base: str, db_path: str = "/data/subscriptions.db",
                 tg_bot_token: Optional[str] = None):
        """Open (or create) the subscription database.

        Args:
            api_base: Base URL of the CloudSearch API; a trailing "/" is removed.
            db_path: SQLite database file.
            tg_bot_token: Telegram bot token; when empty, notify_telegram does nothing.
        """
        self.api_base = api_base.rstrip("/")
        self.tg_token = tg_bot_token
        # check_same_thread=False allows the connection to be used from threads
        # other than the creating one; this class does no locking of its own.
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self):
        """Create the tables and the dedup lookup index if they do not exist."""
        self.db.executescript("""
            CREATE TABLE IF NOT EXISTS subscriptions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id INTEGER NOT NULL,
                keyword TEXT NOT NULL,
                last_result_hash TEXT,
                last_check TEXT,
                created_at TEXT DEFAULT (datetime('now','localtime')),
                UNIQUE(chat_id, keyword)
            );
            CREATE TABLE IF NOT EXISTS sent_notifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                subscription_id INTEGER,
                result_hash TEXT,
                sent_at TEXT DEFAULT (datetime('now','localtime')),
                FOREIGN KEY(subscription_id) REFERENCES subscriptions(id)
            );
            -- _filter_new looks up (subscription_id, result_hash) once per
            -- search result; without an index that scans the whole,
            -- ever-growing table. Non-unique so existing databases that
            -- already contain duplicates still migrate cleanly.
            CREATE INDEX IF NOT EXISTS idx_sent_sub_hash
                ON sent_notifications(subscription_id, result_hash);
        """)
        self.db.commit()

    def check_all(self, batch_size: int = 10) -> "List[Notification]":
        """Check one batch of subscriptions and collect the notifications to send.

        Subscriptions are taken in ascending ``last_check`` order (never-checked
        rows have a NULL last_check, which SQLite sorts first), so successive
        calls rotate through all subscriptions. A subscription whose check
        raises keeps its old last_check and is therefore retried first.

        Args:
            batch_size: Maximum number of subscriptions checked in this call.

        Returns:
            One Notification per subscription that has previously unreported
            results; each carries at most 5 results, with the full count in
            ``new_count``.
        """
        subs = self.db.execute(
            "SELECT id, chat_id, keyword, last_result_hash FROM subscriptions ORDER BY last_check ASC LIMIT ?",
            (batch_size,)
        ).fetchall()
        notifications = []
        for sub_id, chat_id, keyword, last_hash in subs:
            try:
                results = self._search(keyword)
                new_hash = self._hash_results(results)
                # Cheap shortcut: only consult sent_notifications when the
                # overall result set changed since the previous check.
                if new_hash != last_hash:
                    new_results = self._filter_new(sub_id, results, last_hash)
                    if new_results:
                        notifications.append(Notification(
                            chat_id=chat_id,
                            keyword=keyword,
                            new_count=len(new_results),
                            results=new_results[:5],
                        ))
                # Record the new state for this subscription.
                self.db.execute(
                    "UPDATE subscriptions SET last_result_hash=?, last_check=datetime('now','localtime') WHERE id=?",
                    (new_hash, sub_id)
                )
            except Exception as e:
                logger.error(f"Check failed: {keyword} - {e}")
        self.db.commit()
        return notifications

    def _search(self, keyword: str) -> list:
        """Search *keyword* through the CloudSearch ``/api/query`` endpoint.

        Returns:
            The parsed result dicts (see _parse_results), or [] if the request
            fails; the error is logged.
        """
        try:
            resp = requests.post(
                f"{self.api_base}/api/query",
                json={"q": keyword},
                timeout=20
            )
            # Log HTTP errors instead of silently parsing an error page as
            # an empty result set.
            resp.raise_for_status()
            return self._parse_results(resp.text)
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []

    @staticmethod
    def _parse_results(text: str) -> list:
        """Parse a newline-delimited JSON response body into result dicts.

        Only JSON objects whose ``type`` is ``"result"`` are kept. Blank lines,
        malformed JSON and non-object JSON values are skipped. Missing or null
        fields become "" so downstream slicing and sorting never see None.

        Returns:
            A list of dicts with the keys "title", "url", "cloud" and "source".
        """
        results = []
        # Split on "\n" only: str.splitlines() would also split on U+2028 etc.,
        # which may legitimately appear inside JSON string values.
        for line in text.strip().split("\n"):
            try:
                d = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line may decode to a list/number/string; calling .get() on it
            # would raise and discard the whole search.
            if not isinstance(d, dict) or d.get("type") != "result":
                continue
            results.append({
                "title": d.get("title") or "",
                "url": d.get("share_url") or "",
                "cloud": d.get("cloud_type") or "",
                "source": d.get("source") or "",
            })
        return results

    def _hash_results(self, results: list) -> str:
        """Return an order-independent md5 fingerprint of a result set.

        Results are sorted by URL and the first 50 characters of each URL are
        joined with "|".
        NOTE(review): because of the 50-char truncation, a result set that
        changes only beyond that point does not change the fingerprint (and so
        does not trigger _filter_new) — confirm this is intended.
        """
        import hashlib
        key = "|".join(
            r.get("url", "")[:50] for r in sorted(
                results, key=lambda x: x.get("url", "")
            )
        )
        return hashlib.md5(key.encode()).hexdigest()

    @staticmethod
    def _url_hash(url: str) -> str:
        """Return a digest of *url* that is stable across process restarts.

        The built-in hash() of a str is salted per process (PYTHONHASHSEED), so
        keys persisted by one run would never match in the next, and every
        restart would re-notify all known results. md5 is used only as a
        fingerprint here, not for security.
        """
        import hashlib
        return hashlib.md5(url.encode("utf-8")).hexdigest()

    def _filter_new(self, sub_id: int, results: list, last_hash: Optional[str]) -> list:
        """Return the results not yet reported for *sub_id* and record them.

        Args:
            sub_id: Subscription row id.
            results: Result dicts from _search.
            last_hash: Previous result-set fingerprint. It is not used here and
                is kept for interface compatibility.

        Returns:
            The new results, in input order. A URL repeated within *results*
            is returned only once.

        NOTE: results are recorded as sent here, before any notification is
        delivered (check_all commits them), so a failed delivery is not retried.
        """
        new = []
        for r in results:
            rhash = self._url_hash(r.get("url") or "")
            already_sent = self.db.execute(
                "SELECT id FROM sent_notifications WHERE subscription_id=? AND result_hash=?",
                (sub_id, rhash)
            ).fetchone()
            if already_sent:
                continue
            new.append(r)
            self.db.execute(
                "INSERT OR IGNORE INTO sent_notifications (subscription_id, result_hash) VALUES (?,?)",
                (sub_id, rhash)
            )
        return new

    @staticmethod
    def _format_telegram(notif: "Notification") -> str:
        """Build the HTML-formatted Telegram message text for *notif*.

        The keyword, titles and URLs come from users and search results, so
        they are HTML-escaped. In Markdown mode a single "_", "*", "[" or ")"
        in a title would make Telegram reject the whole message. At most 5
        results are listed. A result without a URL is shown as plain text,
        without a link.
        """
        import html
        parts = [f"🔔 <b>{html.escape(notif.keyword)}</b> 有新资源!({notif.new_count}个)\n\n"]
        for i, r in enumerate(notif.results[:5], start=1):
            # Truncate before escaping so an entity such as "&amp;" is never cut.
            title = html.escape((r.get("title") or "")[:40])
            cloud = html.escape((r.get("cloud") or "?").upper())
            url = r.get("url") or ""
            if url:
                parts.append(f'{i}. [{cloud}] <a href="{html.escape(url, quote=True)}">{title}</a>\n')
            else:
                parts.append(f"{i}. [{cloud}] {title}\n")
        return "".join(parts)

    def notify_telegram(self, notif: "Notification"):
        """Send *notif* to ``notif.chat_id`` via the Telegram Bot API.

        Does nothing when no bot token is configured. Delivery errors are
        logged, never raised.
        """
        if not self.tg_token:
            return
        try:
            resp = requests.post(
                f"https://api.telegram.org/bot{self.tg_token}/sendMessage",
                json={
                    "chat_id": notif.chat_id,
                    "text": self._format_telegram(notif),
                    "parse_mode": "HTML",
                    "disable_web_page_preview": True,
                },
                timeout=10
            )
        except Exception as e:
            # requests exception messages include the request path, which
            # embeds the bot token; keep it out of the logs.
            logger.error(f"TG notify failed: {str(e).replace(self.tg_token, '***')}")
            return
        # requests does not raise on HTTP error statuses. Telegram reports a
        # rejected message (bad chat_id, unparsable text, ...) with a 4xx status.
        if not resp.ok:
            logger.error(f"TG notify failed: HTTP {resp.status_code} {resp.text[:200]}")

    def notify_feishu(self, notif: "Notification", webhook_url: str):
        """Send *notif* as a plain-text message to a Feishu bot webhook.

        At most 5 results are listed. Delivery errors are logged, never raised.
        """
        lines = [f"🔔 {notif.keyword} 有新资源!({notif.new_count}个)\n"]
        for r in notif.results[:5]:
            cloud = (r.get("cloud") or "?").upper()
            title = (r.get("title") or "")[:40]
            lines.append(f"• [{cloud}] {title} {r.get('url') or ''}\n")
        text = "".join(lines)
        try:
            resp = requests.post(webhook_url, json={
                "msg_type": "text",
                "content": {"text": text}
            }, timeout=10)
        except Exception as e:
            logger.error(f"Feishu notify failed: {e}")
            return
        if not resp.ok:
            logger.error(f"Feishu notify failed: HTTP {resp.status_code} {resp.text[:200]}")

    def run_loop(self, interval_minutes: int = 15):
        """Run check_all and Telegram delivery forever, sleeping between rounds.

        Each round checks one batch (check_all's default batch size) and sends
        every resulting notification through notify_telegram only;
        ``Notification.channel`` is not consulted. Errors are logged with a
        traceback and the loop continues.
        """
        logger.info(f"Subscription monitor started (interval={interval_minutes}min)")
        while True:
            try:
                notifs = self.check_all()
                for n in notifs:
                    self.notify_telegram(n)
                if notifs:
                    logger.info(f"Sent {len(notifs)} notifications")
            except Exception as e:
                logger.exception(f"Monitor error: {e}")
            time.sleep(interval_minutes * 60)
if __name__ == "__main__":
    # Without a configured handler the root logger only emits WARNING and
    # above, so the monitor's logger.info(...) messages (startup, "Sent N
    # notifications") would never appear.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    api = os.getenv("CLOUDSEARCH_API", "http://127.0.0.1:9527")
    token = os.getenv("TG_BOT_TOKEN", "")
    interval = int(os.getenv("CHECK_INTERVAL", "15"))
    # Optional override; defaults to the path SubscriptionMonitor always used.
    db_path = os.getenv("SUBSCRIPTION_DB", "/data/subscriptions.db")
    monitor = SubscriptionMonitor(api, db_path=db_path, tg_bot_token=token)
    monitor.run_loop(interval)