当使用 Flink-Connector-MySQL-CDC 监听含有二进制主键的 MySQL 表时,可能会遇到一些特殊问题。以下是常见异常及解决方案:
现象:
- 出现 ClassCastException
或序列化错误
- 数据无法正确解析
解决方案:
// 在创建 CDC source 时明确指定主键类型
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("your_db")
.tableList("your_db.your_table")
.username("user")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema()) // 使用合适的反序列化器
.build();
现象: - 主键值显示为乱码或字节数组形式 - 下游处理异常
解决方案:
// 自定义反序列化器处理二进制主键
public class BinaryKeyDeserializer implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord record, Collector<String> out) {
Struct value = (Struct) record.value();
Struct after = value.getStruct("after");
// 处理二进制主键
byte[] primaryKey = after.getBytes("id"); // 假设主键字段名为id
String hexKey = Hex.encodeHexString(primaryKey);
// 构建输出数据
String output = String.format("{\"id\":\"%s\", ...}", hexKey);
out.collect(output);
}
// 其他必要方法实现...
}
推荐配置:
MySqlSource.builder()
.serverId(5400) // 确保唯一
.serverTimeZone("UTC") // 明确时区
.scanNewlyAddedTableEnabled(true) // 如果需要
.includeSchemaChanges(true) // 如果需要
.startupOptions(StartupOptions.initial()) // 根据需求选择
.build();
明确主键类型:
日志调试:
env.execute("MySQL CDC Job")
.setParallelism(1)
.getExecutionEnvironment()
.setRuntimeMode(RuntimeExecutionMode.STREAMING)
.enableCheckpointing(30000)
.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
版本兼容性检查:
权限验证:
-- MySQL 用户需要的最小权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
FLUSH PRIVILEGES;
检查 MySQL binlog 格式是否为 ROW 模式:
SHOW VARIABLES LIKE 'binlog_format';
验证二进制主键是否包含特殊字符或非常规值
检查网络连接和权限设置
启用详细日志:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Logger logger = LoggerFactory.getLogger(YourClass.class);
如果问题仍然存在,建议提供具体的异常堆栈信息以便进一步分析。