前面的章節中我們聊到如何避免保證消息丟失,沒有印象的同學可以再看看,本節我們將展開如何實現kafka的一次精確。
首先我們需要明白兩個概念“冪等”和“事物”
冪等
“冪等”這個詞原是數學領域中的概念,指的是某些操作或函數能夠被執行多次,但每次得到的結果都是不變的。比如乘以 1 就是一個冪等操作,而數加 1 這個操作就不是冪等的,因為執行一次和執行多次的結果不同。
在計算機領域中
- 若一個子程序是冪等的,不管運行這個子程序多少次,與該子程序關聯的那部分系統狀態保持不變。
- 在函數式編程語言(比如 Scala 或 Haskell)中,很多純函數天然就是冪等的。
事務
事務就是是一個操作序列,這些操作要么都執行,要么都不執行,它是一個不可分割的工作單位。Kafka 的事務概念類似于我們熟知的數據庫提供的事務。在數據庫領域,事務提供的安全性保障是經典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔離性 (Isolation) 和持久性 (Durability)。
Idempotent Producer
Producer 默認不是冪等性的,但我們可以創建冪等性 Producer。在 0.11.0.0 版本引入了冪等生產者。
- enable.idempotence=true
光有冪等還不夠,還需要事務的保證
Transactional Producer
producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}
消費者端保證
生產者解決了重復消息的問題,消費這端自然也需要避免重復消費
- enable.auto.commit=false
由默認的自動提交改為手動提交,關于自動提交的最佳實踐可以參考上一節【kafka實踐】11|消費位移提交
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// Process the recordprocessRecord(record);}// Commit the offsets after processing the batch of recordsconsumer.commitSync();
}
當然更建議配合業務層面由冪等處理,這樣就能做到“萬無一失”。
筆者準備了英文原版的kafka經典書籍:kafka in action
電子書版本,掃碼回復:kafka? ?請多多支持!