当使用 Flink CDC Connector 监听 MySQL 表中主键为 BINARY/VARBINARY 类型时,可能会遇到以下错误: - 反序列化失败 - 主键处理异常 - 数据同步中断
MySQL CDC 连接器在处理二进制主键时可能存在以下问题: 1. 二进制数据的序列化/反序列化处理不完善 2. 主键比较逻辑对二进制类型支持不足 3. 元数据解析时对二进制类型的特殊处理缺失
<!-- 使用最新版本的 Flink CDC Connector -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version> <!-- 检查并使用最新版本 -->
</dependency>
// 实现自定义的反序列化逻辑
public class BinaryKeyDebeziumDeserializer implements DebeziumDeserializationSchema<RowData> {
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) {
// 特殊处理二进制主键字段
Struct valueStruct = (Struct) record.value();
Struct sourceStruct = valueStruct.getStruct("source");
// 获取二进制主键值并转换为合适格式
byte[] primaryKey = valueStruct.getBytes("your_binary_primary_key");
// 构建RowData并输出
// ...
}
}
如果业务允许,可以考虑: 1. 将 BINARY 主键转换为字符串表示(如 HEX 或 Base64) 2. 添加额外代理主键(如自增ID)
ALTER TABLE your_table
ADD COLUMN id INT AUTO_INCREMENT PRIMARY KEY,
MODIFY COLUMN binary_key VARBINARY(255) UNIQUE;
在 CDC 连接配置中添加以下参数:
MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("your_db")
.tableList("your_db.your_table")
.username("user")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
// 添加二进制处理相关配置
.includeSchemaChanges(true)
.scanNewlyAddedTableEnabled(true)
.build();
如果问题仍然存在,建议提供具体的错误日志以便进一步分析。