Kafka入門-生產者

生產者

生產者發送流程:
在這里插入圖片描述

延遲時間為0ms時,也就意味著每當有數據就會直接發送

異步發送API

異步發送和同步發送的不同在于:異步發送不需要等待結果,同步發送必須等待結果才能進行下一步發送。

普通異步發送

首先導入所需的kafka依賴

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>
public class CustomProducer {public static void main(String[] args) {//配置Properties properties = new Properties();//連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");//指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//創建Kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//異步發送數據kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i));}//關閉資源kafkaProducer.close();}
}
帶回調函數的異步發送

回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數:元數據信息和異常信息,如果異常信息為null,說明消息發送成功,如果異常現象不為null,說明消息發送失敗。

修改發送方法,采用回調

//異步發送數據,并有回調函數
kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主題: "+ recordMetadata.topic() +" 分區:"+recordMetadata.partition());}}});}

運行方法就能看到返回的主題、分區

主題: first 分區:2

同步發送

同步發送只需更改發送方式

//同步發送數據kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i)).get();}

為什么要分區

  1. 便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數據按照分區切割成一塊一塊數據存儲在多臺Broker上,合理控制分區的任務,可以實現負載均衡的效果。
  2. 提高并行度,生產者可以以分區為單位發送數據,而消費者則可以以分區為單位進行消費

分區策略:

  1. 默認分區策略:

    • 如果在記錄中指定了分區,那么直接使用指定的分區

      例如在send方法指定分區2,key為""

          kafkaProducer.send(new ProducerRecord<>("first",2,"", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主題: "+ recordMetadata.topic() +" 分區:"+recordMetadata.partition());}}});
      
    • 如果未指定分區但存在鍵key,則根據key的哈希值與topic的partition數目進行取余選擇分區

      例如在send方法中不指定分區,設置key

      kafkaProducer.send(new ProducerRecord<>("first","haha", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主題: "+ recordMetadata.topic() +" 分區:"+recordMetadata.partition());}}});
      
    • 如果不存在分區也沒有鍵key,那么使用黏性分區,會隨機選擇一個分區并且盡可能一直使用該分區,如果該分區batch已滿或者已完成,kafka會再隨機一個分區進行使用(和上一個分區不同)。

自定義分區器

首先自定義一個分區器

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//獲取數據String msgValues = value.toString();int partition;//如果發送的數據包含aha字段則發送到0號分區,不包含則發往1號分區if(msgValues.contains("aha")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

在創建Kafka對象之前設置配置,選擇自定義的分區器

//關聯自定義分區
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hzp.kafka.producer.MyPartitioner");

注意:如果使用自定義分區的同時,還在send方法內指定分區,那么以指定分區為準。

生產者提高吞吐量

生產者發消息就相當于用貨車從本地倉庫(緩沖區)送貨到kafka,相關的參數有兩個,一個是batch size批次大小,一個是linger.ms等待時間。batch size默認為16k,相當于貨車的容量大小,如果貨車裝滿了就發往kafka。但是通常情況下等待時間為0ms,也就是每當倉庫來了一箱貨就直接送到kafka,不管貨車是否裝滿。

因此提高吞吐量主要有以下方法:

  1. 修改linger.ms,增長等待時間或者增加批次大小,讓貨車盡量裝多一點貨甚至裝滿再發送。(等待時間會造成一定的延遲,通常控制在5-100ms)
  2. 發送數據時,采取壓縮的方式
  3. 增大緩沖區大小,緩沖區大小通常為32m。相當于增加倉庫大小,讓倉庫能夠存儲更多的貨物。
        //緩沖區大小(單位為kb,默認32M)1024*1024*32properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//批次大小(單位為kb,默認16kb)1024*16properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//linger.ms (單位為ms)properties.put(ProducerConfig.LINGER_MS_CONFIG,10);//壓縮 設置壓縮類型為snappy,可配置的值有gzip、snappy、lz4、zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

數據可靠性

數據可靠性與ACK應答級別有關

acks:

  • 0:生產者發送過來的數據,不需要等數據落盤就應答。

    如果不等數據落盤就應答,容易造成數據丟失,生產者發送數據就不管了,可靠性差,效率高。

  • 1:生產者發送過來的數據,Leader收到數據后應答

    如果Leader接收到數據,并且應答之后,突然掛掉了,但是此時Leader還沒有同步數據給其他節點,此時就造成數據丟失。生產者發送數據Leader應答,可靠性中等,效率中等。

  • -1:生產者發送的數據,Leader和ISR隊列中的所有節點收齊數據后應答

    生產者發送數據需要Leader和ISR隊列里面所有的Follower應答,可靠性高,效率低。

    如果Leader收到數據并且和Follower同步數據時,有一個Follower因為故障,長時間不能與Leader同步,這應該如何解決?

    解決方案:Leader維護了一個動態的in-sync replica set(ISR)也就是與Leader保持同步的Follower+Leader的集合(Leader:0,ISR:0,1,2)。如果Follower長時間未向Leader發送通信請求或者同步數據,則該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms參數設定,默認30s。這樣就不用長時間等待以故障的節點。

如果分區副本為1,那么ACK應答-1和1沒有區別,掛了數據就直接丟失,如果ISR里面也只有一個(Leader:0,ISR:0),那么說明沒有Follower跟Leader同步,那么仍然會數據丟失。因此可以得到:數據完全可靠的條件:ACK級別設置為-1、分區副本大于等于2、ISR應答里的最小副本數量大于等于2。

通常情況下,acks=0很少使用,acks=1主要用于傳輸普通日志(大量但并不重要的數據),允許個別數據丟失,acks=-1一般用于傳輸重要的數據比如金錢這類對可靠性要求比較高的場景。

acks=-1仍然存在問題,比如現在Leader:0,ISR:0,1,2。生產者發送數據data,Leader:0接收到data后與1、2同步數據。同步數據完成之后,即將應答之前,Leader突然掛掉了,那么此時就會從1,2中選擇一個成為新的Leader。假設1成為新的Leader,此時生產者沒有收到應答,再次發送數據data,那么此時Leader:1就接收到了兩份data數據,造成數據重復。

java設置acks,以及重試次數

//acks 設置為1
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重試次數 默認為int的最大值
properties.put(ProducerConfig.RETRIES_CONFIG,3);

數據去重

在剛剛的數據可靠性中,我們知道怎么讓數據能夠完全可靠,就是讓ACK級別設置為-1、分區副本大于等于2、ISR應答里的最小副本數量大于等于2。從數據傳遞來看,這種設置就是數據傳遞至少一次(At Least One);而當ACK級別設置為0,那么數據傳遞最多一次(At Most One)。

At Least One可以保證數據不丟失,但是不能保證數據不重復,At Most One可以保證數據不重復,但是不能保證數據不丟失。那么如果既想數據不丟失,又想數據不重復,此時就要依靠冪等性和事務。

冪等性

冪等性就是指Producer不論向Broker發送多少次重復數據,Broker端都只會持久化一條數據,保證了不重復。

重復數據的判斷標準就是<PID、Partition、SqlNumber>相同的消息,Broker只會持久化一條數據。Pid標識指的是ProducerId,生產者編號,Kafka每重啟一次就分配一個新的;Partiton標識分區號;SqlNumber是單調自增的,因此冪等性能夠保證在單分區、單會話內不重復。

冪等性的使用只需設置enable.idempotence即可,默認為true,關閉只需設置為false。

事務

事務開啟之前,必須先開啟冪等性。事務底層依賴冪等性。

數據有序

Kafka單分區內有序,但是多分區時,分區與分區之間無序。

數據亂序

kafka保證數據單分區有序的條件是:

  1. 如果沒有開啟冪等性,那么需要設置max.in.flight.request.per.connection的值為1
  2. 如果開啟冪等性,那么需要設置max.in.flight.request.per.connection的值小于等于5.

在kafka1.x版本之后當kafka啟用冪等,那么kafka服務端會緩存producer發來的最近5個request的元數據,而冪等性的實現依賴單調遞增的序號SqlNumber。如果發送時出現亂序,那么會根據單調遞增的序號進行重排序。也就是說當開啟了冪等性并且緩存的請求個數小于5,那么會在服務端進行一次重新排序,讓數據有序。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/908338.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/908338.shtml
英文地址,請注明出處:http://en.pswp.cn/news/908338.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

分類預測 | Matlab實現CNN-LSTM-Attention高光譜數據分類

分類預測 | Matlab實現CNN-LSTM-Attention高光譜數據分類 目錄 分類預測 | Matlab實現CNN-LSTM-Attention高光譜數據分類分類效果功能概述程序設計參考資料 分類效果 功能概述 代碼功能 該MATLAB代碼實現了一個結合CNN、LSTM和注意力機制的高光譜數據分類模型&#xff0c;核心…

gemini和chatgpt數據對比:誰在卷性能、價格和場景?

先把結論“劇透”給趕時間的朋友&#xff1a;頂配 Gemini Ultra/2.5 Pro 在紙面成績上普遍領先&#xff0c;而 ChatGPT 家族&#xff08;GPT-4o / o3 / 4.1&#xff09;則在延遲、生態和穩定性上占優。下面把核心數據拆開講&#xff0c;方便你對號入座。附帶參考來源&#xff0…

代碼訓練LeetCode(23)隨機訪問元素

代碼訓練(23)LeetCode之隨機訪問元素 Author: Once Day Date: 2025年6月5日 漫漫長路&#xff0c;才剛剛開始… 全系列文章可參考專欄: 十年代碼訓練_Once-Day的博客-CSDN博客 參考文章: 380. O(1) 時間插入、刪除和獲取隨機元素 - 力扣&#xff08;LeetCode&#xff09;力…

C++面試5——對象存儲區域詳解

C++對象存儲區域詳解 核心觀點:內存是程序員的戰場,存儲區域決定對象的生殺大權!棧對象自動赴死,堆對象生死由你,全局對象永生不死,常量區對象只讀不滅。 一、四大地域生死簿 棧區(Stack) ? 特點:自動分配釋放,速度極快(類似高鐵進出站) ? 生存期:函數大括號{}就…

STM32 智能小車項目 L298N 電機驅動模塊

今天開始著手做智能小車的項目了 在智能小車或機器人項目中&#xff0c;我們經常會聽到一個詞叫 “H 橋電機驅動”&#xff0c;尤其是常見的 L298N 模塊&#xff0c;就是基于“雙 H 橋”原理設計的。那么&#xff0c;“H 橋”到底是什么&#xff1f;為什么要用“雙 H 橋”來驅動…

python項目如何創建docker環境

這里寫自定義目錄標題 python項目創建docker環境docker配置國內鏡像源構建一個Docker 鏡像驗證鏡像合理的創建標題&#xff0c;有助于目錄的生成如何改變文本的樣式插入鏈接與圖片如何插入一段漂亮的代碼片生成一個適合你的列表創建一個表格設定內容居中、居左、居右SmartyPant…

MySQL-多表關系、多表查詢

一. 一對多(多對一) 1. 例如&#xff1b;一個部門下有多個員工 在數據庫表中多的一方(員工表)、添加字段&#xff0c;來關聯一的一方(部門表)的主鍵 二. 外鍵約束 1.如將部門表的部門直接刪除&#xff0c;然而員工表還存在其部門下的員工&#xff0c;出現了數據的不一致問題&am…

【 HarmonyOS 5 入門系列 】鴻蒙HarmonyOS示例項目講解

【 HarmonyOS 5 入門系列 】鴻蒙HarmonyOS示例項目講解 一、前言&#xff1a;移動開發聲明式 UI 框架的技術變革 在移動操作系統的發展歷程中&#xff0c;UI 開發模式經歷了從命令式到聲明式的重大變革。 根據華為開發者聯盟 2024 年數據報告顯示&#xff0c;HarmonyOS 設備…

【SSM】SpringMVC學習筆記7:前后端數據傳輸協議和異常處理

這篇學習筆記是Spring系列筆記的第7篇&#xff0c;該筆記是筆者在學習黑馬程序員SSM框架教程課程期間的筆記&#xff0c;供自己和他人參考。 Spring學習筆記目錄 筆記1&#xff1a;【SSM】Spring基礎&#xff1a; IoC配置學習筆記-CSDN博客 對應黑馬課程P1~P20的內容。 筆記2…

借助 Spring AI 和 LM Studio 為業務系統引入本地 AI 能力

Spring AI 1.0.0-SNAPSHOTLM Studio 0.3.16qwen3-4b 參考 Unable to use spring ai with LMStudio using spring-ai openai module Issue #2441 spring-projects/spring-ai GitHub LM Studio 下載安裝 LM Studio下載 qwen3-4b 模型。對于 qwen3 系列模型&#xff0c;測試…

C++學習-入門到精通【13】標準庫的容器和迭代器

C學習-入門到精通【13】標準庫的容器和迭代器 目錄 C學習-入門到精通【13】標準庫的容器和迭代器一、標準模板庫簡介1.容器簡介2.STL容器總覽3.近容器4.STL容器的通用函數5.首類容器的通用typedef6.對容器元素的要求 二、迭代器簡介1.使用istream_iterator輸入&#xff0c;使用…

Vue Router的核心實現原理深度解析

1. Vue Router的基本架構 Vue Router的核心功能是實現前端路由&#xff0c;即在不重新加載頁面的情況下更改應用的視圖。它的基本架構包括&#xff1a; 路由配置&#xff1a;定義路徑與組件的映射關系路由實例&#xff1a;管理路由狀態和提供導航方法路由視圖&#xff1a;渲染…

設計模式——狀態設計模式(行為型)

摘要 狀態設計模式是一種行為型設計模式&#xff0c;核心在于允許對象在內部狀態改變時改變行為。它通過狀態對象封裝不同行為&#xff0c;使狀態切換靈活清晰。該模式包含環境類、抽象狀態類和具體狀態類等角色&#xff0c;具有避免大量分支判斷、符合單一職責和開閉原則等特…

C++ 觀察者模式:設計與實現詳解

一、引言 在現代軟件開發中,組件間的交互與通信是系統設計的核心挑戰之一。觀察者模式(Observer Pattern)作為一種行為設計模式,提供了一種優雅的解決方案,用于實現對象間的一對多依賴關系。本文將深入探討 C++ 中觀察者模式的設計理念、實現方式及其應用場景。 二、觀察…

Windows 賬號管理與安全指南

Windows 賬號管理與安全指南 概述 Windows 賬號管理是系統安全的基礎&#xff0c;了解如何正確創建、管理和保護用戶賬戶對于系統管理員和安全專業人員至關重要。本文詳細介紹 Windows 系統中的賬戶管理命令、隱藏賬戶創建方法以及安全防護措施。 基礎賬戶管理命令 net use…

[藍橋杯]擺動序列

擺動序列 題目描述 如果一個序列的奇數項都比前一項大&#xff0c;偶數項都比前一項小&#xff0c;則稱為一個擺動序列。即 a2i<a2i?1,a2i1 >a2ia2i?<a2i?1?,a2i1? >a2i?。 小明想知道&#xff0c;長度為 mm&#xff0c;每個數都是 1 到 nn 之間的正整數的…

Python 網絡編程 -- WebSocket編程

作者主要是為了用python構建實時網絡通信程序。 概念性的東西越簡單越好理解,因此,下面我從晚上摘抄的概念 我的理解。 什么是網絡通信? 更確切地說&#xff0c;網絡通信是兩臺計算機上的兩個進程之間的通信。比如&#xff0c;瀏覽器進程和新浪服務器上的某個Web服務進程在通…

GM DC Monitor如何實現TCP端口狀態監控-操作分享

本節講解如何通過現有指標提取監控腳本制作自定義的TCP端口監控指標 一、功能介紹 通過提取已有的監控指標的監控命令&#xff0c;來自定義TCP端口的監控指標。 二、配置端口監控 1&#xff09;定位監控腳本 確定腳本及參數如下&#xff1a; check_protocol_tcp.pl --plug…

LabVIEW與Modbus/TCP溫濕度監控系統

基于LabVIEW 開發平臺與 Modbus/TCP 通信協議&#xff0c;設計一套適用于實驗室環境的溫濕度數據采集監控系統。通過上位機與高精度溫濕度采集設備的遠程通信&#xff0c;實現多設備溫濕度數據的實時采集、存儲、分析及報警功能&#xff0c;解決傳統人工采集效率低、環境適應性…

Ntfs!ReadIndexBuffer函數分析之nt!CcGetVirtualAddress函數之nt!CcGetVacbMiss

第一部分&#xff1a; NtfsMapStream( IrpContext, Scb, LlBytesFromIndexBlocks( IndexBlock, Scb->ScbType.Index.IndexBlockByteShift ), Scb->ScbType.Index.BytesPerIndexBuffer, &am…