Kafka 生產者與消費者分區策略全解析:從原理到實踐

一、生產者分區策略

1.1 分區好處

(1)便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數據按照分區切割成一塊一塊數據存儲在多臺Broker上。合理控制分區的任務,可以實現負載均衡的效果。

(2)提高并行度,生產者可以以分區為單位發送數據;消費者可以以分區為單位進行消費數據。

1.2 生產者發送消息的分區策略

1)默認的分區器 DefaultPartitioner

在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it假如發送消息的時候指定分區,就使用這個分區
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key假如發送消息沒有指定分區,指定了Key值,對Key進行hash,然后對分區數取模,得到哪個分區就使用哪個分區
* <li>If no partition or key is present choose the sticky 
partition(粘性分區) that changes when the batch is full.假如分區和key值都沒有指定,使用粘性分區(黏住它,使用它,發送完畢為止)
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

例如:第一次隨機選擇0號分區,等0號分區當前批次滿了(默認16k)或者linger.ms設置的時間到, Kafka再隨機一個分區進行使用(如果還是0會繼續隨機)。

2)案例一

將數據發往指定 partition 的情況下,例如,將所有數據發往分區 1 中。

測試:

①在 node01 上開啟 Kafka 消費者。

kafka-console-consumer.sh --bootstrap-server node01:9092 --topic first

②在 IDEA 中執行代碼,觀察 bigdata01 控制臺中是否接收到消息。

bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic first

③在 IDEA 控制臺觀察回調信息。

主題:first->分區:0
主題:first->分區:0 
主題:first->分區:0 
主題:first->分區:0 
主題:first->分區:0

3)案例二

沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取余得到 partition 值。

測試:

①key="a"時,在控制臺查看結果。

主題:first->分區:1

主題:first->分區:1

主題:first->分區:1

主題:first->分區:1

主題:first->分區:1

②key="b"時,在控制臺查看結果。

主題:first->分區:2

主題:first->分區:2

主題:first->分區:2

主題:first->分區:2

主題:first->分區:2

③key="f"時,在控制臺查看結果。

主題:first->分區:0

主題:first->分區:0

主題:first->分區:0

主題:first->分區:0

主題:first->分區:0

1.3 自定義分區器

如果研發人員可以根據企業需求,自己重新實現分區器。

1)需求

例如我們實現一個分區器實現,發送過來的數據中如果包含 bigdata,就發往 0 號分區, 不包含bigdata,就發往 1 號分區。

2)實現步驟

(1)定義類實現 Partitioner 接口。

(2)重寫 partition()方法

package com.bigdata.partitioner;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {/*** 返回信息對應的分區* @param topic 主題* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字節數組* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字節數組* @param cluster 集群元數據可以查看分區信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 假如消息中含有bigdata 發送0分區,否則發送1分區String msg = new String(valueBytes);if(msg.contains("bigdata")){return 0;}return 1;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)使用分區器的方法,在生產者的配置中添加分區器參數

package com.bigdata.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducer06 {public static void main(String[] args) throws InterruptedException, ExecutionException {Properties properties = new Properties();// 設置連接kafka集群的ip和端口properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", "bigdata01:9092");properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.bigdata.partitioner.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("second","abc","加油!:"+i);kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {System.out.printf("消息發送給了%d分區\n",(metadata.partition()));}});}kafkaProducer.close();}
}

4)測試

測試按照以下幾種情況:

第一種:使用了自定義分區器,并且指定分區發送

第二種:使用了自定義分區器,并且發送的時候帶有 key 值

第三種:使用了自定義分區器,沒有指定分區和 key

每一種測試時消息發送帶有 bigdata 的,再 換成不含 bigdata 的。

①在 bigdata01 上開啟 Kafka 消費者。

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first 

②在 IDEA 控制臺觀察回調信息。

主題:first->分區:0

主題:first->分區:0

主題:first->分區:0

主題:first->分區:0

主題:first->分區:0

注意:假如我自定義了一個分區規則,如果代碼中指定了消息發送到某個分區,自定義的分區規則無效。

比如:我自定義了一個分區器,包含 bigdata 發送 0 分區,不包含發送 1 分區,但假如發送消息的時候指定消息發送到 2 分區,那么消息就必然發送 2 分區。不走咱們自定義的分區器規則了。

如果沒有指定分區規則,指定了 key 值,那么依然走我們的自定義分區器,不走默認。

二、消費者分區策略

1、一個consumer group中有多個consumer組成,一個 topic有多個partition組成,現在的問題是,到底由哪個consumer來消費哪個partition的數據。

2、Kafka有四種主流的分區分配策略: Range、RoundRobin(輪詢)、Sticky(粘性)、CooperativeSticky(配合的粘性)。

可以通過配置參數partition.assignment.strategy,修改分區的分配策略。默認策略是Range + CooperativeSticky

參數名稱

描述

heartbeat.interval.ms

Kafka 消費者和 coordinator 之間的心跳時間,默認 3s。 該條目的值必須小于session.timeout.ms,也不應該高于 session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消費者和 coordinator 之間連接超時時間,默認 45s。超 過該值,該消費者被移除,消費者組執行再平衡。

max.poll.interval.ms

消費者處理消息的最大時長,默認是 5 分鐘。超過該值,該 消費者被移除,消費者組執行再平衡

partition.assignment.strategy

消 費 者 分 區 分 配 策 略 , 默 認 策 略 是 Range +CooperativeSticky。Kafka 可以同時使用多個分區分配策略。

可 以 選 擇 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

2.1 Range 以及再平衡

1)Range 分區策略原理

Range 是對每個 topic 而言的。

首先對同一個topic里面的分區按照序號進行排序,并對消費者按照字母順序進行排序。

假如現在有7個分區,3個消費者,排序后的分區將會是0,1,2,3,4,5,6;消費者排序完之后將會是C0,C1,C2。

通過partitions數/consumer數來決定每個消費者應該消費幾個分區。如果除不盡,那么前面幾個消費者將會多

消費1個分區,

例如,7/3=2余1,除不盡,那么消費者C0便會多消費1個分區。8/3=2余2,除不盡,那么C0和C1分別多

消費一個。

注意:如果只是針對1個topic而言,C0消費者多消費1個分區影響不是很大。但是如果有N多個topic,那么針對每個 topic,消費者C0都將多消費1個分區,topic越多,C0消費的分區會比其他消費者明顯多消費N個分區。

容易產生數據傾斜!

2)Range 分區分配策略案例

(1)修改主題 first 為 7 個分區。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

2)這樣可以由三個消費者

CustomConsumer、CustomConsumer1、CustomConsumer2 組成消費者組,組名都為“test”, 同時啟動 3 個消費者。

(3)啟動生產者,發送 500 條消息,隨機發送到不同的分區。

注意:分區數可以增加,但是不能減少。

一個主題,假如副本數想修改,是否可以直接修改?答案是不可以。

如果想修改,如何修改?制定計劃,執行計劃

Kafka 默認的分區分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

默認是Range,但是在經過一次升級之后,會自動變為CooperativeSticky。這個是官方給出的解釋。

默認的分配器是[RangeAssignor, CooperativeStickyAssignor],默認情況下將使用RangeAssignor,但允許通過一次滾動反彈升級到CooperativeStickyAssignor,該滾動反彈會將RangeAssignor從列表中刪除。

(4)觀看 3 個消費者分別消費哪些分區的數據。

假如消費情況和預想的不一樣:

1、集群是否健康,比如某些kafka進程沒啟動

2、發送數據的時候7個分區沒有使用完,因為它使用了粘性分區。如何讓它發送給7個分區呢,代碼中添加:

// 延遲一會會看到數據發往不同分區

Thread.sleep(20);

發現一個消費者消費了,5,6分區,一個消費了0,1,2分區,一個消費了3,4分區。

此時并沒有修改分區策略,原因是默認是Range.

3Range 分區分配再平衡案例

(1)停止掉 0 號消費者,快速重新發送消息觀看結果(45s 以內,越快越好)。

1 號消費者:消費到 3、4 號分區數據。

2 號消費者:消費到 5、6 號分區數據。

0號的數據,沒人消費。

說明:0 號消費者掛掉后,消費者組需要按照超時時間 45s 來判斷它是否退出,所以需

要等待,時間到了 45s 后,判斷它真的退出就會把任務分配給其他 broker 執行。

(2)再次重新發送消息觀看結果(45s 以后)。

1 號消費者:消費到 0、1、2、3 號分區數據。

2 號消費者:消費到 4、5、6 號分區數據。

說明:消費者 0 已經被踢出消費者組,所以重新按照 range 方式分配。

2.2 RoundRobin(輪詢) 以及再平衡

1)RoundRobin 分區策略原理

RoundRobin針對集群中所有Topic而言。

RoundRobin輪詢分區策略,是把所有的partition和所有的consumer都列出來,然后按照hashcode進行排序,最后

通過輪詢算法來分配partition給到各個消費者。

2RoundRobin 分區分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三個消費者代 碼中修改分區分配策略為 RoundRobin。

輪詢的類的全路徑是:
org.apache.kafka.clients.consumer.RoundRobinAssignorA list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Available options are:org.apache.kafka.clients.consumer.RangeAssignor: Assigns partitions on a per-topic basis.
org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns partitions to consumers in a round-robin fashion.
org.apache.kafka.clients.consumer.StickyAssignor: Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
org.apache.kafka.clients.consumer.CooperativeStickyAssignor: Follows the same StickyAssignor logic, but allows for cooperative rebalancing.

3RoundRobin 分區分配再平衡案例

(1)停止掉 0 號消費者,快速重新發送消息觀看結果(45s 以內,越快越好)。

1 號消費者:消費到 2、5 號分區數據

2 號消費者:消費到 4、1 號分區數據

0 號消費者 以前對應的數據沒有人消費

說明:0 號消費者掛掉后,消費者組需要按照超時時間 45s 來判斷它是否退出,所以需要等待,時間到了 45s 后,判斷它真的退出就會把任務分配給其他 broker 執行。

(2)再次重新發送消息觀看結果(45s 以后)。

1 號消費者:消費到 0、2、4、6 號分區數據

2 號消費者:消費到 1、3、5 號分區數據

說明:消費者 0 已經被踢出消費者組,所以重新按照 RoundRobin 方式分配。

2.3 Sticky 以及再平衡

粘性分區定義:可以理解為分配的結果帶有“粘性的”。即在執行一次新的分配之前, 考慮上一次分配的結果,盡量少的調整分配的變動,可以節省大量的開銷。 粘性分區是 Kafka 從 0.11.x 版本開始引入這種分配策略,首先會盡量均衡的放置分區 到消費者上面,在出現同一消費者組內消費者出現問題的時候,會盡量保持原有分配的分區不變化。

比如分區有 0 1 2 3 4 5 6

消費者有 c1 c2 c3

c1 消費 3個 c2 消費2個 c3 消費2個分區

跟以前不一樣的是,c1 消費的3個分區是隨機的,不是按照 0 1 2 這樣的順序來的。

1)需求

設置主題為 first,7 個分區;準備 3 個消費者,采用粘性分區策略,并進行消費,觀察

消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。

2)步驟

(1)修改分區分配策略為粘性。

注意:3 個消費者都應該注釋掉,之后重啟 3 個消費者,如果出現報錯,全部停止等

會再重啟,或者修改為全新的消費者組。

// 修改分區分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

(2)使用同樣的生產者發送 500 條消息。

可以看到會盡量保持分區的個數近似劃分分區。

3Sticky 分區分配再平衡案例

(1)停止掉 0 號消費者,快速重新發送消息觀看結果(45s 以內,越快越好)。

1 號消費者:消費到 2、5、3 號分區數據。

2 號消費者:消費到 4、6 號分區數據。

0 號消費者的任務沒人頂替它消費

說明:0 號消費者掛掉后,消費者組需要按照超時時間 45s 來判斷它是否退出,所以需

要等待,時間到了 45s 后,判斷它真的退出就會把任務分配給其他 broker 執行。

(2)再次重新發送消息觀看結果(45s 以后)。

1 號消費者:消費到 2、3、5 號分區數據。

2 號消費者:消費到 0、1、4、6 號分區數據。

說明:消費者 0 已經被踢出消費者組,所以重新按照粘性方式分配。

2.4 CooperativeSticky 的解釋【新的kafka中剛添加的策略】

在消費過程中,會根據消費的偏移量情況進行重新再平衡,也就是粘性分區,運行過程中還會根據消費的實際情況重新分配消費者,直到平衡為止。

好處是:負載均衡,不好的地方是:多次平衡浪費性能。

動態平衡,在消費過程中,實施再平衡,而不是定下來,等某個消費者退出再平衡。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/92616.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/92616.shtml
英文地址,請注明出處:http://en.pswp.cn/web/92616.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

高頻面試點:深入理解 TCP 三次握手與四次揮手

在網絡通信的世界里,TCP(Transmission Control Protocol,傳輸控制協議)是確保數據可靠傳輸的基石。其中,三次握手建立連接、四次揮手斷開連接的過程,更是 Java 秋招面試中的高頻考點。今天,我們就深入剖析這兩個關鍵過程,結合原理、代碼示例與面試真題,幫你吃透知識點…

k8s-nfs實現創建sc的兩種方式

法一&#xff1a;基于 官方 NFS CSI 插件 法二&#xff1a;基于 nfs-subdir-external-provisioner 法一 官方 NFS CSI 插件 大致步驟# 安裝 NFS sudo apt update sudo apt install -y nfs-kernel-server # 創建共享目錄 sudo mkdir -p /data/nfs sudo chmod 777 /data/nfs # 配…

n8n 入門指南:更適合跨境出海搞錢的AI智能體

如果你最近刷到 AI 圈的分享應該會發現——n8n 又火起來了。其實 n8n 早在 2020 年左右就被程序員玩過一波&#xff0c;當時很多人拿它做網站自動發郵件、消息轉發之類的“流程自動化”。但那時候 AI 還沒這么卷&#xff0c;大家也沒覺得多有用。n8n為什么最近又翻紅&#xff1…

【數據分享】各省農業土地流轉率(2010-2023)

數據介紹土地流轉是推動農業規模化、現代化發展的關鍵機制。為助力相關研究&#xff0c;現分享一份覆蓋全國30個省級行政區、時間跨度為2010-2023年的農業土地流轉率面板數據集。本數據直接提取自權威統計年報&#xff0c;具有較高的參考價值。一、數據概覽覆蓋范圍&#xff1a…

音視頻時間戳獲取與同步原理詳解

引言&#xff1a;為什么音視頻同步如此重要&#xff1f; 在音視頻技術領域&#xff0c;"同步"是決定用戶體驗的核心要素。想象一下觀看電影時畫面與聲音錯位0.5秒的場景&#xff1a;角色說話時嘴唇動作與聲音不匹配&#xff0c;爆炸場景的視覺沖擊先于音效到達——這…

Day38--動態規劃--322. 零錢兌換,279. 完全平方數,139. 單詞拆分,56. 攜帶礦石資源(卡碼網),背包問題總結

Day38–動態規劃–322. 零錢兌換&#xff0c;279. 完全平方數&#xff0c;139. 單詞拆分&#xff0c;56. 攜帶礦石資源&#xff08;卡碼網&#xff09;&#xff0c;背包問題總結 今天的是幾道經典的“完全背包”題目。前兩道題目&#xff0c;要區分求的是“價值”&#xff0c;還…

應用層Http協議(1)

應用層Http協議&#xff08;1&#xff09; 在互聯網世界中&#xff0c;HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本傳輸協議&#xff09;是一個至關重要的協議。它定義了客戶端&#xff08;如瀏覽器&#xff09;與服務器之間如何通信&#xff0c;以交換或傳…

elementui input無法輸入問題

背景。開發小程序。自定義表單在pc段設置好input輸入框屬性后。 在小程序端無法輸入原因&#xff1a;長度受限制&#xff0c;導致input組件的maxlength屬性認為長度是0導致無法輸入任何值。看解釋是應為遇到空字符串等情況會設置為0解決。因為未找到設置maxlength為0處&#xf…

算法_python_學習記錄_02

算法學習和視頻學習過程中&#xff0c;有許多前幾天還不知道的知識點&#xff0c;現在一點一點歸納整理出來&#xff0c;穩步前進&#xff0c;前進~ 20_貪心算法系列題 00_參考文檔 詳解貪心算法&#xff08;Python實現貪心算法典型例題&#xff09;_順序貪婪算法-CSDN博客P…

Meta AI水印計劃的致命缺陷——IEEE Spectrum深度文獻精讀

一、原文信息 標題: Metas AI Watermarking Plan Is Flimsy, at Best 中文譯名: Meta的AI水印計劃脆弱不堪 作者: David Evan Harris(加州大學伯克利分校)、Lawrence Norden(紐約大學法學院) 發表日期: 2024年3月5日 發表期刊: IEEE Spectrum 二、原文全文翻譯 Met…

gpt-oss 全量技術解讀

一、概述 gpt-oss 是 OpenAI 發布的開放權重&#xff08;open-weight&#xff09;模型系列&#xff0c;面向強推理、Agent 能力與多樣化應用場景。 提供兩種規格&#xff1a; gpt-oss-120b&#xff1a;面向生產與高推理需求&#xff0c;單卡 80GB GPU&#xff08;如 NVIDIA …

實現EtherNet/IP網絡與Modbus TCP網絡之間數據互通

硬件連接與配置使用工業以太網網關&#xff08;如ENE-350&#xff09;作為橋接設備&#xff0c;通過以太網交換機實現硬件互聯。 網關需根據應用場景配置為EtherNet/IP從站或Modbus TCP主/從站模式。案例1&#xff1a;EtherNet IP主站PLC和Modbus TCP主站PLC的互聯網關配置&…

zookeeper因jute.maxbuffer啟動異常問題排查處理

#作者&#xff1a;程宏斌 文章目錄一、前言二、問題描述三、定位過程四、問題根因五、解決方案根本解決方案應急處理方案調大參數可能出現的問題六、總結為什么超出會報錯官方對于jute.maxbuffer的解釋注意事項官方建議一、前言 在分布式系統中&#xff0c;ZooKeeper作為關鍵的…

Java基礎十三: List

目錄 1.Java LinkedList 的高級應用與示例 1.1 LinkedList的基本使用 基本操作示例 1.2 LinkedList獨有的方法 特定方法示例 1.3 隊列模式&#xff08;先進先出&#xff09; 隊列模式示例 1.4 棧模式&#xff08;先進后出&#xff09; 棧模式示例 總結 2.Java Vecto…

[機器學習]03-基于核密度估計(KDE)的鳶尾花數據集分類

關鍵點&#xff1a;使用核密度估計&#xff08;KDE&#xff09; 估計類別條件概率密度&#xff08;高斯核&#xff0c;帶寬0.2&#xff09;采用最大后驗概率&#xff08;MAP&#xff09; 決策準則進行分類程序代碼&#xff1a;import random import matplotlib from sklearn.ne…

jmeter怎么實現多個請求真正的同時發送

1.首先在插件管理器Plugins Manager中搜索插件Parallel Controller&Sampler&#xff0c;勾選上對應的插件后&#xff0c;在右下角點擊Apply Changes and Restart JMeter&#xff0c;安裝插件2.插件安裝完畢后&#xff0c;然后在線程組上面右擊&#xff0c;點擊添加--邏輯控…

復雜環境下車牌識別準確率↑29%:陌訊動態特征融合算法實戰解析

原創聲明本文為原創技術解析&#xff0c;核心技術參數與架構設計引用自《陌訊技術白皮書》&#xff0c;轉載需注明來源。一、行業痛點&#xff1a;車牌識別的現實挑戰在智慧交通、停車場管理等場景中&#xff0c;車牌識別作為關鍵技術環節&#xff0c;長期面臨多重環境干擾。據…

Express中間件和路由及響應方法

1.中間件分類 應用程序級別中間件 通過 app.use() 或 app.METHOD()&#xff08;如 app.get&#xff09;綁定的中間件&#xff0c;作用于整個應用程序。例如 記錄請求日志、解析請求體等全局功能。例如&#xff1a; app.use((req, res, next) > {console.log(Request URL:…

Dokcer創建中間件環境

簡而言之&#xff0c;用docker來搞中間件環境比價好使&#xff0c;不用關心各種環境了 rabbitmqsudo docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.8-managementredis 5.0.3 docker start my-redisdocker run --name my-redis -d -p 6379:6379 \…

Linux高級編程-文件操作

1.Linux下的文件類型7種文件類型&#xff1a;b 塊設備文件 -------> 存儲類設備&#xff08;硬盤&#xff09; c 字符設備文件 ------->如輸入輸出設備&#xff08;鼠標鍵盤顯示器...&#xff09; d 目錄文件 ------->文件夾 - 普通文件 -------&g…