- 修复Redis认证 (配置密码) - 启动Python管理后台 (端口9531, 15个功能开关) - 统一版本号 0.2.7 - 更新docker-compose.yml (镜像版本/Redis URL/Admin服务)
205 lines
7.2 KiB
Python
205 lines
7.2 KiB
Python
"""
|
|
CloudSearch Subscription Monitor v1.0.0
|
|
关键词订阅 + 新资源检测 + 多渠道通知
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import sqlite3
|
|
import logging
|
|
import requests
|
|
from typing import List, Dict, Optional
|
|
from dataclasses import dataclass
|
|
|
|
# Module-level logger for this file, registered under the name "subscription".
logger = logging.getLogger("subscription")
|
|
|
|
|
|
@dataclass
class Notification:
    """One pending push message for a single subscriber/keyword pair.

    Instances are produced by the subscription check and consumed by the
    channel-specific ``notify_*`` senders.
    """

    # Identifier of the chat that owns the subscription.
    chat_id: int
    # Keyword whose search produced new results.
    keyword: str
    # Total number of new results found for the keyword.
    new_count: int
    # Result dicts to include in the message.
    results: List[dict]
    # Delivery channel name: telegram / feishu / dingtalk.
    channel: str = "telegram"
|
|
|
|
|
|
class SubscriptionMonitor:
    """Poll keyword subscriptions and push a notification when new results appear.

    Subscriptions and the record of already-reported results live in a SQLite
    database. ``check_all`` runs the searches and returns the notifications to
    deliver; ``notify_telegram`` / ``notify_feishu`` deliver one notification;
    ``run_loop`` ties the two together on a timer.

    Note: a result is recorded as "sent" when ``check_all`` discovers it, not
    when delivery succeeds, so a failed delivery is not retried.
    """

    def __init__(self, api_base: str, db_path: str = "/data/subscriptions.db",
                 tg_bot_token: Optional[str] = None):
        """Open the database and make sure the schema exists.

        Args:
            api_base: Base URL of the search API; a trailing slash is removed.
            db_path: Path of the SQLite database file.
            tg_bot_token: Telegram bot token. When empty or None,
                ``notify_telegram`` does nothing.
        """
        self.api_base = api_base.rstrip("/")
        self.tg_token = tg_bot_token
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self):
        """Create the tables and the lookup index if they do not exist yet."""
        self.db.executescript("""
            CREATE TABLE IF NOT EXISTS subscriptions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id INTEGER NOT NULL,
                keyword TEXT NOT NULL,
                last_result_hash TEXT,
                last_check TEXT,
                created_at TEXT DEFAULT (datetime('now','localtime')),
                UNIQUE(chat_id, keyword)
            );
            CREATE TABLE IF NOT EXISTS sent_notifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                subscription_id INTEGER,
                result_hash TEXT,
                sent_at TEXT DEFAULT (datetime('now','localtime')),
                FOREIGN KEY(subscription_id) REFERENCES subscriptions(id)
            );
            -- _filter_new looks up (subscription_id, result_hash) once per result.
            -- Deliberately non-unique so it can be added to an existing database.
            CREATE INDEX IF NOT EXISTS idx_sent_notifications_lookup
                ON sent_notifications (subscription_id, result_hash);
        """)
        self.db.commit()

    def check_all(self, batch_size: int = 10) -> "List[Notification]":
        """Check the least recently checked subscriptions for new results.

        Each subscription is processed in its own transaction: on success its
        state is committed, on failure everything written for it is rolled
        back, so results are never marked as sent without a notification
        having been produced.

        Args:
            batch_size: Maximum number of subscriptions to check in this call.

        Returns:
            One ``Notification`` per subscription that has new results, each
            carrying at most five results.
        """
        subs = self.db.execute(
            "SELECT id, chat_id, keyword, last_result_hash FROM subscriptions "
            "ORDER BY last_check ASC LIMIT ?",
            (batch_size,)
        ).fetchall()

        notifications = []
        for sub_id, chat_id, keyword, last_hash in subs:
            try:
                results = self._search(keyword)
                new_hash = self._hash_results(results)

                pending = None
                if new_hash and new_hash != last_hash:
                    new_results = self._filter_new(sub_id, results, last_hash)
                    if new_results:
                        pending = Notification(
                            chat_id=chat_id,
                            keyword=keyword,
                            new_count=len(new_results),
                            results=new_results[:5],
                        )

                # An empty result set (including a failed search) keeps the
                # previously stored hash; only the check time moves forward.
                self.db.execute(
                    "UPDATE subscriptions SET last_result_hash=?, "
                    "last_check=datetime('now','localtime') WHERE id=?",
                    (new_hash or last_hash, sub_id)
                )
                self.db.commit()
            except Exception as e:
                self.db.rollback()
                logger.error(f"Check failed: {keyword} - {e}")
                continue

            # Only hand out the notification once its bookkeeping is durable.
            if pending is not None:
                notifications.append(pending)

        return notifications

    def _search(self, keyword: str) -> list:
        """Query the search API and return the parsed results.

        The response body is read as one JSON document per line; only objects
        whose ``type`` is ``"result"`` are kept.

        Args:
            keyword: Search term sent as ``q``.

        Returns:
            A list of dicts with the string keys ``title``, ``url``, ``cloud``
            and ``source``. Returns an empty list on any request or HTTP
            error; this method does not raise.
        """
        try:
            resp = requests.post(
                f"{self.api_base}/api/query",
                json={"q": keyword},
                timeout=20
            )
            resp.raise_for_status()

            results = []
            for line in resp.text.splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    d = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A valid JSON line that is not an object (list, number, ...)
                # must be skipped rather than abort the whole parse.
                if not isinstance(d, dict) or d.get("type") != "result":
                    continue
                results.append({
                    "title": str(d.get("title") or ""),
                    "url": str(d.get("share_url") or ""),
                    "cloud": str(d.get("cloud_type") or ""),
                    "source": str(d.get("source") or ""),
                })
            return results
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []

    def _hash_results(self, results: list) -> str:
        """Return an order-independent fingerprint of a result list.

        Args:
            results: Result dicts as produced by ``_search``.

        Returns:
            The MD5 hex digest of the sorted result URLs, or ``""`` when there
            are no results so that callers can test the value for truthiness.
        """
        import hashlib

        if not results:
            return ""
        urls = sorted((r.get("url") or "") for r in results)
        # NOTE: each URL is cut to 50 characters, as before, so fingerprints
        # stay comparable with the ones already stored in the database.
        key = "|".join(u[:50] for u in urls)
        return hashlib.md5(key.encode()).hexdigest()

    def _filter_new(self, sub_id: int, results: list, last_hash: str) -> list:
        """Return the results not yet reported for a subscription.

        Every returned result is also recorded in ``sent_notifications``. The
        caller is responsible for committing or rolling back.

        Args:
            sub_id: Subscription id the results belong to.
            results: Result dicts as produced by ``_search``.
            last_hash: Previously stored fingerprint; not used here.

        Returns:
            The subset of ``results`` whose URL has not been recorded before,
            in their original order. A URL repeated within ``results`` is
            returned once.
        """
        import hashlib

        new = []
        for r in results:
            # The built-in hash() of a str is salted per process, so it cannot
            # be persisted; use a digest that is stable across restarts.
            url = r.get("url") or ""
            rhash = hashlib.sha256(url.encode("utf-8")).hexdigest()
            existing = self.db.execute(
                "SELECT id FROM sent_notifications WHERE subscription_id=? AND result_hash=?",
                (sub_id, rhash)
            ).fetchone()
            if existing:
                continue
            new.append(r)
            self.db.execute(
                "INSERT OR IGNORE INTO sent_notifications (subscription_id, result_hash) VALUES (?,?)",
                (sub_id, rhash)
            )
        return new

    def _format_telegram(self, notif: "Notification") -> str:
        """Build the Telegram message body (HTML parse mode) for a notification."""
        import html

        text = f"🔔 <b>{html.escape(str(notif.keyword))}</b> 有新资源!({notif.new_count}个)\n\n"
        for i, r in enumerate(notif.results[:5], start=1):
            url = str(r.get("url") or "")
            # Truncate before escaping so an HTML entity is never cut in half.
            title = str(r.get("title") or "")[:40] or url
            cloud = str(r.get("cloud") or "?").upper()
            if url:
                item = f'<a href="{html.escape(url, quote=True)}">{html.escape(title)}</a>'
            else:
                item = html.escape(title)
            text += f"{i}. [{html.escape(cloud)}] {item}\n"
        return text

    def notify_telegram(self, notif: "Notification"):
        """Send a notification through the Telegram Bot API.

        Does nothing when no bot token is configured. Failures are logged and
        never raised.

        Args:
            notif: The notification to deliver to ``notif.chat_id``.
        """
        if not self.tg_token:
            return

        try:
            resp = requests.post(
                f"https://api.telegram.org/bot{self.tg_token}/sendMessage",
                json={
                    "chat_id": notif.chat_id,
                    "text": self._format_telegram(notif),
                    # HTML instead of Markdown: titles are free text and would
                    # otherwise break message parsing.
                    "parse_mode": "HTML",
                    "disable_web_page_preview": True,
                },
                timeout=10
            )
            if not resp.ok:
                logger.error(f"TG notify failed: HTTP {resp.status_code} {resp.text[:200]}")
        except Exception as e:
            # The exception text may contain the request URL, which embeds the
            # bot token, so only the exception type is logged.
            logger.error(f"TG notify failed: {type(e).__name__}")

    def notify_feishu(self, notif: "Notification", webhook_url: str):
        """Send a notification to a Feishu webhook as a plain-text message.

        Failures, including non-2xx responses, are logged and never raised.

        Args:
            notif: The notification to deliver.
            webhook_url: Webhook URL to post to.
        """
        text = f"🔔 {notif.keyword} 有新资源!({notif.new_count}个)\n"
        for r in notif.results[:5]:
            cloud = str(r.get("cloud") or "?").upper()
            title = str(r.get("title") or "")[:40]
            url = str(r.get("url") or "")
            text += f"• [{cloud}] {title} {url}\n"
        try:
            resp = requests.post(webhook_url, json={
                "msg_type": "text",
                "content": {"text": text}
            }, timeout=10)
            resp.raise_for_status()
        except Exception as e:
            logger.error(f"Feishu notify failed: {e}")

    def run_loop(self, interval_minutes: int = 15):
        """Check subscriptions and send Telegram notifications forever.

        Errors inside one round are logged and the loop continues. This method
        does not return.

        Args:
            interval_minutes: Pause between two rounds, in minutes.
        """
        logger.info(f"Subscription monitor started (interval={interval_minutes}min)")
        while True:
            try:
                notifs = self.check_all()
                for n in notifs:
                    self.notify_telegram(n)
                if notifs:
                    logger.info(f"Sent {len(notifs)} notifications")
            except Exception as e:
                logger.error(f"Monitor error: {e}")
            time.sleep(interval_minutes * 60)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: read settings from the environment and run forever.
    # Without a configured handler the monitor's INFO messages are dropped.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    api = os.getenv("CLOUDSEARCH_API", "http://127.0.0.1:9527")
    token = os.getenv("TG_BOT_TOKEN", "")
    if not token:
        logger.warning("TG_BOT_TOKEN is not set; Telegram notifications will not be sent")

    # A malformed or non-positive interval would otherwise crash at startup
    # or when the loop tries to sleep.
    raw_interval = os.getenv("CHECK_INTERVAL", "15")
    try:
        interval = int(raw_interval)
    except ValueError:
        interval = 0
    if interval <= 0:
        logger.warning("Invalid CHECK_INTERVAL %r; falling back to 15 minutes", raw_interval)
        interval = 15

    monitor = SubscriptionMonitor(api, tg_bot_token=token)
    monitor.run_loop(interval)
|