從零開始的 Kafka 學習(四)| 生產消息

1. 生產消息

1.1 生產消息的基本步驟

(一)創建Map類型的配置對象,根據場景增加相應的配置屬性:

參數名參數作用類型默認值推薦值
bootstrap.servers集群地址,格式為:brokerIP1:端口號,brokerIP2:端口號必須
key.serializer對生產數據Key進行序列化的類完整名稱必須Kafka提供的字符串序列化類:StringSerializer
value.serializer對生產數據Value進行序列化的類完整名稱必須Kafka提供的字符串序列化類:StringSerializer
interceptor.classes攔截器類名,多個用逗號隔開可選
batch.size數據批次字節大小。此大小會和數據最大估計值進行比較,取大值。估值=61+21+(keySize+1+valueSize+1+1)可選16K
retries重試次數可選整型最大值0或整型最大值
request.timeout.ms請求超時時間可選30s
linger.ms數據批次在緩沖區中停留時間可選
acks請求應答類型:all(-1), 0, 1可選all(-1)根據數據場景進行設置
retry.backoff.ms兩次重試之間的時間間隔可選100ms
buffer.memory數據收集器緩沖區內存大小可選32M64M
max.in.flight.requests.per.connection每個節點連接的最大同時處理請求的數量可選5小于等于5
enable.idempotence冪等性,可選true根據數據場景進行設置
partitioner.ignore.keys是否放棄使用數據key選擇分區可選false
partitioner.class分區器類名可選null

(二)創建待發送數據

在 Kafka 中傳遞的數據我們稱之為消息(message)或記錄(record),所以Kafka發送數據前,需要將待發送的數據封裝為指定的數據類型:

在這里插入圖片描述

在這里插入圖片描述

相關屬性必須在構建數據模型時指定,其中主題和value的值時必須要傳遞的。如果配置中開啟了自動創建主題,那么 Topic 主題可以不存在。value 就是我們需要真正傳遞的數據了,而 Key 可以用于數據的分區定位。

(三)創建生產者對象,發送生產的數據:

根據前面提供的配置信息創建生產者對象,通過這個生產者對象向 Kafka 服務器節點發送數據,而具體的發送是由生產者對象創建時,內部構件的多個組件實現的,多個組件的關系類似與生產者消費者模式。

在這里插入圖片描述

(1)數據生產者(KafkaProducer):生產者對象,用于對我們的數據進行必要的轉換和處理,將處理后的數據放入到數據收集器中,類似于生產者消費者模式下的生產者。

  • 如果配置攔截器棧(interceptor.classes),那么將數據進行攔截處理。某一個攔截器出現異常并不會影響后續的攔截器處理。
  • 因為發送的數據為 KV 數據,所以需要根據配置信息中的序列化對象對數據中 Key 和 Value 分別進行序列化處理。
  • 計算數據嗦發送的分區位置。
  • 將數據追加到數據收集器中。

(2)數據收集器(RecordAccumulator):用于收集,轉換我們生產的數據,蕾西與生產者消費者模式下的緩沖區。為了優化數據的傳輸,Kafka 并不是生產一條數據就向 Broker 發送一條數據,而是通過合并單條消息,進行批量(批次)發送,提高吞吐量,減少帶寬消耗。

  • 默認情況下,一個發送批次的數據容量為 16k,這個可以通過參數 batch.size進行改善。
  • 批次是和分區進行綁定的。也就是說發往同一個分區的數據會進行合并,形成一個批次。
  • 如果當前批次能容納數據,那么直接將數據追加到批次中即可,如果不能容納數據,那么會產生新的批次放入到當前分區的批次隊列中,這個隊列使用的是 Java 雙端隊列 Deque。舊的批次關閉不再接收新的數據,等待發送。

(3)數據發送器(Sender):線程對象,用于從收集器中獲取數據,向服務節點發送。類似于生產者消費者模式下的消費者。因為是線程對象,所以啟動后會不斷輪詢獲取數據收集器中已經關閉的批次數據。對批次進行整合后再發送到 Broker 節點中

  • 因為數據真正發送的地方是 Broker 節點,不是分區。所以需要將從數據收集器中收集到的批次數據按照可用 Broker 節點重新組合成List集合。
  • 將組合后的<節點,List<批次>>的數據封裝成客戶端請求(請求鍵為:Produce)發送到網絡客戶端對象的緩沖區,由網絡客戶端對象通過網絡發送給 Broker 節點。
  • Broker 節點獲取客戶端請求,并根據請求鍵進行后續的數據處理:向分區中增加數據。

在這里插入圖片描述

1.2 生產消息的基本代碼

// TODO 配置屬性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置屬性:Kafka服務器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置屬性:Kafka生產的數據為KV對,所以在生產數據進行傳輸前需要分別對K,V進行對應的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// TODO 創建Kafka生產者對象,建立Kafka連接
//      構造對象時,需要傳遞配置參數
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 準備數據,定義泛型
//      構造對象時需要傳遞 【Topic主題名稱】,【Key】,【Value】三個參數
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1"
);
// TODO 生產(發送)數據
producer.send(record);
// TODO 關閉生產者連接
producer.close();

1.3 發送消息

1.3.1 攔截器

生產者 API 在數據準備好發送給 Kafka 服務器之前,允許我們對生產的數據進行統一的處理,比如校驗,整合數據等等。這些處理我們是可以通過 Kafka 提供的攔截器完成。

這里的攔截器是可以配置多個的。執行時,會按照聲明順序執行完一個后,再執行下一個。并且某一個攔截器如果出現異常,只會跳出當前攔截器邏輯,并不會影響后續攔截器的處理。所以開發時,需要將攔截器的這種處理方法考慮進去。

在這里插入圖片描述

1.3.1.1 增加攔截器類

(1)實現生產者攔截器接口 ProducerInterceptor

/*** TODO 自定義數據攔截器*      1. 實現Kafka提供的生產者接口ProducerInterceptor*      2. 定義數據泛型 <K, V>*      3. 重寫方法*         onSend*         onAcknowledgement*         close*         configure*/
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Override	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

(2)實現接口中的方法,根據業務功能重寫具體的方法

方法名作用
onSend數據發送前,會執行此方法,進行數據發送前的預處理
onAcknowledgement數據發送后,獲取應答時,會執行此方法
close生產者關閉時,會執行此方法,完成一些資源回收和釋放的操作
configure創建生產者對象的時候,會執行此方法,可以根據場景對生產者對象的配置進行統一修改或轉換。
1.3.1.2 配置攔截器
public class ProducerInterceptorTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());KafkaProducer<String, String> producer = null;try {producer = new KafkaProducer<>(configMap);for ( int i = 0; i < 1; i++ ) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);final Future<RecordMetadata> send = producer.send(record);}} catch ( Exception e ) {e.printStackTrace();} finally {if ( producer != null ) {producer.close();}}}
}
1.3.2 回調方法

Kafka 發送數據時,可以同時傳遞回調對象(Callback)用于對數據的發送結果進行對應處理,具體代碼實現采用匿名類或 Lambda 表達式都可以。

public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循環生產數據for ( int i = 0; i < 1; i++ ) {// TODO 創建數據ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 發送數據producer.send(record, new Callback() {// TODO 回調對象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 當數據發送成功后,會回調此方法System.out.println("數據發送成功:" + recordMetadata.timestamp());}});}producer.close();}
}
1.3.3 異步發送

Kafka 發送數據時,底層的實現類似于生產者消費者模式。對應的,底層會由主線程代碼作為生產者向緩沖區中放數據,而數據發送線程會從緩沖區中獲取數據進行發送。Broker 接收到數據后進行后續處理。

如果 Kafka 通過主線程代碼將一條數據放入到緩沖區后,無需等待數據的后續發送過程,就直接發送下一條數據的場合,我們就稱之為異步發送。

在這里插入圖片描述

public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循環生產數據for ( int i = 0; i < 10; i++ ) {// TODO 創建數據ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 發送數據producer.send(record, new Callback() {// TODO 回調對象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 當數據發送成功后,會回調此方法System.out.println("數據發送成功:" + recordMetadata.timestamp());}});// TODO 發送當前數據System.out.println("發送數據");}producer.close();}
}
1.3.4 同步發送

Kafka 發送數據時,底層的實現類似于生產者消費者模式。對應的,底層會由主線程代碼作為生產者向緩沖區中放數據,而數據發送線程會從緩沖區中獲取數據進行發送。Broker 接收到數據后進行后續處理。

如果 Kafka 通過主線程代碼將一條數據放入到緩沖區后,需等待數據的后續發送操作的應答狀態,才能發送下一條數據的場合,我們就稱之為同步發送。所以這里的所謂同步,就是生產數據的線程需要等待線程的應答(響應)結果。

代碼實現上,采用的是 JDK1.5 增加的JUC 并發編程的 Future 接口的 get 方法實現。

在這里插入圖片描述

public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循環生產數據for ( int i = 0; i < 10; i++ ) {// TODO 創建數據ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 發送數據producer.send(record, new Callback() {// TODO 回調對象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 當數據發送成功后,會回調此方法System.out.println("數據發送成功:" + recordMetadata.timestamp());}}).get();// TODO 發送當前數據System.out.println("發送數據");}producer.close();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/899407.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/899407.shtml
英文地址,請注明出處:http://en.pswp.cn/news/899407.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

k8s1.22 kubeadm 部署

k8s1.22 kubeadm 部署 1、更改hostname hostnamectl set-hostname master-001 && su root hostnamectl set-hostname node-001 && su root hostnamectl set-hostname node-002 && su root配置hsots cat >> /etc/hosts <<EOF 192.168.20.…

新手村:邏輯回歸-理解04:熵是什么?

新手村&#xff1a;邏輯回歸04&#xff1a;熵是什么? 熵是什么? 前置條件 在開始學習邏輯回歸中的熵理論之前&#xff0c;需要掌握以下基礎知識&#xff1a; 概率論與統計學&#xff1a; 概率分布&#xff08;如伯努利分布、正態分布&#xff09;。條件概率和貝葉斯定理。期…

STM32通用定時器結構框圖

STM32單片機快速入門 通用定時器框圖 TIM9和TIM12 通用定時器框圖 TIM9和TIM12 &#xff08;二&#xff09; 通用定時器框圖

3.28-2 jmeter讀取mysql

jmeter操作mysql 1.下載數據驅動&#xff0c;安裝數據驅動 &#xff08;1&#xff09;存放四個路徑 a.jre下的lib C:\Program Files\Java\jre1.8.0_60\lib &#xff08;2&#xff09;存放在jre 下的lib 中的ext 路徑&#xff1a; C:\Program Files\Java\jre1.8.0_60\lib\…

TDengine 中的保留關鍵詞

簡介 本節很重要&#xff0c;請大家收藏&#xff0c;避免在編寫程序的時候踩坑。因為關鍵字是被 TDengine 系統使用的&#xff0c;如果你在 SQL 中使用了保留關鍵詞&#xff0c;并且沒有被反引號包括時&#xff0c;會報語法錯誤&#xff0c;當你不知道這個是保留關鍵詞時&…

美攝科技開啟智能汽車車內互動及娛樂解決方案2.0

在科技飛速發展的今天&#xff0c;汽車已不再僅僅是簡單的代步工具&#xff0c;而是逐漸演變為集出行、娛樂、社交于一體的智能移動空間。美攝科技&#xff0c;作為前沿視覺技術與人工智能應用的領航者&#xff0c;憑借其卓越的技術實力和創新精神&#xff0c;攜手汽車行業&…

Postman CORS 測試完全指南:輕松模擬跨域請求,排查 CORS 相關問題

在使用 Postman 進行 API 測試時&#xff0c;通常不會遇到跨域問題&#xff0c;因為 Postman 是一個獨立的客戶端應用程序&#xff0c;不同于在瀏覽器中運行的 JavaScript 代碼&#xff0c;它沒有同源策略&#xff08;SOP&#xff09;的限制。跨域資源共享&#xff08;CORS&…

基于SpringBoot和Vue的SQL TO API平臺的設計與實現

文章目錄 前言一、系統功能模塊二、數據庫設計1. 實體屬性圖1. 實體屬性圖1.1 職員表1.2 數據源配置表1.3 接口配置表1.4 請求記錄表 2. E-R圖 三、系統實現1. 登錄頁面2. 職員管理頁面1.1 創建用戶1.2 編輯用戶 2. 數據源管理2.1 創建數據源2.2 編輯數據源 3. 接口管理3.1 創建…

【Portainer】Docker可視化組件安裝

Portainer Portainer 是用于管理容器化環境的一體化平臺工程解決方案&#xff0c;提供廣泛的定制功能&#xff0c;以滿足個人開發人員和企業團隊的需求。 官方地址: https://www.portainer.io/ 安裝 在 WSL / Docker Desktop 上使用 Docker 安裝 Portainer CE 通過命令或UI頁…

Gateway實戰(二)、負載均衡

spring cloud- Gateway實戰二:負載均衡 負載均衡簡單了解一)、實操案例-自動負載均衡二)、實操案例-手動負載均衡1、手動負載均衡方式一2、手動負載均衡方式二負載均衡 簡單了解 我們引入負載均衡,主要是為了 讓網絡流量能均勻的分發到多個服務器上,以此來提高系統性能、…

MATLAB 2024b深度學習,圖神經網絡(GNN)

隨著人工智能技術向多學科交叉融合與工程實踐領域縱深發展&#xff0c;MATLAB 2024b深度學習工具箱通過架構創新與功能強化&#xff0c;為科研創新和行業應用提供了全棧式解決方案。本培訓基于該版本工具鏈的三大革新方向展開&#xff1a;一是構建覆蓋經典模型與前沿架構的體系…

unity中Xcharts圖表鼠標懸浮表現異常

鼠標懸浮在面板附近&#xff0c;只顯示單獨的一個項目 而且無論鼠標如何移動&#xff0c;根本沒有效果。 解決方案&#xff1a; 需要在對應的Canvas上綁定主相機才可以 鼠標移動到項目上就有信息展示了

使用Django創建項目及介紹

注意&#xff1a;創建項目之前先進入虛擬環境 第一步&#xff1a;創建 Django 項目 運行以下命令創建一個名為 myproject 的 Django 項目&#xff1a; django-admin startproject myproject 這會生成一個名為 myproject 的文件夾&#xff0c;結構如下&#xff1a; myproje…

CentOS 安裝LAMP全過程 - 完整步驟與最佳實踐

在開始搭建 LAMP 環境之前&#xff0c;需要確保系統已經滿足以下條件&#xff1a; 1、操作系統&#xff1a;CentOS 7 或 CentOS 8 2、網絡連接&#xff1a;系統必須能夠訪問互聯網以下載所需的軟件包 3、權限&#xff1a;需要 root 權限或者通過sudo 提權執行命令 先更新系…

Java基礎關鍵_031_反射(一)

目 錄 一、概述 二、獲取 Class 的四種方式 1.Class.forName("完整全限定類名") 2.getClass() 3.class 屬性 4.通過類加載器獲取 三、通過反射機制實例化對象 1.newInstance()&#xff08;已過時&#xff09; 2.配置文件利用反射機制實例化對象 四、反射 Cla…

MySQL高級語句深度解析與應用實踐

一、窗口函數&#xff1a;數據分析的利器 1. 窗口函數基礎概念 窗口函數(Window Function)是MySQL 8.0引入的強大特性&#xff0c;它可以在不減少行數的情況下對數據進行聚合計算和分析 SELECT employee_name,department,salary,RANK() OVER (PARTITION BY department ORDER…

【機器學習基礎 4】 Pandas庫

一、Pandas庫簡介 Pandas 是一個開源的 Python 數據分析庫&#xff0c;主要用于數據清洗、處理、探索與分析。其核心數據結構是 Series&#xff08;一維數據&#xff09;和 DataFrame&#xff08;二維表格數據&#xff09;&#xff0c;可以讓我們高效地操作結構化數據。Pandas …

ETCD --- ?租約(Lease)?詳解

一、租約的核心概念 1. ?租約(Lease)? 一個租約是一個有時間限制的“授權”,綁定到鍵值對上。每個租約有一個唯一的ID(64位整數),通過etcdctl或客戶端API創建。創建租約時需指定TTL(Time-To-Live),即租約的有效期(單位:秒)。客戶端需定期向etcd發送續約(KeepAl…

33.[前端開發-JavaScript基礎]Day10-常見事件-鼠標事件-鍵盤事件-定時器-案例

1 window定時器 window定時器方法 setTimeout的使用 setInterval的使用 2 輪播消息提示 案例實戰一 – 輪播消息提示 3 關閉隱藏消息 案例實戰二 – 關閉隱藏消息 4 側邊欄展示 案例實戰三 – 側邊欄展示 5 tab切換實現 案例實戰四 – 登錄框&#xff08;作業&#xff09;…

react ant design樹穿梭框實現搜索并展開到子節點、同級節點選擇及同時選擇數量限制功能

功能點&#xff1a; 點擊節點前的箭頭&#xff0c;可以手動展開或折疊該節點的子節點。在搜索框中輸入關鍵詞&#xff0c;匹配的節點及其父節點會自動展開。清空搜索框后&#xff0c;恢復到用戶手動控制的展開狀態。勾選節點時仍然遵守 "最多勾選 6 個節點" 和 &quo…