那 Kafka 到底在什么情況下才能保證消息不丟失呢?
Kafka 只對“已提交”的消息(committed message)做有限度的持久化保證。
? ? 第一個核心要素是“已提交的消息”。什么是已提交的消息?當 Kafka 的若干個 Broker 成
功地接收到一條消息并寫入到日志文件后,它們會告訴生產者程序這條消息已成功提交。此
時,這條消息在 Kafka 看來就正式變為“已提交”消息了。
? ? 那為什么是若干個 Broker 呢?這取決于你對“已提交”的定義。你可以選擇只要有一個
Broker 成功保存該消息就算是已提交,也可以是令所有 Broker 都成功保存該消息才算是
已提交。不論哪種情況,Kafka 只對已提交的消息做持久化保證這件事情是不變的。
? ? 第二個核心要素就是“有限度的持久化保證”,也就是說 Kafka 不可能保證在任何情況下
都做到不丟失消息。舉個極端點的例子,如果地球都不存在了,Kafka 還能保存任何消息
嗎?顯然不能!倘若這種情況下你依然還想要 Kafka 不丟消息,那么只能在別的星球部署
Kafka Broker 服務器了。
? ? 現在你應該能夠稍微體會出這里的“有限度”的含義了吧,其實就是說 Kafka 不丟消息是
有前提條件的。假如你的消息保存在 N 個 Kafka Broker 上,那么這個前提條件就是這 N
個 Broker 中至少有 1 個存活。只要這個條件成立,Kafka 就能保證你的這條消息永遠不會
丟失。
? ? ?總結一下,Kafka 是能做到不丟失消息的,只不過這些消息必須是已提交的消息,而且還要
滿足一定的條件。當然,說明這件事并不是要為 Kafka 推卸責任,而是為了在出現該類問
題時我們能夠明確責任邊界。
最佳實踐
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。記住,一
定要使用帶有回調通知的 send 方法。
- ?設置 acks = all。acks 是 Producer 的一個參數,代表了你對“已提交”消息的定義。
如果設置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。
這是最高等級的“已提交”定義。
- ?設置 retries 為一個較大的值。這里的 retries 同樣是 Producer 的參數,對應前面提到
的 Producer 自動重試。當出現網絡的瞬時抖動時,消息發送可能會失敗,此時配置了retries > 0 的 Producer 能夠自動重試消息發送,避免消息丟失。
- 設置 unclean.leader.election.enable = false。這是 Broker 端的參數,它控制的是哪
些 Broker 有資格競選分區的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么
它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數設置成 false,
即不允許這種情況的發生。
- 設置 replication.factor >= 3。這也是 Broker 端的參數。其實這里想表述的是,最好將
消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。
6. 設置 min.insync.replicas > 1。這依然是 Broker 端參數,控制的是消息至少要被寫入
到多少個副本才算是“已提交”。設置成大于 1 可以提升消息持久性。在實際環境中千
萬不要使用默認值 1。
- 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛
機,整個分區就無法正常工作了。我們不僅要改善消息的持久性,防止數據丟失,還要
在不降低可用性的基礎上完成。推薦設置成 replication.factor = min.insync.replicas +
1。
- 確保消息消費完成再提交。Consumer 端有個參數 enable.auto.commit,最好把它設
置成 false,并采用手動提交位移的方式。就像前面說的,這對于單 Consumer 多線程
處理的場景而言是至關重要的。
推薦閱讀
- 事件風暴在DDD中的應用
- 網關層數據脫敏
- 建立估算軟件開發工作量的方法