如何在Java中使用Kafka

如何在Java中使用Kafka

大家好,我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編,也是冬天不穿秋褲,天冷也要風度的程序猿!

Kafka是一個分布式流處理平臺,廣泛用于實時數據流的處理和傳輸。本文將詳細介紹如何在Java中使用Kafka,并通過示例代碼展示如何實現生產者和消費者。

1. 準備工作

在開始編寫代碼之前,需要完成以下準備工作:

  1. 安裝Kafka并啟動Kafka服務器。
  2. 添加Kafka的Java客戶端依賴。

在Maven項目中,可以在pom.xml文件中添加以下依賴:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version>
</dependency>

2. 創建Kafka生產者

Kafka生產者用于向Kafka主題發送消息。以下是創建Kafka生產者的示例代碼:

package cn.juwatech.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.Future;public class ProducerExample {public static void main(String[] args) {// 設置Kafka生產者的配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 創建Kafka生產者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 創建消息ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");try {// 發送消息Future<RecordMetadata> future = producer.send(record);RecordMetadata metadata = future.get();System.out.printf("Message sent to topic:%s partition:%s offset:%s%n", metadata.topic(), metadata.partition(), metadata.offset());} catch (Exception e) {e.printStackTrace();} finally {// 關閉生產者producer.close();}}
}

3. 創建Kafka消費者

Kafka消費者用于從Kafka主題中讀取消息。以下是創建Kafka消費者的示例代碼:

package cn.juwatech.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class ConsumerExample {public static void main(String[] args) {// 設置Kafka消費者的配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 創建Kafka消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱主題consumer.subscribe(Collections.singletonList("my_topic"));// 持續消費消息try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed message with key:%s value:%s from topic:%s partition:%s offset:%s%n",record.key(), record.value(), record.topic(), record.partition(), record.offset());}}} finally {// 關閉消費者consumer.close();}}
}

4. 運行生產者和消費者

確保Kafka服務器已啟動并且my_topic主題已創建。然后,按照以下步驟運行生產者和消費者:

  1. 運行生產者代碼,將消息發送到Kafka主題。
  2. 運行消費者代碼,消費Kafka主題中的消息。

生產者和消費者之間的通信流程如下:

  1. 生產者將消息發送到my_topic主題。
  2. 消費者訂閱my_topic主題并消費消息。

5. 高級配置與優化

在實際應用中,可以根據需要調整Kafka生產者和消費者的配置,以提高性能和可靠性。例如:

  • 批量發送消息: 配置linger.msbatch.size參數,減少網絡請求次數。
  • 消費者組協調: 使用ConsumerConfig.GROUP_ID_CONFIG配置消費者組,實現負載均衡。
  • 自動提交偏移量: 使用enable.auto.commit參數控制偏移量提交策略。

以下是一些常用的配置參數及其說明:

props.put("acks", "all"); // 確保消息被完全提交
props.put("retries", 0); // 發送失敗時不重試
props.put("batch.size", 16384); // 批量發送大小
props.put("linger.ms", 1); // 延遲發送時間
props.put("buffer.memory", 33554432); // 緩沖區大小

總結

本文詳細介紹了如何在Java中使用Kafka,包括創建生產者和消費者的基本步驟,以及一些高級配置與優化建議。通過本文的學習,相信大家能夠掌握基本的Kafka使用方法,并能在實際項目中應用。

微賺淘客系統3.0小編出品,必屬精品!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/38732.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/38732.shtml
英文地址,請注明出處:http://en.pswp.cn/web/38732.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

什么是Web3D交互展示?有什么優勢?

在智能互聯網蓬勃發展的時代&#xff0c;傳統的圖片、文字及視頻等展示手段因缺乏互動性&#xff0c;正逐漸在吸引用戶注意力和提升宣傳效果上顯得力不從心。而Web3D交互展示技術的橫空出世&#xff0c;則為眾多品牌與企業開啟了一扇全新的展示之門&#xff0c;讓線上產品體驗從…

【C語言】extern 關鍵字

在C語言中&#xff0c;extern關鍵字用于聲明一個變量或函數是定義在另一個文件中的。它使得在多個文件之間共享變量或函數成為可能。extern關鍵字常見于大型項目中&#xff0c;通常用于聲明全局變量或函數&#xff0c;這些變量或函數的定義位于其他文件中。 基本用法 變量聲明…

Python基礎入門知識

目錄 引言 簡要介紹Python語言 為什么要學習Python Python的應用領域 Python安裝和環境配置 Python的下載和安裝(Windows, macOS, Linux) 配置Python環境變量 安裝和使用IDE(如PyCharm, VS Code) Python基本語法 注釋 變量和數據類型(數字,字符串,列表,元組,字典,…

P3374 【模板】樹狀數組 1

題目描述 如題&#xff0c;已知一個數列&#xff0c;你需要進行下面兩種操作&#xff1a; 將某一個數加上 &#x1d465;x 求出某區間每一個數的和 輸入格式 第一行包含兩個正整數 &#x1d45b;,&#x1d45a;n,m&#xff0c;分別表示該數列數字的個數和操作的總個數。 …

<sa8650>sa8650 qcxserver-之-攝像頭傳感器VB56G4A驅動開發<1>

<sa8650>sa8650 qcxserver-之-攝像頭傳感器VB56G4A驅動開發 <1> 一、前言二、QCX架構三、QCX 傳感器驅動程序定制開發3.1 sensor硬件接口3.2 sensor配置文件3.2.1 cameraconfig.c3.2.2 cameraconfigsa8650_water.c3.2.3 新增編譯MK3.2.4 參數解析3.2.4.1 struct Camera…

干式電抗器的工作原理是什么

干式電抗器是電力系統中常用的無功補償設備&#xff0c;主要用于調節電網的電壓、提高功率因數、限制短路電流等。它的工作原理主要是通過在電路中引入一個與負載電流相反的磁場&#xff0c;從而產生一個與負載電流相抵消的電抗力&#xff0c;達到調節電壓和功率因數的目的。 干…

常微分方程算法之編程示例十-兩點狄利克雷邊值問題(理查德森外推法)

目錄 一、研究問題 二、C++代碼 三、計算結果 一、研究問題 本節我們采用理查德森法對示例八中的兩點狄利克雷邊值問題進行外推求解,相應的原理及推導思路請參考: 常微分方程算法之高精度算法(Richardson法+緊差分法)_richardson外推法-CSDN博客https://blog.csdn.net/…

20_系統測試與維護

目錄 測試基礎知識 測試原則 動態測試 靜態測試 測試策略 測試階段 測試用例設計 黑盒測試用例設計 白盒測試用例設計 McCabe度量法 魯棒性測試 缺陷探測率(Defect Detection Percentage,DDP) 調試 系統維護基礎 系統轉換 系統維護指標 軟件容錯技術 嵌入式安…

Stream流學習mapping

Stream流學習mapping 一、前言1. 基本用法2. 結合 Collectors.mapping3. 自定義轉換函數4.總結 一、前言 在Java的Stream API中&#xff0c;mapping 是一個非常有用的中間操作&#xff0c;它可以將流中的元素映射成其他形式。通常與 Collectors.groupingBy 或者 Collectors.ma…

【AI 大模型訓練數據白皮書 2024】

文末有福利&#xff01; 自《中共中央國務院關于構建數據基礎制度更好發揮數據要素作用的意見》發布以來&#xff0c;我國數據要素建設不斷深入&#xff0c;在國家數據局等 17 部門聯合印發的《“數據要素 ” 三年行動計劃&#xff08;2024 - 2026 年&#xff09;》進一步明確…

z-index的工作原理

z-index的工作原理 HTML文檔中的元素卻是存在于三個維度之中。除了大家熟知的平面畫布中的x軸和y軸&#xff0c;還有控制第三維度的z軸。 像 margin , float , offset 這些屬性&#xff0c;控制著元素在x軸和y軸上的表現形式一樣。 z-index 這個屬性控制著元素在z軸上的表現形…

不使用AMap.DistrictSearch,通過poi數據繪制省市縣區塊

個人申請高德地圖key時無法使用AMap.DistrictSearch&#xff0c;可以通過poi數據繪制省市縣區塊 1.進入POI數據網站找到需要的省市縣&#xff0c;下載對應的GeoJson文件 &#xff0c;此處為poi數據網站鏈接 2.? 處理geoJson數據&#xff0c;可以直接新建json文件&#xff0c;…

FIPS PUB 196 ENTITY AUTHENTICATION USING PUBLIC KEY CRYPTOGRAPHY

部分原文 3.3 Mutual authentication protocol The following mutual entity authentication protocol is based on Section 522. “Three pass authentication”, ofISO/IEC 9798-3. Certain authentication token fields and protocol steps are specified in greater deta…

在Windows命令行中設置定時關機

在Windows命令行中設置定時關機&#xff0c;你可以使用shutdown命令。下面是幾個實用的例子&#xff1a; 立即關機: shutdown /s /t 0延遲關機: 假設你想在30分鐘后關機&#xff0c;可以使用&#xff08;30分鐘等于1800秒&#xff09;:shutdown /s /t 1800定時關機: 如果你想在…

【機器學習】在【Pycharm】中的實踐教程:使用【邏輯回歸模型】進行【乳腺癌檢測】

目錄 案例背景 具體問題 1. 環境準備 小李的理解 知識點 2. 數據準備 2.1 導入必要的庫和數據集 小李的理解 知識點 2.2 數據集基本信息 小李的理解 知識點 注意事項 3. 數據預處理 3.1 劃分訓練集和測試集 小李的理解 知識點 注意事項 3.2 數據標準化 小李…

controller不同的后端路徑對應vue前端傳遞數據發送請求的方式,vue請求參數 param 與data 如何對應后端參數

目錄 案例一&#xff1a; 為什么使用post發送請求&#xff0c;參數依舊會被拼接帶url上呢&#xff1f;這應該就是param 與data傳參的區別。即param傳參數參數會被拼接到url后&#xff0c;data會以請求體傳遞 補充&#xff1a;后端controller 參數上如果沒寫任何注解&#xff0c…

第二高的薪水

第二高的薪水&#xff1a; 描述 查詢并返回 Employee 表中第二高的薪水 。如果不存在第二高的薪水&#xff0c;查詢應該返回 null(Pandas 則返回 None) pandas import pandas as pddef second_highest_salary(employee: pd.DataFrame):# 1. 刪除所有重復的薪水.employee emp…

第一后裔進不去游戲怎么辦 第一后裔免費加速器推薦

Steam年度最熱心愿榜單第五的游戲終于上線了&#xff0c;包好玩的新游&#xff0c;第一后裔&#xff0c;為什么說他肯定好玩呢&#xff1f;因為游戲第一次測試在兩年前就開始了&#xff0c;中間也斷斷續續測試了好多次&#xff0c;很多小伙伴都是體驗過游戲的&#xff0c;經過多…

MySQL 9.0正式版本來了!

MySQL 9.0 第一個正式版本于 2024 年 7 月 1 日發布&#xff0c;這是一個創新版&#xff0c;意味著它會增加一些新功能、修復一些問題并棄用一些舊功能。 性能相關 MySQL 9.0 支持將 EXPLAIN ANALYZE 命令輸出的 JSON 數據存儲到用戶定義的變量中&#xff0c;語法如下&#x…

【硬件模塊】PN532 NFC讀卡串口通信

PN532 PN532是一款功能豐富的非接觸式通訊收發模塊&#xff0c;其基于8051單片機核心&#xff0c;集成了多種通信接口和工作模式&#xff0c;以滿足不同應用場景的需求。以下是PN532功能相關的詳細介紹&#xff1a; 多種通信接口&#xff1a;PN532支持I2C、SPI和UART&#xff0…