【大數據之Kafka】三、Kafka生產者之消息發送流程及同步異步發送API

??將外部傳送給過來的數據發送到kafka集群。

1 發送原理

(1)創建main()線程,創建producer對象,調用send方法,經過攔截器(可選)、序列化器、分區器。

(2)分區器將數據發送到分區中,每個分區創建一個隊列(分區是在內存中完成的),內存總大小為32M,每個批次的大小為16K。

(3)sender線程將緩沖隊列中的數據讀取出來發往Kafka集群,根據batch.size和linger.ms拉取數據(即每批次的數據滿了之后或者設置的時間到了之后拉取數據)。

(4)sender線程拉取數據,以每個節點為一組,當第一個請求數據發送到broker1中,broker沒有及時應答,還是能發送第二個請求,最多有5個請求都沒有收到應答就不會再繼續發送請求。

(5)selector打通輸入流和輸出流。

(6)鏈路接通后發送數據。

(7)Kafka集群收到數據后根據副本機制進行副本同步。

(8)Kafka集群收到數據后根據應答機制進行應答。

(9)selector根據Kafka集群反饋的消息進行判斷。

(10)如果成功則刪掉該請求同時在緩沖隊列里清理掉對應的每一個分區的數據;如果失敗則進行重試,重新發送請求,知道成功為止。

在這里插入圖片描述

2 生產者重要參數列表

在這里插入圖片描述

3 異步發送API

3.1 普通異步發送

(1)需求:創建Kafka 生產者,采用異步的方式發送到 Kafka Broker
(2)分析:異步發送即將外部的數據發送到緩沖隊列里(不管緩沖隊列中的數據有沒有發送到Kafka集群)。

步驟:
(1)創建kafka工程,在pom.xml中導入依賴:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

(2)創建類:com.astudy.kafka.producer.CustomProducer

package com.study.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {//0.創建 kafka 生產者的配置對象Properties properties = new Properties();//給 kafka 配置對象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必須):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.調用 send 方法,發送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first","test"+i));}//3.關閉資源kafkaProducer.close();}
}

(3)測試:
在hadoop102上開啟Kafka消費者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中執行代碼,觀察 hadoop102 控制臺中是否接收到消息:
在這里插入圖片描述

3.2 帶回調函數的異步發送

分析:
??回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分別是元數據信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。

package com.study.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//0.創建 kafka 生產者的配置對象Properties properties = new Properties();//給 kafka 配置對象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必須):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.調用 send 方法,發送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first", "test" + i), new Callback() {// 該方法在 Producer 收到 ack 時調用,為異步調用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {// 沒有異常,輸出信息到控制臺System.out.println("topic:" + recordMetadata.topic() + "  partition:" + recordMetadata.partition());}else {// 出現異常打印e.printStackTrace();}}});// 延遲一會會看到數據發往不同分區Thread.sleep(2);}//3.關閉資源kafkaProducer.close();}
}

測試:
在hadoop102上開啟Kafka消費者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中執行代碼,觀察 hadoop102 控制臺中是否接收到消息:
在這里插入圖片描述
在這里插入圖片描述

4 同步發送API

分析:只需在異步發送的基礎上,再調用一下 get()方法即可。

package com.study.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {//0.創建 kafka 生產者的配置對象Properties properties = new Properties();//給 kafka 配置對象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必須):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.調用 send 方法,發送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first","test"+i)).get();}//3.關閉資源kafkaProducer.close();}
}

測試:
在hadoop102上開啟Kafka消費者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中執行代碼,觀察 hadoop102 控制臺中是否接收到消息:
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/38845.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/38845.shtml
英文地址,請注明出處:http://en.pswp.cn/news/38845.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Android Framework (十二) 】- 智能硬件設備開發

文章目錄 前言智能硬件的定義與應用智能硬件產品開發流程智能硬件開發所涉及的技術體系概述關于主板選型主板CPU芯片的選擇關于串口通信 總結 前言 針對我過往工作經歷&#xff0c;曾在一家智能科技任職Android開發工程師&#xff0c;簡單介紹下關于任職期間接觸和開發過的一些…

DDPM: Denoising Diffusion Probabilistic Models

DDPM: Denoising Diffusion Probabilistic Models 去噪擴散模型前向過程-加噪聲反向過程-去噪聲 去噪擴散模型 論文題目&#xff1a;Denoising Diffusion Probabilistic Models (DDPM) 論文來源&#xff1a;NIPS, 2020 論文地址&#xff1a;https://arxiv.org/abs/2006.11239 論…

RH850從0搭建Autosar開發環境【24】- Davinci Configurator之DEM模塊配置詳解(上)

DEM模塊配置詳解 - 上 一、Autosar中DEM模塊簡介1.DEM對其他模塊的依賴2.DEM模塊架構2.1 DEM模塊Dem Satellite(s) 和Master2.2 診斷事件處理2.2.1 基于計數器的算法2.2.2 基于時間的算法三、配置錯誤項處理3.1 容器DemEventParameter3.2 容器DemOperationCycleRef3.3 容器DemO…

13.3 目標檢測和邊界框

錨框的計算公式 假設原圖的高為H,寬為W 詳細公式推導 以同一個像素點為錨框&#xff0c;可以生成 (n個縮放 m個寬高比 -1 )個錨框 錨框的作用&#xff1a; 不用直接去預測真實框的四個坐標&#xff0c;而是&#xff1a; 1.先生成多個錨框。 2.預測每個錨框里是否含有要預測…

C++:哈希表——模擬散列表

模擬散列表 維護一個集合&#xff0c;支持如下幾種操作&#xff1a; 1.“I x”&#xff0c;插入一個數x 2.“Q x”&#xff0c;詢問數x是否在集合中出現過 現在要進行N次操作&#xff0c;對于每個詢問操作輸出對應的結果 輸入格式 第一行包含整數N&#xff0c;表示操作數量 …

【Linux】【驅動】雜項設備驅動

【Linux】【驅動】雜項設備驅動 Linux三大設備驅動1. 我們這節課要講的雜項設備驅動是屬于我們這三大設備驅動里面的哪個呢?2.雜項設備除了比字符設備代碼簡單&#xff0c;還有別的區別嗎?3.主設備號和次設備號是什么? 掛載驅動 雜項設備驅動是字符設備驅動的一種&#xff0…

小程序制作教程:從零開始搭建企業小程序

在如今的數字化時代&#xff0c;企業介紹小程序成為了企業展示與推廣的重要工具。通過企業介紹小程序&#xff0c;企業可以向用戶展示自己的品牌形象、產品服務以及企業文化等內容&#xff0c;進而提高用戶對企業的認知度和信任度。本文將介紹如何從零開始搭建一個企業介紹小程…

Linux常用命令詳細大全

目錄 1、查看目錄與文件&#xff1a;ls2、切換目錄&#xff1a;cd3、顯示當前目錄&#xff1a;pwd4、創建空文件&#xff1a;touch5、創建目錄&#xff1a;mkdir6、查看文件內容&#xff1a;cat7、分頁查看文件內容&#xff1a;more8、查看文件尾內容&#xff1a;tail9、拷貝&a…

小程序 vant 項目記錄總結 使用 scss 分享 訂閱消息 wxs 分包 echarts圖表 canvas getCurrentPages頁面棧

小程序 vant vant 下載 npm init -ynpm i vant/weapp -S --production修改 app.json 將 app.json 中的 “style”: “v2” 去除 修改 project.config.json {..."setting": {..."packNpmManually": true,"packNpmRelationList": [{"p…

域名配置HTTPS

一、注冊域名 這個可以在各大平臺注冊&#xff0c;具體看一下就會注冊了&#xff0c;自己挑選一個自己喜歡的域名。 步驟一般也就是先實名&#xff0c;實名成功了才能注冊域名。 二、辦理SSL證書 這里使用的是阿里云的SSL免費證書 1、申請證書 二、填寫申請 三、域名綁定生…

公司電腦三維圖紙加密、機械圖擋加密軟件

機械圖紙加密軟件的問世&#xff0c;讓很多的網絡公司都大受其帶來的工作中的便利。在安裝了機械圖紙加密軟件后&#xff0c;不僅可以很好的管理員工在工作時的上網娛樂&#xff0c;在對整個公司員工的工作效率上也有著明顯的提高&#xff0c;那么對于機械圖紙加密軟件的具體特…

【C#】靜默安裝、SQL SERVER靜默安裝等

可以通過cmd命令行來執行&#xff0c;也可以通過代碼來執行&#xff0c;一般都需要管理員權限運行 代碼 /// <summary>/// 靜默安裝/// </summary>/// <param name"fileName">安裝文件路徑</param>/// <param name"arguments"…

word 應用 打不開 顯示一直是正在啟動中

word打開來顯示一直正在啟動中&#xff0c;其他調用word的應用也打不開&#xff0c;網上查了下以后進程關閉spoolsv.exe,就可以正常打開word了

演進式架構

演進能力是一種元特征和保護其他所有架構特征的架構封裝器IEEE 的軟件架構定義中的41 視圖模型。它關注不同角色的不同視角&#xff0c;將整個系統劃分成了邏輯視圖、開發視圖、進程視圖和物理視圖架構師確定了可審計性、數據、安全性、性能、合法性和伸縮性是該應用的關鍵架構…

機器學習:特征工程之特征預處理

目錄 特征預處理 1、簡述 2、內容 3、歸一化 3.1、魯棒性 3.2、存在的問題 4、標準化 ?所屬專欄&#xff1a;人工智能 文中提到的代碼如有需要可以私信我發給你&#x1f60a; 特征預處理 1、簡述 什么是特征預處理&#xff1a;scikit-learn的解釋&#xff1a; provide…

linux系統服務學習(六)FTP服務學習

文章目錄 FTP、NFS、SAMBA系統服務一、FTP服務概述1、FTP服務介紹2、FTP服務的客戶端工具3、FTP的兩種運行模式&#xff08;了解&#xff09;☆ 主動模式☆ 被動模式 4、搭建FTP服務&#xff08;重要&#xff09;5、FTP的配置文件詳解&#xff08;重要&#xff09; 二、FTP任務…

Python基礎語法入門(第二十天)——文件操作

一、基礎內容 在Python中&#xff0c;路徑可以以不同的表現形式進行表示。以下是一些常用的路徑表現形式&#xff1a; 1. 絕對路徑&#xff1a;它是完整的路徑&#xff0c;從根目錄開始直到要操作的文件或文件夾。在Windows系統中&#xff0c;絕對路徑以盤符開始&#xff0c;…

【學會動態規劃】環形子數組的最大和(20)

目錄 動態規劃怎么學&#xff1f; 1. 題目解析 2. 算法原理 1. 狀態表示 2. 狀態轉移方程 3. 初始化 4. 填表順序 5. 返回值 3. 代碼編寫 寫在最后&#xff1a; 動態規劃怎么學&#xff1f; 學習一個算法沒有捷徑&#xff0c;更何況是學習動態規劃&#xff0c; 跟我…

CSS 兩欄布局和三欄布局的實現

文章目錄 一、兩欄布局的實現1. floatmargin2. flaotBFC3. 定位margin4. flex 布局5. grid布局 二、三欄布局的實現1. float margin2. float BFC3. 定位 margin(或者定位BFC)4. flex布局5. 圣杯布局6. 雙飛翼布局 一、兩欄布局的實現 兩欄布局其實就是左側定寬&#xff0c;…

高層建筑全景vr火災隱患排查模擬培訓軟件助力群眾防范火災傷害

隨著城市化進程的加快&#xff0c;樓宇建筑的數量也在不斷增加。然而&#xff0c;樓宇消防安全問題也日益突出。為了提高樓宇員工和居民的消防安全意識&#xff0c;樓宇VR消防安全教育培訓應運而生。VR安全培訓公司深圳華銳視點制作的樓宇vr消防安全教育培訓&#xff0c;包括消…