我将为您设计一个基于 Python 的 RSS 提示系统，该系统能够定期检查 RSS 源更新，并在发现新内容时通过多种方式通知用户。
import feedparser
import time
from datetime import datetime
import hashlib
import smtplib
from email.mime.text import MIMEText
import sqlite3
import json
import os
from plyer import notification # 用于桌面通知
import requests # 用于Webhook通知
class RSSMonitor:
    """Poll RSS feeds and notify the user about entries not seen before.

    Settings are read from a JSON config file (created with defaults when
    missing). Every entry that has been notified is recorded in an SQLite
    table so that it is reported only once across restarts.
    """

    # Fallback polling interval (seconds) when the config omits "check_interval".
    DEFAULT_CHECK_INTERVAL = 3600

    def __init__(self, config_file='config.json', db_path='rss_history.db'):
        """Load the configuration and open the history database.

        Args:
            config_file: Path of the JSON configuration file.
            db_path: Path of the SQLite history database; ``':memory:'``
                gives a throwaway in-memory database.
        """
        self.config = self.load_config(config_file)
        self.db_conn = sqlite3.connect(db_path)
        self.init_db()

    def load_config(self, config_file):
        """Return the configuration dict stored in ``config_file``.

        When the file does not exist, a default configuration is written to
        that path and returned. Files are read and written as UTF-8 so that
        non-ASCII feed names do not depend on the platform's locale encoding.
        """
        if not os.path.exists(config_file):
            # Default configuration
            default_config = {
                "rss_feeds": [
                    {"url": "https://example.com/feed", "name": "Example Feed"},
                ],
                "check_interval": 3600,  # polling interval in seconds
                "notification": {
                    "email": {
                        "enabled": False,
                        "smtp_server": "smtp.example.com",
                        "smtp_port": 587,
                        "username": "your@email.com",
                        "password": "password",
                        "recipient": "recipient@email.com"
                    },
                    "desktop": {
                        "enabled": True
                    },
                    "webhook": {
                        "enabled": False,
                        "url": "https://your-webhook-url.com"
                    }
                }
            }
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=4)
            return default_config
        with open(config_file, encoding='utf-8') as f:
            return json.load(f)

    def init_db(self):
        """Create the ``processed_entries`` table if it does not exist yet."""
        cursor = self.db_conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS processed_entries (
                entry_id TEXT PRIMARY KEY,
                feed_url TEXT,
                title TEXT,
                published TEXT,
                processed_time TEXT
            )
        ''')
        self.db_conn.commit()

    def close(self):
        """Close the history database connection."""
        self.db_conn.close()

    def _channel_config(self, channel):
        """Return the settings dict of one notification channel.

        A missing "notification" section or a missing channel yields ``{}``,
        so callers can treat an unconfigured channel as disabled.
        """
        return (self.config.get('notification') or {}).get(channel) or {}

    def get_feed_entries(self, feed_url):
        """Fetch ``feed_url`` with feedparser and return its list of entries.

        feedparser reports fetch/parse problems through the ``bozo`` flag
        instead of raising; when that flag is set and nothing was parsed the
        problem is printed so the failure is not mistaken for "no news".
        """
        feed = feedparser.parse(feed_url)
        entries = feed.entries
        if getattr(feed, 'bozo', False) and not entries:
            print(f"检查RSS源 {feed_url} 失败: {getattr(feed, 'bozo_exception', '')}")
        return entries

    def generate_entry_id(self, entry):
        """Return a stable hex fingerprint for ``entry``.

        The fingerprint is the MD5 digest of the entry's ``id``, ``title`` and
        ``published`` fields concatenated (missing fields count as ``''``).
        MD5 is used only as a fingerprint here, not for security; the formula
        must stay unchanged or already-stored IDs would no longer match.
        """
        unique_str = f"{entry.get('id', '')}{entry.get('title', '')}{entry.get('published', '')}"
        return hashlib.md5(unique_str.encode('utf-8')).hexdigest()

    def is_new_entry(self, entry_id):
        """Return True when ``entry_id`` is not yet in ``processed_entries``."""
        cursor = self.db_conn.cursor()
        cursor.execute('SELECT 1 FROM processed_entries WHERE entry_id = ?', (entry_id,))
        return cursor.fetchone() is None

    def mark_as_processed(self, entry, feed_url):
        """Record ``entry`` (from ``feed_url``) as processed.

        Uses ``INSERT OR IGNORE`` so marking the same entry twice is a no-op
        rather than a primary-key ``IntegrityError``.
        """
        entry_id = self.generate_entry_id(entry)
        cursor = self.db_conn.cursor()
        cursor.execute('''
            INSERT OR IGNORE INTO processed_entries
            (entry_id, feed_url, title, published, processed_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            entry_id,
            feed_url,
            entry.get('title', ''),
            entry.get('published', ''),
            datetime.now().isoformat()
        ))
        self.db_conn.commit()

    def send_notification(self, feed_name, entry):
        """Notify about ``entry`` through every enabled channel.

        Channels missing from the configuration are treated as disabled.
        Delivery is best-effort: a failing channel prints an error and does
        not prevent the remaining channels from being tried.
        """
        title = f"新内容: {feed_name}"
        message = entry.get('title', '无标题')

        # Desktop notification
        if self._channel_config('desktop').get('enabled'):
            try:
                notification.notify(
                    title=title,
                    message=message,
                    app_name="RSS Monitor"
                )
            except Exception as e:
                print(f"桌面通知失败: {e}")

        # E-mail notification
        if self._channel_config('email').get('enabled'):
            self.send_email_notification(title, message, entry.get('link', ''))

        # Webhook notification
        if self._channel_config('webhook').get('enabled'):
            self.send_webhook_notification(feed_name, entry)

    def send_email_notification(self, subject, message, link):
        """Send a plain-text e-mail with ``message`` and ``link`` over SMTP/STARTTLS.

        Any failure (incomplete settings, connection, login, send) is printed
        rather than raised, matching the other notification channels.
        """
        try:
            email_config = self._channel_config('email')
            msg = MIMEText(f"{message}\n\n查看详情: {link}")
            msg['Subject'] = subject
            msg['From'] = email_config['username']
            msg['To'] = email_config['recipient']
            # The timeout keeps an unresponsive server from blocking the loop forever.
            with smtplib.SMTP(
                email_config['smtp_server'],
                email_config['smtp_port'],
                timeout=30
            ) as server:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
                server.send_message(msg)
        except Exception as e:
            print(f"邮件发送失败: {e}")

    def send_webhook_notification(self, feed_name, entry):
        """POST a JSON description of ``entry`` to the configured webhook URL.

        Any 2xx response counts as success (some webhook services answer 204);
        other statuses and all exceptions are printed rather than raised.
        """
        payload = {
            "feed": feed_name,
            "title": entry.get('title', '无标题'),
            "summary": entry.get('summary', '无摘要'),
            "link": entry.get('link', ''),
            "published": entry.get('published', '')
        }
        try:
            response = requests.post(
                self._channel_config('webhook')['url'],
                json=payload,
                timeout=10
            )
            if not 200 <= response.status_code < 300:
                print(f"Webhook通知失败: {response.status_code} - {response.text}")
        except Exception as e:
            print(f"Webhook通知失败: {e}")

    def check_feeds(self):
        """Check every configured feed once and notify about new entries.

        An error while handling one feed is printed and the loop moves on to
        the next feed.
        """
        for feed in self.config.get('rss_feeds', []):
            try:
                feed_url = feed['url']
                # Fall back to the URL when the feed has no display name.
                feed_name = feed.get('name', feed_url)
                for entry in self.get_feed_entries(feed_url):
                    entry_id = self.generate_entry_id(entry)
                    if self.is_new_entry(entry_id):
                        print(f"发现新内容: {entry.get('title', '无标题')}")
                        self.send_notification(feed_name, entry)
                        self.mark_as_processed(entry, feed_url)
            except Exception as e:
                # .get() so reporting a feed without "url" cannot raise again.
                print(f"检查RSS源 {feed.get('url')} 失败: {e}")

    def run(self):
        """Run the monitoring loop forever: check all feeds, then sleep.

        The sleep time is the config's "check_interval" in seconds, or
        ``DEFAULT_CHECK_INTERVAL`` when that key is absent.
        """
        print("RSS监控系统启动...")
        while True:
            self.check_feeds()
            time.sleep(self.config.get('check_interval', self.DEFAULT_CHECK_INTERVAL))
if __name__ == "__main__":
    # Script entry point: build a monitor with the default settings and poll
    # until the user interrupts with Ctrl+C.
    monitor = RSSMonitor()
    try:
        try:
            monitor.run()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop; report it instead of a traceback.
            print("\nRSS监控系统停止")
    finally:
        # Release the SQLite connection however the loop ended.
        monitor.db_conn.close()
pip install feedparser plyer requests
创建或修改 config.json 文件:
{
"rss_feeds": [
{"url": "https://example.com/feed", "name": "示例源"},
{"url": "https://another-feed.com/rss", "name": "另一个源"}
],
"check_interval": 1800,
"notification": {
"email": {
"enabled": true,
"smtp_server": "smtp.example.com",
"smtp_port": 587,
"username": "your@email.com",
"password": "your-password",
"recipient": "recipient@email.com"
},
"desktop": {
"enabled": true
},
"webhook": {
"enabled": false,
"url": "https://your-webhook-url.com"
}
}
}
python rss_monitor.py
libnotify
可使用 systemd 或 supervisor 管理进程。
这个 RSS 提示系统可以作为一个基础框架，您可以根据实际需求进行扩展和定制。