插件窝 干货文章 如何实时获取 MySQL 数据库更新并发送短信通知?

如何实时获取 MySQL 数据库更新并发送短信通知?

短信 binlogevent 方案 使用 245    来源:    2025-04-01

实时获取 MySQL 数据库更新并发送短信通知方案

要实现实时监控 MySQL 数据库变更并触发短信通知,有几种可行的技术方案:

方案一:使用 MySQL Binlog 监听

实现步骤

  1. 启用 MySQL Binlog

    SET GLOBAL log_bin = ON;
    SET GLOBAL binlog_format = 'ROW';
    
  2. 使用 Binlog 解析工具

    • Python 可以使用 python-mysql-replication
    • Java 可以使用 canaldebezium
  3. 示例 Python 代码

    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
    import requests  # 用于调用短信API
    
    def send_sms(message):
       # 调用短信服务API
       pass
    
    stream = BinLogStreamReader(
       connection_settings={
           "host": "localhost",
           "port": 3306,
           "user": "root",
           "passwd": "password"},
       server_id=100,
       blocking=True,
       only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent]
    )
    
    for binlogevent in stream:
       for row in binlogevent.rows:
           event = {"schema": binlogevent.schema, "table": binlogevent.table}
           if isinstance(binlogevent, DeleteRowsEvent):
               event["action"] = "delete"
               event["data"] = row["values"]
           elif isinstance(binlogevent, UpdateRowsEvent):
               event["action"] = "update"
               event["data"] = row["after_values"]
           elif isinstance(binlogevent, WriteRowsEvent):
               event["action"] = "insert"
               event["data"] = row["values"]
    
           # 发送短信通知
           send_sms(f"数据库变更: {event}")
    

方案二:使用数据库触发器 + 外部程序

  1. 创建触发器表

    CREATE TABLE notification_queue (
       id INT AUTO_INCREMENT PRIMARY KEY,
       table_name VARCHAR(100),
       action VARCHAR(10),
       record_id INT,
       created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
       processed BOOLEAN DEFAULT FALSE
    );
    
  2. 在目标表上创建触发器

    DELIMITER //
    CREATE TRIGGER after_customer_insert
    AFTER INSERT ON customers
    FOR EACH ROW
    BEGIN
       INSERT INTO notification_queue (table_name, action, record_id)
       VALUES ('customers', 'INSERT', NEW.id);
    END//
    DELIMITER ;
    
  3. 编写轮询程序处理队列

    import time
    import pymysql
    import requests
    
    def process_queue():
       conn = pymysql.connect(host='localhost', user='user', password='pass', db='db')
       while True:
           with conn.cursor() as cursor:
               cursor.execute("SELECT * FROM notification_queue WHERE processed = FALSE")
               rows = cursor.fetchall()
               for row in rows:
                   # 发送短信
                   send_sms(f"新记录插入到 {row[1]} 表,ID: {row[3]}")
                   # 标记为已处理
                   cursor.execute("UPDATE notification_queue SET processed = TRUE WHERE id = %s", (row[0],))
               conn.commit()
           time.sleep(5)  # 每5秒检查一次
    

方案三:使用 CDC (Change Data Capture) 工具

  1. 使用 Debezium

    • 配置 Debezium MySQL 连接器
    • 将变更事件发送到 Kafka
    • 编写消费者处理事件并发送短信
  2. 使用 AWS DMS + SNS

    • 配置 AWS Database Migration Service
    • 将变更事件发送到 SNS 主题
    • SNS 触发短信发送

短信服务集成

选择短信服务提供商(如阿里云短信、腾讯云短信、Twilio等),获取 API 密钥并集成到上述方案中。

注意事项

  1. 性能考虑:高频更新可能导致大量短信,需考虑限流
  2. 错误处理:短信发送失败需要有重试机制
  3. 安全性:确保数据库连接和短信API调用的安全性
  4. 监控:监控监听程序的运行状态

您可以根据具体需求和技术栈选择最适合的方案。如需更详细的某一种方案实现,我可以提供更具体的指导。