Kafka分區策略實現

引言

Kafka 的分區策略決定了生產者發送的消息會被分配到哪個分區中,合理的分區策略有助于實現負載均衡、提高消息處理效率以及滿足特定的業務需求。

輪詢策略(默認)

  • 輪詢策略是 Kafka 默認的分區策略(當消息沒有指定鍵時)。生產者會按照順序依次將消息發送到各個分區中,確保每個分區都能均勻地接收到消息,從而實現負載均衡。簡單高效,能使各個分區的消息量相對均衡,充分利用每個分區的存儲和處理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    隨機策略

  • 隨機策略會隨機地將消息分配到一個分區中。這種策略在某些情況下可以實現一定程度的負載均衡,但由于是隨機分配,可能會導致分區之間的消息分布不夠均勻。可以通過自定義分區器來實現隨機策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用隨機分區器的生產者示例
    public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    按鍵哈希策略

  • 當消息指定了鍵時,Kafka 會根據鍵的哈希值將消息分配到特定的分區中。相同鍵的消息會被分配到同一個分區,這有助于保證具有相同業務邏輯的消息順序性。可以保證消息的局部有序性,例如在處理用戶相關的消息時,將同一個用戶的消息發送到同一個分區,方便后續的處理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();}
    }

    自定義分區策略(實現接口)

  • 當上述默認策略無法滿足業務需求時,可以自定義分區策略。通過實現org.apache.kafka.clients.producer.Partitioner接口,重寫partition方法來實現自定義的分區邏輯。例如,根據消息的某些特定字段(如時間、地理位置等)來進行分區,以滿足特定的業務需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定義分區邏輯,這里簡單示例根據消息值的長度分區String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用自定義分區器的生產者示例
    public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/894506.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/894506.shtml
英文地址,請注明出處:http://en.pswp.cn/news/894506.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

c++ stl 遍歷算法和查找算法

概述&#xff1a; 算法主要由頭文件<algorithm> <functional> <numeric> 提供 <algorithm> 是所有 STL 頭文件中最大的一個&#xff0c;提供了超過 90 個支持各種各樣算法的函數&#xff0c;包括排序、合并、搜索、去重、分解、遍歷、數值交換、拷貝和…

2.2 實現雙向鏈表的快速排序

實現一個雙向鏈表的快速排序。 1>程序代碼 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h>…

力扣動態規劃-19【算法學習day.113】

前言 ###我做這類文章一個重要的目的還是記錄自己的學習過程&#xff0c;我的解析也不會做的非常詳細&#xff0c;只會提供思路和一些關鍵點&#xff0c;力扣上的大佬們的題解質量是非常非常高滴&#xff01;&#xff01;&#xff01; 習題 1.矩形中移動的最大次數 題目鏈接…

Gurobi基礎語法之 addConstr, addConstrs, addQConstr, addMQConstr

在新版本的 Gurobi 中&#xff0c;向 addConstr 這個方法中傳入一個 TempConstr 對象&#xff0c;在模型中就會根據這個對象生成一個約束。更重要的是&#xff1a;TempConstr 對象可以傳給所有addConstr系列方法&#xff0c;所以下面先介紹 TempConstr 對象 TempConstr TempC…

五子棋對弈

問題描述 "在五子棋的對弈中&#xff0c;友誼的小船說翻就翻&#xff1f;" 不&#xff01;對小藍和小橋來說&#xff0c;五子棋不僅是棋盤上的較量&#xff0c;更是心與心之間的溝通。這兩位摯友秉承著"友誼第一&#xff0c;比賽第二"的宗旨&#xff0c;決…

使用 HTTP::Server::Simple 實現輕量級 HTTP 服務器

在Perl中&#xff0c;HTTP::Server::Simple 模塊提供了一種輕量級的方式來實現HTTP服務器。該模塊簡單易用&#xff0c;適合快速開發和測試HTTP服務。本文將詳細介紹如何使用 HTTP::Server::Simple 模塊創建和配置一個輕量級HTTP服務器。 安裝 HTTP::Server::Simple 首先&…

在AI技術深度滲透的背景下,2025年傳媒互聯網行業的哪些細分場景和產品形態將迎來爆發式增長?

一、AI技術重構傳媒互聯網行業版圖&#xff1a;從底層邏輯到應用場景 近年來&#xff0c;AI技術已從實驗室走向商業化落地&#xff0c;而傳媒互聯網行業因其龐大的用戶基數、高頻交互場景和豐富的數據積累&#xff0c;成為AI應用的主戰場。根據華源證券最新行業周報&#xff0…

Docker Hub 鏡像 Pull 失敗的解決方案

目錄 引言一、問題二、原因三、解決方法四、參考文獻 引言 在云原生技術火熱的當下&#xff0c;Docker可謂是其基礎&#xff0c;由于其簡單以及方便性&#xff0c;讓開發人員不必再為環境配置問題而傷腦筋&#xff0c;因為可將其看作一個虛擬機程序去理解。所以掌握好它可謂是…

neo4j-community-5.26.0 create new database

1.edit neo4j.conf 把 # The name of the default database initial.dbms.default_databasehonglouneo4j # 寫上自己的數據庫名稱 和 # Name of the service #5.0 server.windows_service_nameneo4j #4.0 dbms.default_databaseneo4j #dbms.default_databaseneo4jwind serve…

unity實現回旋鏢函數

最近學習unity2D&#xff0c;想實現一個回旋鏢武器&#xff0c;發出后就可以在角色周圍回旋。 一、目標 1.不是一次性的&#xff0c;扔出去、返回、沒有了&#xff1b;而是扔出去&#xff0c;返回到角色后方相同距離&#xff0c;再次返回&#xff1b;再次返回&#xff0c;永遠…

【C++基礎】字符串/字符讀取函數解析

最近在學C以及STL&#xff0c;打個基礎 參考&#xff1a; c中的char[] ,char* ,string三種字符串變量轉化的兼容原則 c讀取字符串和字符的6種函數 字符串結構 首先明確三種字符串結構的兼容關系&#xff1a;string>char*>char [] string最靈活&#xff0c;內置增刪查改…

求一個數的數根(高精度)

上一期我們講的是求一個數的數根&#xff0c;和本期唯一不同的是&#xff0c;數據范圍不同了&#xff0c;上一期這個數是小于等于10的18次方的&#xff0c;這一期是小于等于10的1000次方的&#xff0c;開一個變量&#xff1f;肯定不行&#xff0c;我們需要再開一個數組&#xf…

SpringBoot源碼解析(九):Bean定義接口體系

SpringBoot源碼系列文章 SpringBoot源碼解析(一)&#xff1a;SpringApplication構造方法 SpringBoot源碼解析(二)&#xff1a;引導上下文DefaultBootstrapContext SpringBoot源碼解析(三)&#xff1a;啟動開始階段 SpringBoot源碼解析(四)&#xff1a;解析應用參數args Sp…

Vue 3 30天精進之旅:Day 13 - 路由守衛

在構建單頁面應用時&#xff0c;路由守衛是一個非常重要的概念。它允許我們在路由進入或離開時執行一些操作&#xff0c;比如驗證用戶權限、處理數據加載、執行導航確認等。Vue Router提供了多種類型的路由守衛&#xff0c;使我們能夠靈活地控制路由的行為。在今天的學習中&…

【TypeScript】基礎:數據類型

文章目錄 TypeScript一、簡介二、類型聲明三、數據類型anyunknownnervervoidobjecttupleenumType一些特殊情況 TypeScript 是JavaScript的超集&#xff0c;代碼量比JavaScript復雜、繁多&#xff1b;但是結構更清晰 一、簡介 為什么需要TypeScript&#xff1f; JavaScript的…

C++模板編程——可變參函數模板

目錄 1. 可變參函數模板基本介紹 2. 參數包展開——通過遞歸函數 3. 參數包展開——通過編譯期間if語句(constexpr if) 4. 重載 5. 后記 進來看的小伙伴們應該對C中的模板有了一定了解&#xff0c;下面給大家介紹一下可變參函數模板。過于基礎的概念將不仔細介紹。 1. 可變…

ChatGPT-4o和ChatGPT-4o mini的差異點

在人工智能領域&#xff0c;OpenAI再次引領創新潮流&#xff0c;近日正式發布了其最新模型——ChatGPT-4o及其經濟實惠的小型版本ChatGPT-4o Mini。這兩款模型雖同屬于ChatGPT系列&#xff0c;但在性能、應用場景及成本上展現出顯著的差異。本文將通過圖文并茂的方式&#xff0…

三數之和(15)

15. 三數之和 - 力扣&#xff08;LeetCode&#xff09; 可以一起總結的題目&#xff1a;三角形的最大周長&#xff08;976&#xff09;-CSDN博客 解法&#xff1a; class Solution { public:vector<vector<int>> threeSum(vector<int>& nums) {vector…

2025最新源支付V7全套開源版+Mac云端+五合一云端

2025最新源支付V7全套開源版Mac云端五合一云端 官方1999元&#xff0c; 最新非網上那種功能不全帶BUG開源版&#xff0c;可以自己增加授權或二開 擁有卓越的性能和豐富的功能。它采用全新輕量化的界面UI&#xff0c;讓您能更方便快捷地解決知識付費和運營贊助的難題 它基于…

9 點結構模塊(point.rs)

一、point.rs源碼 use super::UnknownUnit; use crate::approxeq::ApproxEq; use crate::approxord::{max, min}; use crate::length::Length; use crate::num::*; use crate::scale::Scale; use crate::size::{Size2D, Size3D}; use crate::vector::{vec2, vec3, Vector2D, V…