文章目錄
- 背景
- 分析
- 檢查現象
- 檢查B集群是否有異常,導致重復消費的
- 分析同步任務
- 修復問題
- 發現flink job 一直報異常
- 修復問題
背景
使用seatunnel 同步數據從A 集群kafka 同步到B集群kafka,現象是發現兩邊數據不一致,每天10w級別會多幾十條數據
分析
檢查現象
因為兩側kafka的數據同時也會寫es,先檢查兩側es的數據, 通過二分發現,B集群es數據確實比A集群多,多的數據檢查發現是重復的數據,有記錄被寫了多次
檢查B集群是否有異常,導致重復消費的
檢查日志發現有沒有寫入失敗,導致一批數據被重復消費的,從日志來看是沒有的
分析同步任務
檢查seatunnel 任務,發現沒有配置semantics ,然后發又檢查了客戶現場的flink job ,確定有經常任務會有環境導致的重啟現象,應該問題就是這了
sink {kafka {topic = "test_topic"bootstrap.servers = "localhost:9092"format = jsonkafka.request.timeout.ms = 60000semantics = EXACTLY_ONCEkafka.config = {acks = "all"request.timeout.ms = 60000buffer.memory = 33554432}}
修復問題
加上 semantics = EXACTLY_ONCE
sink {kafka {topic = "test_topic"bootstrap.servers = "localhost:9092"format = jsonkafka.request.timeout.ms = 60000kafka.config = {acks = "all"request.timeout.ms = 60000buffer.memory = 33554432}}
發現flink job 一直報異常
最后問題定位到這,官方bug導致,升級最新版本修復
sink 在一致性語義情況下報異常
修復問題
只修改sink一致性語義是不夠的,還要消費b 集群kafka 的客戶端的事務配置是,read_commited的