插件窝 干货文章 如何实时获取 MySQL 数据库更新并通知用户?

如何实时获取 MySQL 数据库更新并通知用户?

connector database MySQL 数据库 562    来源:    2025-03-31

实时获取MySQL数据库更新并通知用户的解决方案

要实现MySQL数据库的实时更新通知,有几种主要方法,我将介绍最常用的几种方案:

1. 使用MySQL二进制日志(Binlog)

原理:MySQL的Binlog记录了所有对数据库的修改操作,可以通过解析Binlog来获取实时更新。

实现步骤: 1. 确保MySQL开启了Binlog(配置文件中设置log-bin=mysql-bin) 2. 使用Binlog解析工具: - Canal(阿里巴巴开源,Java实现) - Maxwell(Zendesk开源) - Debezium(Red Hat开源)

示例(使用Canal)

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
    Message message = connector.getWithoutAck(100);
    for (CanalEntry.Entry entry : message.getEntries()) {
        // 处理变更事件
        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
            // 解析并通知用户
        }
    }
    connector.ack(message.getId());
}

2. 使用触发器+消息队列

原理:通过数据库触发器将变更写入消息队列,然后消费队列通知用户。

实现步骤: 1. 创建变更记录表 2. 在目标表上创建触发器 3. 使用外部程序消费变更记录

SQL触发器示例

CREATE TRIGGER after_user_update
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    INSERT INTO change_log (table_name, record_id, change_type)
    VALUES ('users', NEW.id, 'update');
END;

3. 使用数据库CDC工具

工具选项: - Debezium:支持多种数据库,提供Kafka连接器 - AWS DMS:Amazon的数据库迁移服务,支持CDC - Google Cloud Dataflow:支持MySQL CDC

Debezium示例配置

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}

4. 使用数据库轮询(简单但不实时)

原理:定期查询数据库检查变更

示例代码

import time
import mysql.connector

last_update = None

while True:
    conn = mysql.connector.connect(user='user', password='password',
                                  host='localhost', database='db')
    cursor = conn.cursor()
    cursor.execute("SELECT MAX(updated_at) FROM target_table")
    current_update = cursor.fetchone()[0]

    if last_update and current_update > last_update:
        # 发送通知
        send_notification("数据库已更新")

    last_update = current_update
    time.sleep(5)  # 5秒轮询间隔

通知用户的方式

获取到变更后,可以通过以下方式通知用户: 1. WebSocket:实时推送至前端 2. 邮件/SMS:发送邮件或短信通知 3. 移动推送:Firebase Cloud Messaging或APNs 4. 消息队列:RabbitMQ/Kafka通知其他服务

方案选择建议

  1. 高实时性要求:Binlog解析方案(Canal/Debezium)
  2. 简单场景:触发器+消息队列
  3. 云环境:使用云服务提供的CDC工具
  4. 资源有限:轮询方案(最简单但实时性差)

每种方案都有其优缺点,选择时应考虑实时性要求、系统复杂度、维护成本等因素。