当使用 Flink CDC 监听 MySQL 表时,如果表的主键是二进制类型(如 BINARY, VARBINARY, BLOB 等),可能会遇到 ClassCastException 异常。这是因为 Flink CDC 在处理二进制主键时需要进行特殊处理。
错误通常类似:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
在 Flink CDC MySQL 连接器配置中,添加 binary-handling-mode
参数:
DebeziumSourceFunction<SourceRecord> sourceFunction = MySQLSource.<SourceRecord>builder()
.hostname("localhost")
.port(3306)
.databaseList("your_database")
.tableList("your_database.your_table")
.username("username")
.password("password")
.deserializer(new YourDebeziumDeserializer()) // 使用自定义反序列化器
.debeziumProperties(
new Properties() {{
// 处理二进制数据为base64编码字符串
put("binary.handling.mode", "base64");
// 或者处理为十六进制字符串
// put("binary.handling.mode", "hex");
}}
)
.build();
实现自定义的反序列化器来处理二进制主键:
public class BinaryKeyDebeziumDeserializer implements DebeziumDeserializationSchema<SourceRecord> {
@Override
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
// 处理二进制主键
Struct value = (Struct) record.value();
Struct after = value.getStruct("after");
// 获取主键字段并转换
byte[] primaryKey = after.getBytes("your_primary_key_column");
String primaryKeyStr = Base64.getEncoder().encodeToString(primaryKey);
// 更新记录
after.put("your_primary_key_column", primaryKeyStr);
out.collect(record);
}
@Override
public TypeInformation<SourceRecord> getProducedType() {
return TypeInformation.of(SourceRecord.class);
}
}
如果业务允许,可以考虑修改表结构: 1. 将二进制主键改为字符串类型 2. 或者添加一个额外的字符串类型主键
binary.handling.mode
配置对于生产环境,推荐组合使用方案1和方案2:
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("your_db")
.tableList("your_db.your_table")
.username("user")
.password("pass")
.deserializer(new BinaryKeyDebeziumDeserializer())
.debeziumProperties(
new Properties() {{
put("binary.handling.mode", "base64");
put("include.schema.changes", "false");
}}
)
.build();
通过以上方法,应该能够解决 Flink CDC 监听 MySQL 二进制主键时的 ClassCastException 问题。