"""
CloudSearch Subscription Monitor v1.0.0

Keyword subscriptions + new-resource detection + multi-channel notification.
"""

import hashlib
import html
import json
import logging
import os
import sqlite3
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

import requests

logger = logging.getLogger("subscription")


@dataclass
class Notification:
    """A pending notification about new search results for one subscription."""

    chat_id: int  # destination chat; passed as Telegram's "chat_id"
    keyword: str  # the subscribed keyword that produced the results
    new_count: int  # number of new results found (may exceed len(results))
    results: List[dict]  # preview of the new results (check_all keeps at most 5)
    channel: str = "telegram"  # telegram / feishu / dingtalk


class SubscriptionMonitor:
    """Subscription monitor.

    Periodically searches every subscribed keyword through the CloudSearch API.
    When results appear that were not notified before, it pushes a notification.
    State (subscriptions and already-notified result fingerprints) lives in SQLite.
    """

    # Maximum number of results shown in a single notification.
    MAX_PREVIEW = 5
    # Maximum number of title characters shown per result.
    MAX_TITLE_LEN = 40

    def __init__(self, api_base: str, db_path: str = "/data/subscriptions.db",
                 tg_bot_token: Optional[str] = None):
        """Open (or create) the SQLite state database.

        Args:
            api_base: Base URL of the CloudSearch API; a trailing "/" is stripped.
            db_path: Path of the SQLite database file.
            tg_bot_token: Telegram bot token; when falsy, Telegram sending is skipped.
        """
        self.api_base = api_base.rstrip("/")
        self.tg_token = tg_bot_token
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self):
        """Create the tables (and the lookup index) if they do not exist yet."""
        self.db.executescript("""
            CREATE TABLE IF NOT EXISTS subscriptions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id INTEGER NOT NULL,
                keyword TEXT NOT NULL,
                last_result_hash TEXT,
                last_check TEXT,
                created_at TEXT DEFAULT (datetime('now','localtime')),
                UNIQUE(chat_id, keyword)
            );
            CREATE TABLE IF NOT EXISTS sent_notifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                subscription_id INTEGER,
                result_hash TEXT,
                sent_at TEXT DEFAULT (datetime('now','localtime')),
                FOREIGN KEY(subscription_id) REFERENCES subscriptions(id)
            );
            -- _filter_new looks rows up by (subscription_id, result_hash) once per
            -- result; without an index that is a full scan of an ever-growing table.
            CREATE INDEX IF NOT EXISTS idx_sent_notifications_sub_hash
                ON sent_notifications(subscription_id, result_hash);
        """)
        self.db.commit()

    def check_all(self, batch_size: int = 10) -> List[Notification]:
        """Check the least recently checked subscriptions.

        Args:
            batch_size: Maximum number of subscriptions checked in this call.

        Returns:
            One Notification per subscription that produced unseen results.
        """
        # NULL last_check (never checked) sorts first in SQLite's ASC order.
        subs = self.db.execute(
            "SELECT id, chat_id, keyword, last_result_hash FROM subscriptions "
            "ORDER BY last_check ASC LIMIT ?",
            (batch_size,),
        ).fetchall()

        notifications = []
        for sub_id, chat_id, keyword, last_hash in subs:
            try:
                results = self._search(keyword)
                new_hash = self._hash_results(results)
                # Cheap fast-path: only look at individual results when the
                # fingerprint of the whole result set changed.
                if new_hash and new_hash != last_hash:
                    new_results = self._filter_new(sub_id, results, last_hash)
                    if new_results:
                        notifications.append(Notification(
                            chat_id=chat_id,
                            keyword=keyword,
                            new_count=len(new_results),
                            results=new_results[:self.MAX_PREVIEW],
                        ))
                # Update state.
                self.db.execute(
                    "UPDATE subscriptions SET last_result_hash=?, "
                    "last_check=datetime('now','localtime') WHERE id=?",
                    (new_hash, sub_id),
                )
            except Exception as e:
                logger.error("Check failed: %s - %s", keyword, e)
                # Still advance last_check, otherwise a persistently failing
                # subscription stays at the head of the ORDER BY/LIMIT queue
                # and starves the rest.
                try:
                    self.db.execute(
                        "UPDATE subscriptions SET "
                        "last_check=datetime('now','localtime') WHERE id=?",
                        (sub_id,),
                    )
                except sqlite3.Error as db_err:
                    logger.error("Check failed: %s - %s", keyword, db_err)
        self.db.commit()
        return notifications

    def _search(self, keyword: str) -> list:
        """Search the CloudSearch API for *keyword*.

        The response body is parsed line by line as JSON. Only objects whose
        "type" is "result" are kept, reduced to title/url/cloud/source.
        Request or HTTP errors are logged and yield an empty list.
        """
        try:
            resp = requests.post(
                f"{self.api_base}/api/query", json={"q": keyword}, timeout=20
            )
            resp.raise_for_status()
        except requests.RequestException as e:
            logger.error("Search error: %s", e)
            return []

        results = []
        for line in resp.text.strip().split("\n"):
            try:
                d = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A valid JSON line that is not an object would otherwise raise
            # AttributeError and discard the entire response.
            if not isinstance(d, dict) or d.get("type") != "result":
                continue
            results.append({
                "title": d.get("title", ""),
                "url": d.get("share_url", ""),
                "cloud": d.get("cloud_type", ""),
                "source": d.get("source", ""),
            })
        return results

    def _hash_results(self, results: list) -> str:
        """Return an order-independent fingerprint of the result URLs.

        Full URLs are used. Truncating them would make result sets that differ
        only past the cut-off hash equal, so new resources would be missed.
        """
        key = "|".join(sorted(r.get("url", "") for r in results))
        return hashlib.md5(key.encode("utf-8")).hexdigest()

    @staticmethod
    def _url_key(url: str) -> str:
        """Return a process-independent fingerprint of a single result URL."""
        # Builtin hash() of a str is randomized per interpreter process, so it
        # cannot be persisted and compared across restarts.
        return hashlib.md5(url.encode("utf-8")).hexdigest()

    def _filter_new(self, sub_id: int, results: list,
                    last_hash: Optional[str]) -> list:
        """Return the results not yet recorded for *sub_id* and record them.

        Results are recorded in sent_notifications here, before any message is
        actually sent. A failed send is therefore not retried. *last_hash* is
        currently unused.
        """
        new = []
        for r in results:
            rhash = self._url_key(r.get("url", ""))
            seen = self.db.execute(
                "SELECT id FROM sent_notifications "
                "WHERE subscription_id=? AND result_hash=?",
                (sub_id, rhash),
            ).fetchone()
            if seen:
                continue
            new.append(r)
            self.db.execute(
                "INSERT OR IGNORE INTO sent_notifications "
                "(subscription_id, result_hash) VALUES (?,?)",
                (sub_id, rhash),
            )
        return new

    @staticmethod
    def _redact(text: str, secret: Optional[str]) -> str:
        """Replace *secret* in *text* with "***" so it never reaches the logs."""
        return text.replace(secret, "***") if secret else text

    def _format_telegram(self, notif: Notification) -> str:
        """Build the Telegram message body (parse_mode=HTML, all values escaped)."""
        esc = html.escape
        lines = [
            f"🔔 <b>{esc(str(notif.keyword))}</b> 有新资源!({notif.new_count}个)",
            "",
        ]
        for i, r in enumerate(notif.results[:self.MAX_PREVIEW], start=1):
            url = str(r.get("url") or "")
            title = str(r.get("title") or "")[:self.MAX_TITLE_LEN] or url
            cloud = str(r.get("cloud") or "?").upper()
            link = f'<a href="{esc(url)}">{esc(title)}</a>' if url else esc(title)
            lines.append(f"{i}. [{esc(cloud)}] {link}")
        return "\n".join(lines) + "\n"

    def notify_telegram(self, notif: Notification):
        """Send *notif* through the Telegram Bot API.

        Does nothing when no bot token is configured. Best-effort: failures are
        logged (with the token redacted) and never raised.
        """
        if not self.tg_token:
            return
        try:
            resp = requests.post(
                f"https://api.telegram.org/bot{self.tg_token}/sendMessage",
                json={
                    "chat_id": notif.chat_id,
                    "text": self._format_telegram(notif),
                    # HTML with escaping: legacy Markdown rejects the whole message
                    # when titles contain unbalanced _, *, [ or ].
                    "parse_mode": "HTML",
                    "disable_web_page_preview": True,
                },
                timeout=10,
            )
            if not resp.ok:
                logger.error("TG notify failed: HTTP %s %s",
                             resp.status_code, resp.text[:200])
        except Exception as e:
            # requests errors embed the request URL, which contains the token.
            logger.error("TG notify failed: %s", self._redact(str(e), self.tg_token))

    def notify_feishu(self, notif: Notification, webhook_url: str):
        """Send *notif* as plain text to a Feishu custom-bot webhook.

        Best-effort: failures are logged and never raised.
        """
        lines = [f"🔔 {notif.keyword} 有新资源!({notif.new_count}个)"]
        for r in notif.results[:self.MAX_PREVIEW]:
            cloud = str(r.get("cloud") or "?").upper()
            title = str(r.get("title") or "")[:self.MAX_TITLE_LEN]
            lines.append(f"• [{cloud}] {title} {r.get('url') or ''}")
        text = "\n".join(lines) + "\n"
        try:
            resp = requests.post(webhook_url, json={
                "msg_type": "text",
                "content": {"text": text},
            }, timeout=10)
            if not resp.ok:
                logger.error("Feishu notify failed: HTTP %s %s",
                             resp.status_code, resp.text[:200])
        except Exception as e:
            # The last path segment of the webhook URL acts as its secret key.
            secret = webhook_url.rstrip("/").rsplit("/", 1)[-1]
            logger.error("Feishu notify failed: %s", self._redact(str(e), secret))

    def run_loop(self, interval_minutes: int = 15):
        """Run forever: check a batch, send Telegram notifications, sleep."""
        logger.info("Subscription monitor started (interval=%smin)", interval_minutes)
        while True:
            try:
                notifs = self.check_all()
                for n in notifs:
                    self.notify_telegram(n)
                if notifs:
                    logger.info("Sent %s notifications", len(notifs))
            except Exception as e:
                logger.error("Monitor error: %s", e)
            time.sleep(interval_minutes * 60)


if __name__ == "__main__":
    # Without a configured handler, INFO-level messages would be dropped.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    api = os.getenv("CLOUDSEARCH_API", "http://127.0.0.1:9527")
    token = os.getenv("TG_BOT_TOKEN", "")
    interval = int(os.getenv("CHECK_INTERVAL", "15"))
    monitor = SubscriptionMonitor(api, tg_bot_token=token)
    monitor.run_loop(interval)