MySQL的binlog(二进制日志)记录了所有对数据库的修改操作,Canal是阿里巴巴开源的一个基于binlog的增量订阅&消费组件,可以实时捕获数据库变更。
-- 检查binlog是否开启
SHOW VARIABLES LIKE 'log_bin';
-- 修改my.cnf配置文件开启binlog
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
下载并配置Canal:
# conf/canal.properties
canal.instance.mysql.slaveId = 1234
canal.instance.filter.regex = .*\\..*
# conf/example/instance.properties
canal.instance.mysql.address=127.0.0.1:3306
canal.instance.mysql.username=canal
canal.instance.mysql.password=canal
启动Canal服务端:
./bin/startup.sh
Java客户端示例:
// 创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"",
""
);
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100); // 获取指定数量的数据
long batchId = message.getId();
if (batchId != -1) {
try {
for (Entry entry : message.getEntries()) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
// 处理变更数据
processChange(rowChange.getEventType(), rowData);
}
}
}
connector.ack(batchId); // 确认处理完成
} catch (Exception e) {
connector.rollback(batchId); // 处理失败回滚
}
}
}
private void processChange(EventType eventType, RowData rowData) {
switch (eventType) {
case INSERT:
// 处理插入操作
break;
case UPDATE:
// 处理更新操作
break;
case DELETE:
// 处理删除操作
break;
default:
break;
}
}
通过以上方案,可以实现MySQL数据库变更的实时捕获和处理,适用于数据同步、缓存更新、实时分析等多种场景。