目錄
- 消息隊列應用場景和基礎知識
- MQ常見的應用場景
- MQ消息隊列的兩種消息模式
- 如何保證消息隊列的高可用?
- 如何保證消息不丟失?
- 如何保證消息不被重復消費?如何保證消息消費的冪等性?
- 重復消費的原因
- 解決方案
- 如何保證消息被消費的順序性?
- 常見的消息隊列中間件
- push 和 pull
- kafka基礎概念
- kafka架構
- topic
- Broker
- Partition
- 為什么要分區的一些原因
- kafka的ack機制
- offset
- isr
- AR(Assigned Replicas)
- LEO
- HW
- kafka producer
- 3 臺 broker, partition 2個備份
- 發送消息
- 消息路由到不同的partition
- 查看磁盤上的`kafka-logs`文件目錄下對應的topic文件
- Producer如何將消息可靠的發送給Kafka集群
- ack應答機制(3種策略)
- retries
- batch.size
- linger.ms
- requests.timeout.ms
- max.in.flight.requests.per.connection
- kafkaProducer源碼流程圖
- broker
- 文件目錄和索引
- broker leader選舉
- Kafka Controller 選舉
- 如何選舉出 partition leader?
- kafka consumer
- Consumer Group(消費組)
- 消費者數據的不丟失?
- 提交offset
- 低級消費者(或簡單消費者)
- 高級消費者
- kafka重復消費的根本原因就是"數據消費了,但是offset沒更新"!而我們要探究一般什么情況下會導致offset沒更新?
- 再均衡(Rebalance)
- 什么情況下會發生再平衡?
- 再均衡協議
- 再均衡過程
- 消費者組狀態以及狀態轉換
- 再平衡例子和策略?
- Range(默認)
- Round Robin
- Sticky
- 問題:kafka 消息丟失/重復場景
- 問題:Kafka分區數越多越好嗎?
- 問題:Kafka如何消息的順序性和一致性
- LEO & HW 保證副本的數據一致性
- 冪等性
- 冪等生產者的內部機制
- 其它知識
- kafka為什么很快
- 順序寫入日志
- 零拷貝 mmap
- 預讀數據/批處理(讀一塊然后處理)
- 日志分段(有類似跳表索引)
- 數據壓縮
- kafka使用`page cache`
- kafka集群多Topic性能下降
- 死信隊列(Dead-Letter Queue)
- 數據傳輸的事物定義有哪三種?
消息隊列應用場景和基礎知識
MQ常見的應用場景
- 異步處理
將一些非核心的業務流程以異步并行的方式執行,從而減少請求響應時間,提高系統吞吐量。
- 應用解耦
通過消息隊列,使得每個應用系統不必受其他系統影響,更加獨立
- 流量削峰
一般在秒殺或團搶活動中使用廣泛,后臺系統根據消息隊列中的消息信息,進行秒殺業務處理。
- 消息通訊
消息通訊是指應用間的數據通信。消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等點對點通訊。
- 延時任務
場景:用戶在美團 APP 下單,假如沒有立即支付,進入訂單詳情會顯示倒計時,如果超過支付時間,訂單就會被自動取消。
處理方式:訂單服務生成訂單后,發送一條延時消息到消息隊列。消息隊列在消息到達支付過期時間時,將消息投遞給消費者,消費者收到消息之后,判斷訂單狀態是否為已支付,假如未支付,則執行取消訂單的邏輯。
- 數據中轉樞紐
例如,當應用日志用于離線日志分析時,搜索單個日志記錄同樣不可或缺,而構建各自獨立的工作流來采集每種類型的數據再導入到各自的專用系統顯然不切實際,利用消息隊列 Kafka 作為數據中轉樞紐,同份數據可以被導入到不同專用系統中
- 分布式事務:保證數據一致性
-
場景
訂單服務需要同時生成訂單和扣減庫存,這涉及兩個不同的數據庫操作。
如果一個成功一個失敗,就會導致數據不一致。 -
解決方案
通過MQ實現分布式事務。訂單服務生成訂單后,將扣減庫存的任務交給MQ,最終實現數據的一致性。
通過“最終一致性”解決了分布式事務的難題,雖然短時間內可能有數據不一致,但最終狀態一定是正確的。
MQ消息隊列的兩種消息模式
在JMS標準中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
-
點對點模式
- 每個消息只有一個接收者(Consumer)(即一旦被消費,消息就不再在消息隊列中);
- 發送者和接收者間沒有依賴性,發送者發送消息之后,不管有沒有接收者在運行,都不會影響到發送者下次發送消息;
- 接收者在成功接收消息之后需向隊列應答成功,以便消息隊列刪除當前接收的消息;
-
發布/訂閱模式
- 每個消息可以有多個訂閱者;
- 發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息。
- 為了消費消息,訂閱者需要提前訂閱該角色主題,并保持在線運行
如何保證消息隊列的高可用?
- 鏡像集群模式
非分布式,如果某個 queue 負載很重,加機器,新增的機器也包含了這個 queue 的所有數據,并沒有辦法線性擴展 queue
- 分布式,備份
分布式消息隊列,就是說一個 topic 的數據,是分散放在多個機器上的,每個機器就放一部分數據
例如
- 生產者寫 leader,然后 leader 將數據落地寫本地磁盤,接著其它 follower 自己主動從 leader 來 pull 數據。一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產者
- 消費的時候,只會從 leader 去讀,但是只有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到
如何保證消息不丟失?
消息丟失的環節;
- 發送過程中丟失
- MQ還未持久化消息,然后宕機了
- 消費者消費到消息但是未處理(此時MQ把消息已經刪除了),然后宕機了
如何保證不丟?
- 生產者,MQ之間確認
- MQ持久化
- MQ,消費者之間確認
如何保證消息不被重復消費?如何保證消息消費的冪等性?
重復消費的原因
消息重復的根本原因是網絡不可達,且不可避免
-
生產者發送時未收到MQ的響應(可能網絡閃斷,不過最終MQ處理完成了)然后重發,最終也是成功處理,這會導致重復消息進入MQ,后續消費重復
-
消費者消費到消息,完成業務處理,當消費者給MQ服務端反饋應答的時候網絡閃斷。MQ還保留著消息,保證消息至少被消費一次,在網絡恢復后再次把消息嘗試投遞給消費者去處理,這樣消費者就收到兩條一樣的消息了
解決方案
- 消息發送者發送消息時攜帶一個全局唯一的消息id
- 消費者獲取消息后,先根據id在 redis/db 中查詢是否之前 該記錄被消費過了
- 如果沒有被消費過,則消費并寫入 redis/db; 否則直接忽略
如何保證消息被消費的順序性?
消息有序是指:按照消息發送的順序來消費
- 生產者保證消息的順序到達MQ
- 消費者保證順序消費MQ中的消息
1:1:1,局部順序消費
常見的消息隊列中間件
特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
---|---|---|---|---|
開發語言 | java | erlang | java | scala |
單機吞吐量 | 萬級 | 萬級 | 10萬級 | 10萬級 |
時效性 | ms級 | us級 | ms級 | ms級以內 |
可用性 | 高(主從架構) | 高(主從架構) | 非常高(分布式架構) | 非常高(分布式架構) |
功能特性 | 成熟的產品,在很多公司得到應用;有較多的文檔;各種協議支持較好 | 基于erlang開發,所以并發能力很強,性能極其好,延時很低;管理界面較豐富 | MQ功能比較完備,擴展性佳 | 只支持主要的MQ功能,像一些消息查詢,消息回溯等功能沒有提供,畢竟是為大數據準備的,在大數據領域應用廣。 |
push 和 pull
x | push模型 | pull模型 |
---|---|---|
描述 | 服務端主動發送數據給客戶端 | 客戶端主動從服務端拉取數據,通常客戶端會定時拉取 |
實時性 | 較好,收到數據后可立即發送給客戶端 | 一般,取決于pull的間隔時間 |
服務端狀態 | 需要保存push狀態,哪些客戶端已經發送成功,哪些發送失敗 | 服務端無狀態 |
客戶端狀態 | 無需額外保存狀態 | 需保存當前拉取的信息的狀態,以便在故障或者重啟的時候恢復 |
狀態保存 | 集中式,集中在服務端 | 分布式,分散在各個客戶端 |
負載均衡 | 服務端統一處理和控制 | 客戶端之間做分配,需要協調機制,如使用zookeeper |
其它 | 服務端需要做流量控制,無法最大化客戶端的處理能力;其次,在客戶端故障情況下,無效的push對服務端有一定負載。 | 客戶端的請求可能很多無效或者沒有數據可供傳輸,浪費帶寬和服務器處理能力 |
缺點方案 | 服務器端的狀態存儲是個難點,可以將這些狀態轉移到DB或者key-value存儲,來減輕server壓力。 | 針對實時性的問題,可以將push加入進來,push小數據的通知信息,讓客戶端再來主動pull。針對無效請求的問題,可以設置逐漸延長間隔時間的策略,以及合理設計協議盡量縮小請求數據包來節省帶寬。 |
kafka基礎概念
kafka架構
- Kafka中消息是以topic進行分類的,生產者生產消息,消費者消費消息,都是面向topic的
- Producer將每一條消息順序IO追加到partition對應到log文件末尾
- 每條消息都有一個offset
- Partition是用來存儲數據的,但并不是最小的數據存儲單元。Partition下還可以細分成Segment,每個Partition是由一個或多個Segment組成。每個Segment分別對應兩個文件:一個是以.index結尾的索引文件,另一個是以.log結尾的數據文件,且兩個文件的文件名完全相同。所有的Segment均存在于所屬Partition的目錄下。
- 消息按規則路由到不同的partition中,每個partition內部消息是有序的,但是不同partition之間不是有序的
- Producer生產消息push到Kafka集群;Consumer通過pull的方式從Kafka集群拉取消息
topic
- Topic是Kafka數據寫入操作的基本單元,可以指定副本
一個Topic包含一個或多個Partition,建Topic的時候可以手動指定* Partition個數,個數與服務器個數相當 每條消息屬于且僅屬于一個Topic - Producer發布數據時,必須指定將該消息發布到哪個Topic
- Consumer訂閱消息時,也必須指定訂閱哪個Topic的信息
Broker
Broker 是指 Kafka 集群中的一個節點(或服務器)。它是 Kafka 系統的核心組件之一,負責存儲和管理消息數據,并處理生產者和消費者之間的消息傳遞。簡單來說,Broker 就是一個運行 Kafka 服務的進程,它承擔了以下主要職責:
- 存儲消息數據
- 每個 Broker 負責存儲分配給它的分區(Partition)的數據。
- 分區是 Kafka 中的基本存儲單元,每個分區對應一個日志文件(Log Segment),Broker 會將消息持久化到磁盤中。
- 接收生產者的消息
- 生產者(Producer)將消息發送到特定的 Topic,Broker 根據分區策略(如輪詢、哈希等)將消息寫入對應的分區。
- 為消費者提供消息
- 消費者(Consumer)從 Broker 中拉取消息數據,Broker 負責根據消費者的消費進度(Offset)返回相應的消息。
- 副本管理
- 每個分區可以有多個副本(Replica),其中一個副本是 Leader,其他副本是 Follower。
- Broker 負責管理分區的 Leader 和 Follower 副本,確保數據的高可用性和一致性。
- 集群協調
- 多個 Broker 組成一個 Kafka 集群,通過 Zookeeper 或 Kafka 自帶的 Raft 協議進行協調。
- Broker 之間會互相通信,完成分區分配、Leader 選舉等任務。
Partition
- 每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件
- 一個Partition中的數據是有序、不可變的,使用偏移量(offset)唯一標識一條數據,是一個long類型的數據
為什么要分區的一些原因
-
分區類似hadoop分布式文件系統,也類似于spark中RDD以partition方式將數據分塊存儲,一是分塊存儲提高數據概率性安全,比如分布式的文件塊,如果某臺機器宕機數據丟失,也是丟失一部分,不會出現整個文件全軍覆沒。另外通過partition級別的冗余存儲(replication.factor)來保證partition級別的安全性。
-
每個partition可以被認為是一個無限長度的數組,新數據順序追加進這個數組。物理上,每個partition對應于一個文件夾。一個broker上可以存放多個partition。這樣,producer可以將數據發送給多個broker上的多個partition,consumer也可以并行從多個broker上的不同partition上讀數據,實現了水平擴展
-
并行讀寫:磁盤寫入速度就是kafka處理速度的極限,處理不過來就要加機器。每臺機器持有不同的partition。Producer產生10個新消息,可以交給10個broker上的partition一起寫;Consumer消費10個新消息,可以從10個broker上面一起讀。
-
The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
kafka的ack機制
request.required.acks有三個值 0 1 -1
- 0: 生產者不會等待broker的ack,這個延遲最低但是存儲的保證最弱當server掛掉的時候就會丟數據
- 1: 服務端會等待ack值,leader副本確認接收到消息后發送ack但是如果leader掛掉后他不確保是否復制完成新leader也會導致數據丟失
- -1: 同樣在1的基礎上,服務端會等所有的follower的副本受到數據后才會受到leader發出的ack,這樣數據不會丟失消費消息
offset
Kafka會為每一個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條消息后遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
isr
全稱是:In-Sync Replicas,isr 是一個副本的列表,里面存儲的都是能跟leader 數據一致的副本,確定一個副本在isr列表中,有2個判斷條件
- 條件1:根據副本和leader 的交互時間差,如果大于某個時間差 就認定這個副本不行了,就把此副本從isr 中剔除,此時間差根據
配置參數rerplica.lag.time.max.ms=10000 決定 單位ms - 條件2:根據leader 和副本的信息條數差值決定是否從isr 中剔除此副本,此信息條數差值根據配置參數rerplica.lag.max.messages=4000 決定 單位條 isr 中的副本刪除或者增加 都是通過一個周期調度來管理的
AR(Assigned Replicas)
-
分區中的所有副本統稱為
AR
(Assigned Replicas)。 -
所有與leader副本保持一定程度同步的副本(包括leader副本在內)組成
ISR
(In Sync Replicas)。 -
ISR 集合是 AR 集合的一個子集。消息會先發送到leader副本,然后follower副本才能從leader中拉取消息進行同步。同步期間,follower副本相對于leader副本而言會有一定程度的滯后。前面所說的 ”一定程度同步“ 是指可忍受的滯后范圍,這個范圍可以通過參數進行配置
-
leader副本同步滯后過多的副本(不包括leader副本)將組成
OSR
(Out-of-Sync Replied) -
AR = ISR + OSR。正常情況下,所有的follower副本都應該與leader 副本保持 一定程度的同步,即AR=ISR,OSR集合為空
LEO
LEO(log end offset) 稱為日志末端位移,代表日志文件中下一條待寫入消息的 offset,這個 offset 上實際是沒有消息的。
分區 ISR 集合中的每個副本(所有的 leader 和 follower 副本)都會維護自身的 LEO。當 leader 副本收到生產者的一條消息,LEO 通常會自增 1,而 follower 副本需要從 leader 副本 fetch 到數據后,才會增加它的 LEO。
HW
HW(High Watermark) 稱為高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 之前的消息。
SR 集合中最小的 LEO 即為分區的 HW,消費者只能讀取到小于高水位線以下的消息,即成功復制到所有副本的最后一條消息。因此,對于同一個副本對象,其高水位值不會大于 LEO 值。
在分區高水位以下的消息被認為是已提交消息,反之就是未提交消息。消費者只能消費已提交的消息,即圖中位移小于 8 的所有消息
最后,leader 副本會比較自己的 LEO 以及滿足條件的 follower 副本上的 LEO ,選取兩者中較小值作為新的 HW ,來更新自己的 HW 值
kafka producer
3 臺 broker, partition 2個備份
- 三臺broker(0,1,2), 設置topic的partition數目是2,replication備份因子是2
- 對于partition 0,leader 是 3,follower 是 1,兩個備份
- 對于partition 1,leader 是 1,follower 是 2,兩個備份
發送消息
一次發送了4條消息,內容分別是
空
a1
b1
c1
消息路由到不同的partition
空
和b1
存儲到了partition 0
中a1
和c1
存儲到了partition 1
中
查看磁盤上的kafka-logs
文件目錄下對應的topic文件
- .index結尾的索引文件,
- .log結尾的數據文件
Producer如何將消息可靠的發送給Kafka集群
ack應答機制(3種策略)
acks指定了必須有多少個分區副本接收到了消息,生產者才會認為消息是發送成功的。
- acks=0,生產者成功寫入消息之前不會等待來自任何服務器的響應,這種配置,提高吞吐量,但是消息存在丟失風險。
- acks=1,只要集群的leader(master)收到了消息,生產者將會受到發送成功的一個響應,如果消息無撞到達首領節點(比如首領節點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發送還是異步發送。如果讓發送客戶端等待服務器的響應(通過調用Future 對象的get()方法,顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生產者在收到服務器響應之前可以發送多少個消息)。
- acks=-1,所有參與復制的節點全部收到消息的時候,生產者才會收到來自服務器的一個響應,這種模式最安全,但是吞吐量受限制,它可以保證不止一個服務器收到消息,就算某臺服務器奔潰,那么整個集群還是會正產運轉。
retries
生產者從服務器收到的錯誤消息有可能是臨時的,當生產者收到服務器發來的錯誤消息,會啟動重試機制,當重試了n(設置的值)次,還是收到錯誤消息,那么將會返回錯誤。生產者會在每次重試之間間隔100ms,不過可以通過retry.backoff.ms
改變這個間隔。
batch.size
當多個消息發往同一個分區,生產者會將他們放進同一個批次,該參數指定了一個批次可以使用的內存大小,按照字節數進行計算(不是消息個數);當批次被填滿,批次里面所有得消息將會被發送;半滿的批次,甚至只包含一個消息也可能會被發送,所以即使把批次設置的很大,也不會造成延遲,只是占用的內存打了一些而已。但是設置的太小,那么生產者將會頻繁的發送小,增加一些額外的開銷。
linger.ms
該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。默認情況下,只要有可用的線程, 生產者就會把消息發送出去,就算批次里只有一個消息。把linger.ms設置成比0大的數,讓生產者在發送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發送更多的消息,每個消息的開銷就變小了)
requests.timeout.ms
生產者發送數據時等待服務器返回響應的時間
max.in.flight.requests.per.connection
指定了生產者收到服務器響應之前可以發送多少個消息。它的值越高,將會消耗更多的內存,不過也會提升吞吐量。設置為1,可以保證消息是按照發送的順序寫入服務器。即使發生了重試。
kafkaProducer源碼流程圖
broker
文件目錄和索引
log默認保留168小時(7天),默認一個segment文件大小1G,否則產生新文件
.log
存儲數據.index
存儲索引(類似跳表的索引)
為數據文件建索引采取了稀疏存儲:每隔一定字節的數據建立一條索引(這樣的目的是為了減少索引文件的大小)
broker leader選舉
leader選舉借助zookeeper最簡單最直觀的方案是:leader在zk上創建一個臨時節點,所有Follower對此節點注冊監聽,當leader宕機時,此時ISR里的所有Follower都嘗試創建該節點,而創建成功者(Zookeeper保證只有一個能創建成功)即是新的Leader,其它Replica即為Follower。
缺點:當kafka集群業務很多,partition達到成千上萬時,當broker宕機時,造成集群內大量的調整,會造成大量Watch事件被觸發,Zookeeper負載會過重,而zk是不適合大量寫操作的。
kafka在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。
Kafka Controller 選舉
Kafka Controller的選舉是依賴Zookeeper來實現的,在Kafka集群中哪個broker能夠成功創建/controller
這個臨時(EPHEMERAL)節點,就可以成為Kafka Controller。
controller實現如上功能:
- brokers列表:
ls /brokers/ids
- 某個broker信息:
get /brokers/ids/0
- topic信息:
get /brokers/topics/kafka10-topic-20170924
- partition信息:
get /brokers/topics/kafka10-topic-20170924/partitions/0/state
- controller中心節點變更次數:
get /controller_epoch
- conrtoller leader信息:
get /controller
如何選舉出 partition leader?
不同場景下的選舉思路不同;基本思路如下
-
按照AR集合中副本的順序查找第一個存活的副本,并且這個副本在ISR集合中。
-
一個分區的AR集合在分配的時候就被指定,并且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而分區的ISR集合中副本的順序可能會改變。注意這里是根據AR的順序而不是ISR的順序進行選舉的。
-
分區進行重分配(reassign)的時候也需要執行leader的選舉動作。
思路:從重分配的AR列表中找到第一個存活的副本,且這副本在目前的ISR列表中。 -
發生優先副本(preferred replica partition leader election)的選舉時,直接將優先副本設置為leader即可,AR集合中的第一個副本即為優先副本。
-
當某節點被優雅地關閉(也就是執行ControlledShutdown)時,位于這個節點上的leader副本都會下線,所以與此對應的分區需要執行leader的選舉。從AR列表中找到第一個存活的副本,且這個副本在目前的ISR列表中,與此同時還要確保這個副本不處于正在被關閉的節點上。
kafka consumer
Consumer Group(消費組)
consumer group是kafka提供的可擴展且具有容錯性的消費者機制
- 一個消費組可以有多個消費者或消費者實例(consumer instance)
- 一個消費組下的所有消費者共享一個公共的ID,即group ID
- 一個消費組下訂閱的topic下的每個partition只能分配給該group下的一個consumer(當然該分區還可以被分配給其他group),即一個partition只能被一個消費進程/線程消費,而不能被多個消費進程/線程消費(當然一個消費進程/線程可以消費多個partition)
- 每一個消費組都會被記錄它在某一個分區的Offset,即不同consumer group針對同一個分區,都有"各自"的偏移量
- 同一個 Topic 可以多個消費組
消費者數據的不丟失?
通過offset commit來保證數據的不丟失。kafka自己記錄了每次消費的offset數值,下次消費的時候從上次的offset繼續消費
而offset的信息在kafka0.9版本之前保存在zookeeper中,在0.9版本后保存到了一個topic中,即使消費者在運行過程中掛掉了,再次啟動的時候會找到offset的值,即可接著上次消費。由于offset的信息寫入的時候并不是每條消息消費完成就寫入,所以會導致有重復消費的問題,但是不會丟失消息
唯一例外的情況是,我們在程序中給原本做不同功能的兩個consumer組設置了KafkaSpoutConfig.builder.setGroupid的時候設置成了同樣的groupid,這種情況會導致這兩個組共享了同一份數據,就會產生組A消費partition1,partition2中的消息,組B消費partition3的消息,這樣每個組消費的消息都會丟失,都是不完整的。為了保證每個組都能獨享一份消息數據,groupid一定不要重復
提交offset
offset提交的方式有兩種:自動提交和手動提交。
offset下標自動提交其實在很多場景都不適用,因為自動提交是在kafka拉取到數據之后就直接提交,這樣很容易丟失數據,尤其是在需要事物控制的時候。很多情況下我們需要從kafka成功拉取數據之后,對數據進行相應的處理之后再進行提交。如拉取數據之后進行寫入mysql這種,所以這時我們就需要進行手動提交kafka的offset下標。
手動提交偏移量:
1. 同步提交
2. 異步提交
3. 異步+同步 組合的方式提交
低級消費者(或簡單消費者)
高級消費者
- 問題一: Kafka 高級消費者怎樣才能達到最大吞吐量?
答:分區數量與線程數量一致。
- 問題二: 消費者消費能力不足時,如果提高并發?
答:1. 增加分區個數; 2. 增加消費者線程數; 3.自動提交 offset
- 在高階消費者中,Offset 采用自動提交的方式。
自動提交時,假設 1s 提交一次 offset 的更新,設當前 offset=10,當消費者消費了 0.5s 的數據,offset 移動了 15,由于提交間隔為 1s,因此這一 offset 的更新并不會被提交,這時候我們寫的消費者掛掉,重啟后,消費者會去 ZooKeeper 上獲取讀取位置,獲取到的 offset 仍為 10,它就會重復消費,這就是一個典型的重復消費問題。
高階消費者存在一個弊端,即消費者消費到哪里由高階消費者 API 進行提交,提交到 ZooKeeper,消費者線程不參與 offset 更新的過程,這就會造成數據丟失(消費者讀取完成,高級消費者 API 的 offset 已經提交,但是還沒有處理完成 Spark Streaming 掛掉,此時 offset 已經更新,無法再消費之前丟失的數據),還有可能造成數據重復讀取(消費者讀取完成, 高級消費者 API 的 offset 還沒有提交,讀取數據已經處理完成后 Spark Streaming 掛掉,此時 offset 還沒有更新,重啟后會再次消費之前處理完成的數據)。
kafka重復消費的根本原因就是"數據消費了,但是offset沒更新"!而我們要探究一般什么情況下會導致offset沒更新?
- 原因1:強行kill線程,導致消費后的數據,offset沒有提交(消費系統宕機、重啟等)
- 原因2:設置offset為自動提交,關閉kafka時,如果在close之前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費
- 原因3:(重復消費最常見的原因):消費后的數據,當offset還沒有提交時,partition就斷開連接。比如,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那么就會re-blance重平衡,此時有一定幾率offset沒提交,會導致重平衡后重復消費
- 原因4:當消費者重新分配partition的時候,可能出現從頭開始消費的情況,導致重發問題。
- 原因5:當消費者消費的速度很慢的時候,可能在一個session周期內還未完成,導致心跳機制檢測報告出問題。
- 原因6:并發很大,可能在規定的時間(session.time.out默認30s)內沒有消費完,就會可能導致rebalance重平衡,導致一部分offset自動提交失敗,然后重平衡后重復消費
再均衡(Rebalance)
再平衡指的是在kafka consumer所訂閱的topic發生變化時發生的一種分區重分配機制。
什么情況下會發生再平衡?
- 有新的消費者加入消費組
- 有消費者宕機或下線
- 消費者并不一定需要真正下線,例如遇到長時間的GC、網絡延遲導致消費者長時間未向 GroupCoordinator 發送心跳等情況時,GroupCoordinator 會認為消費者已經下線
- 有消費者主動退出消費組(發送 LeaveGroupRequest 請求)。比如客戶端調用了 unsubscrible() 方法取消對某些主題的訂閱
- 消費組所對應的 GroupCoordinator 節點發生了變更,例如coordinator掛了,集群選舉出新的coordinator。GroupCoordinator 是 Kafka 服務端中用于管理消費組的組件。而消費者客戶端中的 ConsumerCoordinator 組件負責與 GroupCoordinator 進行交互
- 消費組內所訂閱的任一主題或者主題的partition數量發生變化
再均衡協議
- Heartbeat請求:consumer需要定期給coordinator發送心跳來表明自己還活著
- LeaveGroup請求:主動告訴coordinator我要離開consumer group
- SyncGroup請求:group leader把分配方案告訴組內所有成員
- JoinGroup請求:成員請求加入組
- DescribeGroup請求:顯示組的所有信息,包括成員信息,協議名稱,分配方案,訂閱信息等。通常該請求是給管理員使用
再均衡過程
Kafka中分區再均衡主要分為兩步,第一步是JoinGroup,第二步是SyncGroup
-
Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求入組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,并把組成員信息以及訂閱信息發給leader:注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。
-
Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。
消費者組狀態以及狀態轉換
- Dead:組內已經沒有任何成員的最終狀態,組的元數據也已經被coordinator移除了。這種狀態響應各種請求都是一個response: UNKNOWN_MEMBER_ID
- Empty:組內無成員,但是位移信息還沒有過期。這種狀態只能響應JoinGroup請求
- PreparingRebalance:組準備開啟新的rebalance,等待成員加入
- AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
- Stable:rebalance完成,可以開始消費了
再平衡例子和策略?
Range(默認)
range策略:首先會計算每個consumer可以消費的分區個數,然后按照順序將指定個數范圍的分區分配給各個consumer;這種方式分配只是針對消費者訂閱的topic的單個topic所有分區再分配
eg1: 10分區;2個機器實例,一個1個線程,一個2個線程(消費者線程排完序將會是C1-0, C2-0, C2-1)
C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6 分區
C2-1 將消費 7, 8, 9 分區
eg2: 11分區 同上,可能是如下
C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6, 7 分區
C2-1 將消費 8, 9, 10 分區
eg3: 假如有2個主題(T1和T2),分別有10個分區,那么最后分區分配的結果看起來如下
C1-0 將消費 T1主題的 0, 1, 2, 3 分區以及 T2主題的 0, 1, 2, 3分區
C2-0 將消費 T1主題的 4, 5, 6 分區以及 T2主題的 4, 5, 6分區
C2-1 將消費 T1主題的 7, 8, 9 分區以及 T2主題的 7, 8, 9分區
可以看出,C1-0 消費者線程比其他消費者線程多消費了2個分區,這就是Range strategy的一個很明顯的弊端。
Round Robin
會采用輪詢的方式將當前所有的分區依次分配給所有的consumer;這種分配策略是針對消費者消費的所有topic的所有分區進行分配。當有新的消費者加入或者有消費者退出,就會觸發rebalance
使用RoundRobin策略有兩個前提條件必須滿足
- 同一個Consumer Group里面的所有消費者的num.streams必須相等;
- 每個消費者訂閱的主題必須相同。
假如按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,消費者線程排序為C1-0, C1-1, C2-0, C2-1,最后分區分配的結果為:
C1-0 將消費 T1-5, T1-2, T1-6 分區;
C1-1 將消費 T1-3, T1-1, T1-9 分區;
C2-0 將消費 T1-0, T1-4 分區;
C2-1 將消費 T1-8, T1-7 分區;
Sticky
sticky 英[?st?ki]
美[?st?ki]
adj. 黏(性)的; 一面帶黏膠的; 悶熱的;
n. 告事貼;
Sticky策略是新版本中新增的策略,顧名思義,這種策略會保證再分配時已經分配過的分區盡量保證其能夠繼續由當前正在消費的consumer繼續消費,當然,前提是每個consumer所分配的分區數量都大致相同,這樣能夠保證每個consumer消費壓力比較均衡
eg:三個consumer:C0、C1和C2,三個topic:t0、t1和t2,這三個topic分別有1、2和3個分區;C0訂閱了t0,C1訂閱了t0和t1,C2則訂閱了t0、t1和t2
分區排序結果 | 定于的consumer數量 | 訂閱的consumer |
---|---|---|
t2-0 | 1 | c2 |
t2-1 | 1 | c2 |
t2-2 | 1 | c2 |
t1-0 | 2 | c1, c2 |
t1-1 | 2 | c1, c2 |
t0-0 | 3 | c0,c2, c2 |
consumer進行排序:c0,c1,c2;然后將各個分區依次遍歷分配給各個consumer,首先需要注意的是,這里的遍歷并不是C0分配完了再分配給C1,而是每次分配分區的時候都整個的對所有的consumer從頭開始遍歷分配,如果當前consumer沒有訂閱當前分區,則會遍歷下一個consumer。
最終分配結果如下
consumer | topic-partition |
---|---|
c0 | t0-0 |
c1 | t1-0, t1-1 |
c2 | t2-0, t2-1, t2-2 |
eg2: 假設開始分配如下,然后c1宕機
consumer | topic-partition |
---|---|
c0 | t0-0, t1-1, t3-0 |
c1 | t1-1, t2-0, t3-1 |
c2 | t1-0, t2-1 |
然后在平衡后
**consumer | topic-partition |
---|---|
c0 | t0-0, t1-1, t3-0, t2-0 |
c2 | t1-0, t2-1, t0-1, t3-1 |
問題:kafka 消息丟失/重復場景
問題:Kafka分區數越多越好嗎?
并非分區數量越多,效率越高
Topic 每個 partition 在 Kafka 路徑下都有一個自己的目錄,該目錄下有兩個主要的文件:base_offset.log 和 base_offset.index。Kafka 服務端的 ReplicaManager 會為每個 Broker 節點保存每個分區的這兩個文件的文件句柄。所以如果分區過多,ReplicaManager 需要保持打開狀態的文件句柄數也就會很多。
每個 Producer, Consumer 進程都會為分區緩存消息,如果分區過多,緩存的消息越多,占用的內存就越大;
n 個分區有 1 個 Leader,(n-1) 個 Follower,如果運行過程中 Leader 掛了,則會從剩余 (n-1) 個 Followers 中選舉新 Leader;如果有成千上萬個分區,那么需要很長時間的選舉,消耗較大的性能。
問題:Kafka如何消息的順序性和一致性
Kafka 在消息處理的順序性方面有一些機制,但并不保證消息的嚴格有序性。以下是 Kafka 處理消息順序性的一些特點:
1、分區內有序性: 在每個分區內,消息是有序存儲的。Kafka 保證對于每個分區,消息的寫入和消費是按照消息的順序進行的。這意味著對于同一個分區的消息,它們將按照發送的順序被消費。這樣保證了在單個分區內的消息順序性。
2、分區間無序性: 在多個分區之間,消息的順序性不能得到保證。不同分區的消息在 Kafka 集群中是并行處理的,而且 Kafka 也不會跨分區地維護全局有序性。因此,對于多個分區的消息,它們在消費者端接收的順序可能與發送順序不一致。
3、消息復制: Kafka 支持多副本復制,每個分區可以有多個副本存儲在不同的 Broker 上。在進行消息復制時,Kafka 會保證消息的副本在各個 Broker 上的復制順序與領導者(Leader)分區中的消息順序保持一致,從而確保數據的一致性。
LEO & HW 保證副本的數據一致性
冪等性
冪等生產者的啟用
- enable.idempotence: 設置為 true 以啟用冪等生產者。
- transactional.id: 可選參數,用于事務性生產者。如果需要事務支持,必須設置此參數。
冪等生產者的內部機制
-
生產者ID(Producer ID)
唯一標識:每個冪等生產者在啟動時會從Kafka集群獲取一個唯一的 Producer ID。
持久化:Producer ID 由Kafka集群持久化存儲,確保在生產者重啟后仍然有效。 -
序列號(Sequence Number)
遞增序列:每個Partition維護一個遞增的序列號。
唯一性:每條消息在發送時會攜帶一個唯一的序列號,確保消息在Partition中的唯一性。 -
請求重試
自動重試:冪等生產者會自動重試發送失敗的消息。
冪等性保證:即使消息被重試多次,Kafka也會確保每條消息只被寫入一次
其它知識
kafka為什么很快
順序寫入日志
追加數據到日志文件尾部,順序IO
零拷貝 mmap
即便是順序寫入硬盤,硬盤的訪問速度還是不可能追上內存。所以Kafka的數據并不是實時的寫入硬盤,它充分利用了現代操作系統 分頁存儲 來利用內存提高I/O效率。
Memory Mapped Files(后面簡稱mmap)也被翻譯成 內存映射文件 ,在64位操作系統中一般可以表示20G的數據文件,它的工作原理是直接利用操作系統的Page來實現文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上(操作系統在適當的時候)。
通過mmap,進程像讀寫硬盤一樣讀寫內存(當然是虛擬機內存),也不必關心內存的大小有虛擬內存為我們兜底。
使用這種方式可以獲取很大的I/O提升, 省去了用戶空間到內核空間 復制的開銷(調用文件的read會把數據先放到內核空間的內存中,然后再復制到用戶空間的內存中。)也有一個很明顯的缺陷——不可靠, 寫到mmap中的數據并沒有被真正的寫到硬盤,操作系統會在程序主動調用flush的時候才把數據真正的寫到硬盤。 Kafka提供了一個參數——producer.type來控制是不是主動flush,如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);寫入mmap之后立即返回Producer不調用flush叫 異步 (async)。
在內核版本2.1中,引入了sendfile系統調用,以簡化網絡上和兩個本地文件之間的數據傳輸。 sendfile的引入不僅減少了數據復制,還減少了上下文切換。
sendfile(socket, file, len);
,運行流程如下:
- sendfile系統調用,文件數據被copy至內核緩沖區
- 再從內核緩沖區copy至內核中socket相關的緩沖區
- 最后再socket相關的緩沖區copy到協議引擎
相較傳統read/write方式,2.1版本內核引進的sendfile已經減少了內核緩沖區到user緩沖區,再由user緩沖區到socket相關緩沖區的文件copy,而在內核版本2.4之后,文件描述符結果被改變,sendfile實現了更簡單的方式,再次減少了一次copy操作。
在apache,nginx,lighttpd等web服務器當中,都有一項sendfile相關的配置,使用sendfile可以大幅提升文件傳輸性能。
Kafka把所有的消息都存放在一個一個的文件中,當消費者需要數據的時候Kafka直接把文件發送給消費者,配合mmap作為文件讀寫方式,直接把它傳給sendfile。
預讀數據/批處理(讀一塊然后處理)
日志分段(有類似跳表索引)
數據壓縮
當啟用壓縮時,對批處理的影響特別明顯,因為隨著數據大小的增加,壓縮通常會變得更有效
特別是在使用基于文本的格式時,比如 JSON,壓縮的效果會非常明顯,壓縮比通常在5x
到7x
之間
此外,記錄的批處理主要作為一個客戶端操作,負載在傳遞的過程中,不僅對網絡帶寬有積極影響,而且對服務端的磁盤 I/O 利用率也有積極影響
kafka使用page cache
~ free -mtotal used free shared buffers cached
Mem: 128956 96440 32515 0 5368 39900
-/+ buffers/cache: 51172 77784
Swap: 16002 0 16001
page cache用于緩存文件的頁數據,buffer cache用于緩存塊設備(如磁盤)的塊數據。頁是邏輯上的概念,因此page cache是與文件系統同級的;塊是物理上的概念,因此buffer cache是與塊設備驅動程序同級的。
producer生產消息時,會使用pwrite()系統調用【對應到Java NIO中是FileChannel.write() API】按偏移量寫入數據,并且都會先寫入page cache里。consumer消費消息時,會使用sendfile()系統調用【對應FileChannel.transferTo() API】,零拷貝地將數據從page cache傳輸到broker的Socket buffer,再通過網絡傳輸。
同時,page cache中的數據會隨著內核中flusher線程的調度以及對sync()/fsync()的調用寫回到磁盤,就算進程崩潰,也不用擔心數據丟失。另外,如果consumer要消費的消息不在page cache里,才會去磁盤讀取,并且會順便預讀出一些相鄰的塊放入page cache,以方便下一次讀取。
如果Kafka producer的生產速率與consumer的消費速率相差不大,那么就能幾乎只靠對broker page cache的讀寫完成整個生產-消費過程,磁盤訪問非常少。這個結論俗稱為"讀寫空中接力"。并且Kafka持久化消息到各個topic的partition文件時,是只追加的順序寫,充分利用了磁盤順序訪問快的特性,效率高
kafka集群多Topic性能下降
參考:Kafka vs RocketMQ——多Topic對性能穩定性的影響-轉自阿里中間件
死信隊列(Dead-Letter Queue)
當一條消息初次消費失敗,消息隊列 MQ 會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列 MQ 不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中,這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)
與此對應的還有一個回退隊列的概念,試想如果消費者在消費時發生了異常,那么就不會對這一次消費進行確認(Ack),進而發生回滾消息的操作之后消息始終會放在隊列的頂部,然后不斷被處理和回滾,導致隊列陷入死循環。為了解決這個問題,可以為每個隊列設置一個回退隊列,它和死信隊列都是為異常的處理提供的一種機制保障。實際情況下,回退隊列的角色可以由死信隊列和重試隊列來扮演
重試隊列其實可以看成是一種回退隊列,具體指消費端消費消息失敗時,為防止消息無故丟失而重新將消息回滾到Broker中。與回退隊列不同的是重試隊列一般分成多個重試等級,每個重試等級一般也會設置重新投遞延時,重試次數越多投遞延時就越大
eg: 消息第一次消費失敗入重試隊列Q1,Q1的重新投遞延遲為5s,在5s過后重新投遞該消息;如果消息再次消費失敗則入重試隊列Q2,Q2的重新投遞延遲為10s,在10s過后再次投遞該消息。以此類推,重試越多次重新投遞的時間就越久,為此需要設置一個上限,超過投遞次數就入死信隊列。重試隊列與延遲隊列有相同的地方,都是需要設置延遲級別,它們彼此的區別是:延遲隊列動作由內部觸發,重試隊列動作由外部消費端觸發;延遲隊列作用一次,而重試隊列的作用范圍會向后傳遞
注意:Kafka不支持重試機制也就不支持消息重試,也不支持死信隊列,因此使用kafka做消息隊列時,如果遇到了消息在業務處理時出現異常的場景時,需要額外實現消息重試的功能。
數據傳輸的事物定義有哪三種?
數據傳輸的事務定義通常有以下三種級別:
- 至多一次: 消息不會被重復發送,最多被傳輸一次,但也有可能一次不傳輸
- 最少一次: 消息不會被漏發送,最少被傳輸一次,但也有可能被重復傳輸.
- 精確的一次(Exactly once): 不會漏傳輸也不會重復傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的