文章目錄
- 概述
- 核心概念
- 生產者
- 示例
- 同步 / 異步發送消息
- 生產者參數配置
- ack-確認機制
- retries - 重試次數
- compression_type - 消息壓縮類型
- 分區機制
- 分區策略
- 消費者
- 消息有序性
- 提交和偏移量
- 偏移量提交方式
- 手動提交
- 高可用設計
- SpringBoot集成Kafka
- 基本使用
- 傳遞對象消息
概述
核心概念
Kafka將生產者發布的消息發布到topic中,需要這些消息的消費者可以訂閱這些主題。
下面這張圖也為我們引出了,Kafka 比較重要的幾個概念:
-
Producer(生產者) : 產生消息的一方。
-
Consumer(消費者) : 消費消息的一方。
-
Broker(代理 / 單個kafka實例):可以看作是一個獨立的Kafka實例。
- 多個Kafka Broker組成一個Kafka Cluster。
- 每個 Broker 中又包含了 Topic 以及 Partition這兩個重要的概念
-
Topic(主題) : Producer將消息發送到特定的主題,Consumer通過訂閱特定的 Topic(主題) 來消費消息。
-
Partition(分區 / 隊列) : Partition 屬于 Topic 的一部分。一個 Topic 可以有多個 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,這也就表明一個 Topic 可以橫跨多個 Broker
劃重點:Kafka 中的 Partition(分區) 實際上可以對應成為消息隊列中的隊列。 -
消費者組:同一個消費者組中,多個消費者訂閱同一個topic,只有一個消費者可以接收到消息。
生產者
示例
同步 / 異步發送消息
生產者參數配置
ack-確認機制
retries - 重試次數
compression_type - 消息壓縮類型
分區機制
kafka分區機制,允許消息存放在不同broke的不同分區上。
分區策略
默認是輪詢。
消費者
消息有序性
消息都發給同一個分區,就可以保證消息有序性。
提交和偏移量
消費者在消費消息時,可以追蹤消息再分區的位置(偏移量),并自動向一個叫做_consumer_offset
的特殊topic發送消息,包含了分區的偏移量。
如果消費者發送崩潰或者有新的消費者加入群組,會觸發再平衡。例如消費者2掛掉了,那么分區3和分區4將被再平衡機制,指向到其他消費者。
在自動提交偏移量模式下,再平衡機制可能會引發問題,因為掛掉的消費者提交的消息偏移量與新指定的消費者正在處理的消息偏移量是不一致的。
提交偏移量小于正在處理的偏移量:
如果提交的偏移量小于正在處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息就會被重復處理。
提交偏移量大于正在處理的偏移量:
如果提交的偏移量大于正在處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。
偏移量提交方式
手動提交
首先將自動提交設置為false:
同步提交
使用 commitSync() 提交偏移量最簡單也最可靠。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
commitSync() 將會提交由 poll() 返回的最新偏移量,所以在處理完所有記錄后要確保調用了 commitSync(),否則還是會有丟失消息的風險。
如果發生了再均衡,從最近一批消息到發生再均衡之間的所有消息都將被重復處理。
同時在這個程序中,只要沒有發生不可恢復的錯誤,commitSync() 方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日志里。
異步提交
同步提交有一個不足之處,在 broker 對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發生了再均衡,會增加重復消息的數量。 這個時候可以使用異步提交 API。我們只管發送提交請求,無需等待 broker 的響應。
在成功提交或碰到無法恢復的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。 它之所以不進行重試,是因為在它收到服務器響應的時候,可能有一個更大的偏移量已經提交成功。假設我們發出一個請求用于提交偏移量 2000,這個時候發生了短暫的通信問題,服務器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新嘗試提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。這個時候如果發生再均衡,就會出現重復消息。 commitAsync() 也支持回調,在 broker 作出響應時會執行回調。回調經常被用于記錄提交錯誤或生成度量指標。如果要用它來進行重試,則一定要注意提交的順序。
同步和異步混合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。 但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。因此在這種情況下,我們應該考慮使用混合提交的方法:
高可用設計
- 集群方式(cluster),一個kafka集群由多個broke組成,一個broke宕機,其他機器上的broke依然可以對外服務。
- 備份機制(replication):kafka中為了保證消息的安全性,將信息進行了備份,并且定義了兩類副本:
- 領導者副本:生產者首先將消息發送到領導者副本進行備份,領導者副本只有一個。
- 追隨者副本 :領導者副本,將自己的消息與追隨者副本進行同步。追隨者副本可以有多個,且可以分為兩類:
- ISR(in - sync replica):需要同步復制保存的follower。
- 普通: 與領導者副本之間是異步保存。
- 當leader失效后,需要選出新的leader,選舉原則如下:
- 優先從ISR中選擇,因為ISR中的消息數據是與leader同步的
- 如果ISR列表中的follower都不行了,就只能從其他follow中選取。
- 極端情況:所有副本都失效了,這時有兩種方案:
- 等待ISR中的一個活過來,選為leader,數據可靠,但是時間不確定。
- 選擇第一個活過來的副本為leader,不一定位ISR中的, 以最快速度恢復可用性,但是數據不一定完整。
SpringBoot集成Kafka
基本使用