当使用 Flink CDC 监听主键为二进制格式(BINARY/VARBINARY/BLOB等)的 MySQL 表时,可能会遇到报错问题。以下是常见问题和解决方案:
Unsupported primary key type: BINARY
Failed to deserialize binary key
Schema mismatch for binary key
如果可能,修改表结构将二进制主键转换为字符串类型:
ALTER TABLE your_table MODIFY COLUMN id VARCHAR(255)
CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;
在 Flink CDC 配置中添加自定义反序列化器:
DebeziumDeserializationSchema<RowData> deserializer = new JsonDebeziumDeserializationSchema(true) {
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
// 自定义处理二进制主键
Struct value = (Struct) record.value();
Struct after = value.getStruct("after");
// 处理二进制字段
byte[] binaryKey = after.getBytes("id");
String stringKey = bytesToHex(binaryKey); // 转换为十六进制字符串
// 创建新的RowData
GenericRowData rowData = new GenericRowData(1);
rowData.setField(0, stringKey);
out.collect(rowData);
}
};
MySQLSource.<RowData>builder()
.hostname("localhost")
.port(3306)
.databaseList("your_db")
.tableList("your_db.your_table")
.username("user")
.password("password")
.deserializer(deserializer)
.build();
在 Flink SQL 中创建视图进行转换:
-- 原始表定义
CREATE TABLE source_table (
id BINARY,
other_columns STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'user',
'password' = 'password',
'database-name' = 'your_db',
'table-name' = 'your_table'
);
-- 转换视图
CREATE VIEW converted_view AS
SELECT
CAST(id AS STRING) AS id_str,
other_columns
FROM source_table;
在 CDC 连接器配置中添加 Debezium 属性:
MySQLSource.<RowData>builder()
.hostname("localhost")
.port(3306)
.databaseList("your_db")
.tableList("your_db.your_table")
.username("user")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.debeziumProperties(
PropertiesUtil.debeziumProperties()
.setProperty("binary.handling.mode", "hex")
.build()
)
.build();
如果以上方案仍不能解决问题,建议查看具体的错误日志并提供更多细节以便进一步分析。