插件窝 干货文章 用 Python 去构建一个 RSS 提示系统

用 Python 去构建一个 RSS 提示系统

entry config self notification 97    来源:    2025-05-12

Python RSS 提示系统构建方案

系统概述

我将为您设计一个基于Python的RSS提示系统,该系统能够定期检查RSS源更新,并在发现新内容时通过多种方式通知用户。

核心功能设计

  1. RSS源监控:定期检查配置的RSS源更新
  2. 内容解析:提取关键信息(标题、摘要、发布时间等)
  3. 去重处理:避免重复提示已处理过的内容
  4. 通知机制:支持多种通知方式(邮件、桌面通知、Webhook等)
  5. 配置管理:允许用户添加/删除RSS源和配置通知方式

技术实现方案

1. 基础架构

import feedparser
import time
from datetime import datetime
import hashlib
import smtplib
from email.mime.text import MIMEText
import sqlite3
import json
import os
from plyer import notification  # 用于桌面通知
import requests  # 用于Webhook通知

2. 核心代码实现

RSS解析与监控类

class RSSMonitor:
    def __init__(self, config_file='config.json'):
        self.config = self.load_config(config_file)
        self.db_conn = sqlite3.connect('rss_history.db')
        self.init_db()

    def load_config(self, config_file):
        """加载配置文件"""
        if not os.path.exists(config_file):
            # 默认配置
            default_config = {
                "rss_feeds": [
                    {"url": "https://example.com/feed", "name": "Example Feed"},
                ],
                "check_interval": 3600,  # 检查间隔(秒)
                "notification": {
                    "email": {
                        "enabled": False,
                        "smtp_server": "smtp.example.com",
                        "smtp_port": 587,
                        "username": "your@email.com",
                        "password": "password",
                        "recipient": "recipient@email.com"
                    },
                    "desktop": {
                        "enabled": True
                    },
                    "webhook": {
                        "enabled": False,
                        "url": "https://your-webhook-url.com"
                    }
                }
            }
            with open(config_file, 'w') as f:
                json.dump(default_config, f, indent=4)
            return default_config

        with open(config_file) as f:
            return json.load(f)

    def init_db(self):
        """初始化数据库"""
        cursor = self.db_conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS processed_entries (
                entry_id TEXT PRIMARY KEY,
                feed_url TEXT,
                title TEXT,
                published TEXT,
                processed_time TEXT
            )
        ''')
        self.db_conn.commit()

    def get_feed_entries(self, feed_url):
        """获取RSS源条目"""
        feed = feedparser.parse(feed_url)
        return feed.entries

    def generate_entry_id(self, entry):
        """生成条目唯一ID"""
        unique_str = f"{entry.get('id', '')}{entry.get('title', '')}{entry.get('published', '')}"
        return hashlib.md5(unique_str.encode('utf-8')).hexdigest()

    def is_new_entry(self, entry_id):
        """检查是否是新条目"""
        cursor = self.db_conn.cursor()
        cursor.execute('SELECT 1 FROM processed_entries WHERE entry_id = ?', (entry_id,))
        return cursor.fetchone() is None

    def mark_as_processed(self, entry, feed_url):
        """标记条目为已处理"""
        entry_id = self.generate_entry_id(entry)
        cursor = self.db_conn.cursor()
        cursor.execute('''
            INSERT INTO processed_entries 
            (entry_id, feed_url, title, published, processed_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            entry_id,
            feed_url,
            entry.get('title', ''),
            entry.get('published', ''),
            datetime.now().isoformat()
        ))
        self.db_conn.commit()

    def send_notification(self, feed_name, entry):
        """发送通知"""
        title = f"新内容: {feed_name}"
        message = entry.get('title', '无标题')

        # 桌面通知
        if self.config['notification']['desktop']['enabled']:
            try:
                notification.notify(
                    title=title,
                    message=message,
                    app_name="RSS Monitor"
                )
            except Exception as e:
                print(f"桌面通知失败: {e}")

        # 邮件通知
        if self.config['notification']['email']['enabled']:
            self.send_email_notification(title, message, entry.get('link', ''))

        # Webhook通知
        if self.config['notification']['webhook']['enabled']:
            self.send_webhook_notification(feed_name, entry)

    def send_email_notification(self, subject, message, link):
        """发送邮件通知"""
        email_config = self.config['notification']['email']
        msg = MIMEText(f"{message}\n\n查看详情: {link}")
        msg['Subject'] = subject
        msg['From'] = email_config['username']
        msg['To'] = email_config['recipient']

        try:
            with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
                server.send_message(msg)
        except Exception as e:
            print(f"邮件发送失败: {e}")

    def send_webhook_notification(self, feed_name, entry):
        """发送Webhook通知"""
        webhook_config = self.config['notification']['webhook']
        payload = {
            "feed": feed_name,
            "title": entry.get('title', '无标题'),
            "summary": entry.get('summary', '无摘要'),
            "link": entry.get('link', ''),
            "published": entry.get('published', '')
        }

        try:
            response = requests.post(
                webhook_config['url'],
                json=payload,
                timeout=10
            )
            if response.status_code != 200:
                print(f"Webhook通知失败: {response.status_code} - {response.text}")
        except Exception as e:
            print(f"Webhook通知失败: {e}")

    def check_feeds(self):
        """检查所有RSS源"""
        for feed in self.config['rss_feeds']:
            try:
                entries = self.get_feed_entries(feed['url'])
                for entry in entries:
                    entry_id = self.generate_entry_id(entry)
                    if self.is_new_entry(entry_id):
                        print(f"发现新内容: {entry.get('title', '无标题')}")
                        self.send_notification(feed['name'], entry)
                        self.mark_as_processed(entry, feed['url'])
            except Exception as e:
                print(f"检查RSS源 {feed['url']} 失败: {e}")

    def run(self):
        """运行监控循环"""
        print("RSS监控系统启动...")
        while True:
            self.check_feeds()
            time.sleep(self.config['check_interval'])

主程序入口

if __name__ == "__main__":
    monitor = RSSMonitor()
    try:
        monitor.run()
    except KeyboardInterrupt:
        print("\nRSS监控系统停止")
    finally:
        monitor.db_conn.close()

部署与使用指南

1. 安装依赖

pip install feedparser plyer requests

2. 配置文件说明

创建或修改config.json文件:

{
    "rss_feeds": [
        {"url": "https://example.com/feed", "name": "示例源"},
        {"url": "https://another-feed.com/rss", "name": "另一个源"}
    ],
    "check_interval": 1800,
    "notification": {
        "email": {
            "enabled": true,
            "smtp_server": "smtp.example.com",
            "smtp_port": 587,
            "username": "your@email.com",
            "password": "your-password",
            "recipient": "recipient@email.com"
        },
        "desktop": {
            "enabled": true
        },
        "webhook": {
            "enabled": false,
            "url": "https://your-webhook-url.com"
        }
    }
}

3. 运行系统

python rss_monitor.py

高级功能扩展建议

  1. 多平台支持:添加Telegram、Slack等通知方式
  2. 内容过滤:基于关键词过滤感兴趣的内容
  3. Web界面:使用Flask/Django添加管理界面
  4. 多用户支持:不同用户可以订阅不同的RSS源
  5. 内容摘要:使用NLP技术生成内容摘要
  6. 异常处理:添加更完善的错误处理和重试机制
  7. 日志记录:添加详细的日志记录功能
  8. Docker支持:容器化部署

注意事项

  1. 邮件通知需要配置正确的SMTP服务器信息
  2. 桌面通知在Linux系统上可能需要额外安装libnotify
  3. 长时间运行建议使用systemdsupervisor管理进程
  4. 定期清理数据库中的旧记录以避免膨胀

这个RSS提示系统可以作为一个基础框架,您可以根据实际需求进行扩展和定制。