目錄
1、kafka消費的流程
2、kafka的消費模式
2.1、點對點模式
2.2、發布-訂閱模式
3、consumer消息積壓
3.1、處理方案
3.2、積壓量
4、消息過期失效
5、kafka注意事項
????????Kafka消費積壓(Consumer Lag)是指消費者處理消息的速度跟不上生產者發送消息的速度,導致消息在Kafka主題中堆積。
關于kakfa的架構圖,如下所示:
更多關于kafka的介紹,參考:關于MQ之kafka的深入研究-CSDN博客https://blog.csdn.net/weixin_50055999/article/details/148535599?spm=1011.2415.3001.5331
1、kafka消費的流程
????????之前的章節中,介紹了kafka消息由producer通過hash函數存放到broker節點后,每個broker節點由多個topic主題組成,可水平擴展。
????????每個topic由多個partitin組成,partition里面的內容有順序,跨partition無序。
對于點對點模式下:
????????消費組內每個消費者可以消費多個partition、同時保留offset偏移位置,保證下次消費。
對于發布訂閱模式:
????????不同消費組內的消費者可以消費同一個patition,兩個消費組不受影響,各自保留彼此的offset的偏移位置。
如圖所示:
在消費者消費過程的流程如下:
由上圖可知:
1、每個topic里面包含多個partition。
2、每個partition里面的內容是按順序分布的。
3、每個消費者可以消費多個partition。
4、而partition只能被一個消費者消費。
對于不同消費者組,可以共同消費同一個topic里面的消息。
2、kafka的消費模式
Kafka 的消費訂閱模式取決于消費者組的配置方式,可以分為以下兩種主要模式:
2.1、點對點模式
特點:一條消息只能被一個消費者消費
實現方式:
-
所有消費者屬于同一個消費者組(相同的?
group.id
) -
Kafka 會在組內消費者之間自動平衡分區分配
// 消費者1和消費者2使用相同的group.id
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer-group"); // 相同的組ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
工作流程:
-
假設主題有3個分區(P0, P1, P2)
-
如果有1個消費者,它將消費所有3個分區
-
如果增加第二個消費者,Kafka會重新平衡:
-
消費者1可能獲得P0和P1
-
消費者2獲得P2
-
-
消息在每個分區內有序,且只被分配給該分區的消費者消費
2.2、發布-訂閱模式
特點:一條消息可以被多個消費者(不同消費組)消費(本質還是點對點)
實現方式:
-
不同消費者組訂閱同一個主題
-
每個消費者組都會收到完整的消息流
// 組A的消費者
Properties propsA = new Properties();
propsA.put("group.id", "group-a"); // 不同組ID
// ...其他配置
KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(propsA);// 組B的消費者
Properties propsB = new Properties();
propsB.put("group.id", "group-b"); // 不同組ID
// ...其他配置
KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(propsB);
工作流程:
-
生產者發送消息到主題
-
組A的所有消費者(作為一個組)會收到消息的一個副本
-
組B的所有消費者(作為另一個獨立的組)也會收到消息的一個副本
-
在每個組內部,消息仍然遵循點對點模式(組內只有一個消費者收到)
3、consumer消息積壓
????????Kafka消息積壓的問題,核心原因是生產太快、消費太慢,處理速度長期失衡,從而導致消息積壓(Lag)的場景,積壓到超過隊列長度限制,就會出現還未被消費的數據產生丟失的場景。
? ? ? ?如果長時間不解決消息積壓,可能會引發資源緊張、服務延遲或崩潰等問題。解決消息積壓的關鍵是提高消費者的消費能力,并優化Kafka集群的整體處理效率。
3.1、處理方案
1. 如果是Kafka消費能力不足,則可以考慮增加?topic?的 partition 的個數(提高kafka的并行度),同時提升消費者組的消費者數量,消費數 = 分區數 (二者缺一不可)
2. 若是下游數據處理不及時,則提高每批次拉取的數量。批次拉取數量過少(拉取數據/處理時間 < 生產速度),使處理的數據小于生產的數據,也會造成數據積壓。
方法:
1. 增大partion數量。
2. 消費者加了并發,服務, 擴大消費線程。
3. 增加消費組服務數量。
4. kafka單機升級成了集群。
5. 避免消費者消費消息時間過長,導致超時。
6. 使Kafka分區之間的數據均勻分布。
3.2、積壓量
- 生產量:Kafka Topic 在一個時間周期內各partition offset 起止時間差值之和。
- 消費量:Kafka Topic 在一個時間周期內某個消費者的消費量。
- 積壓量:Kafka Topic 的某個Consumer Group殘留在消息中間件未被及時消費的消息量。
4、消息過期失效
????????產生消息堆積,消費不及時,kafka數據有過期時間,一些數據就丟失了,主要是消費不及時。
當出現這種現象的時候,可參考以下經驗,進行規避:
1. 消費kafka消息時,應該盡量減少每次消費時間,可通過減少調用三方接口、讀庫等操作,
? ?從而減少消息堆積的可能性。
2. 如果消息來不及消費,可以先存在數據庫中,然后逐條消費(可以保存消費記錄,方便定位問題)。
3. 每次接受kafka消息時,先打印出日志,包括消息產生的時間戳。
4. kafka消息保留時間(修改kafka配置文件, 默認一周)
5. 任務啟動從上次提交offset處開始消費處理
5、kafka注意事項
1. 由于Kafka消息key設置,在Kafka producer處,給key加隨機后綴,使其均衡。
?
2. 數據量很大,合理的增加Kafka分區數是關鍵。
? ?Kafka分區數是Kafka并行度調優的最小單元,如果Kafka分區數設置的太少,
? ?會影響Kafka consumer消費的吞吐量. 如果利用的是Spark流和Kafka direct approach方式,
? ?也可以對KafkaRDD進行repartition重分區,增加并行度處理.
參考文章:
1、Kafka如何處理大量積壓消息_kafka消息堆積過多了怎么辦-CSDN博客https://blog.csdn.net/AlbenXie/article/details/128300018?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522dcefb6fbf11572c5ef4526b40c68a37c%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=dcefb6fbf11572c5ef4526b40c68a37c&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_click~default-1-128300018-null-null.142^v102^pc_search_result_base1&utm_term=kafka%E6%B6%88%E6%81%AF%E7%A7%AF%E5%8E%8B%E6%80%8E%E4%B9%88%E5%A4%84%E7%90%86&spm=1018.2226.3001.4187