springboot整合Kafka的快速使用教程

? ? ? ?

目錄

一、引入Kafka的依賴

二、配置Kafka

三、創建主題

1、自動創建(不推薦)

2、手動動創建

四、生產者代碼

五、消費者代碼 ?

六、常用的KafKa的命令


? ? ? ? Kafka是一個高性能、分布式的消息發布-訂閱系統,被廣泛應用于大數據處理、實時日志分析等場景。Spring Boot作為目前最流行的Java開發框架之一,其簡潔的配置和豐富的工具使得與Kafka的集成變得更加容易。本文將介紹如何使用Spring Boot整合Kafka,實現高效的數據處理和消息傳遞。

一、引入Kafka的依賴

? ? ? ?<dependency>
? ? ? ? ? ? <groupId>org.springframework.cloud</groupId>
? ? ? ? ? ? <artifactId>spring-cloud-starter-stream-kafka</artifactId>
? ? ? ? </dependency>

二、配置Kafka

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,這里有三個地址,用逗號分隔。listener:ack-mode: manual_immediate #設置消費者的確認模式為manual_immediate,表示消費者在接收到消息后立即手動確認。concurrency: 3  #設置消費者的并發數為3missing-topics-fatal: false  #設置為false,表示如果消費者訂閱的主題不存在,不會拋出異常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 設置消息鍵的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #設置消息值的序列化器acks: 1  #一般就是選擇1,兼顧可靠性和吞吐量 ,如果想要更高的吞吐量設置為0,如果要求更高的可靠性就設置為-1consumer:auto-offset-reset: earliest #設置為"earliest"表示將從最早的可用消息開始消費,即從分區的起始位置開始讀取消息。enable-auto-commit: false #禁用了自動提交偏移量的功能,為了避免出現重復數據和數據丟失,一般都是手動提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設置消息鍵的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #設置消息值的反序列化器

注:kafka的acks有三個值,可以根據實際情況和需求平衡消息系統的吞吐量和數據安全性,來選擇對應的值。

  • acks=0:這是最不可靠的模式。當設置為acks=0時,生產者在發送消息后不會等待任何服務器端的確認響應。這種模式下,生產者可以迅速繼續發送下一批消息,效率最高,但風險也最大。如果在此模式下發生網絡問題或broker故障,發送的消息可能會永久丟失,生產者無法得知消息是否成功到達Kafka broker。因此,這種配置適合于能夠容忍少量數據丟失的場景,例如實時數據分析或生成非關鍵的實時報表。
  • acks=1:這是默認的配置模式,也是一種折衷方案。在這種模式下,生產者會等待分區的領導者節點(leader)確認消息已經成功寫入磁盤,才會發送確認信息給生產者。這提高了數據的安全性,因為只要領導者節點保存了消息,即使跟隨者(replicas)沒有及時同步,消息也不會丟失。然而,如果領導者在同步給所有追隨者之前崩潰,那么尚未同步的副本將無法獲取該消息,仍然存在消息丟失的風險。
  • acks=all或-1:這是最可靠的模式。在這個模式下,生產者不僅需要領導者節點確認,還會等待所有同步副本(In-sync replicas, ISR)都確認寫入消息后才會收到確認。這極大地增強了數據的持久性保證,確保了即使在多個節點故障的情況下,消息也不會丟失。此模式適用于數據可靠性要求非常高的場景,如金融交易系統或重要的日志記錄

三、創建主題

? ? 1、自動創建(不推薦)

? ? ? ? ?不存在的主題,會自動創建,分區數和副本數均為默認值。而默認值可能會不符合某些場景的要求。

在kafka的安裝目錄conf目錄下找到該配置文件server.properties,添加如下配置:
num.partitions=3 #默認3個分區
auto.create.topics.enable=true #開啟自動創建主題
default.replication.factor=3 #默認3個副本

? ? 2、手動動創建

? ? ? ? ?在kafka的安裝目錄bin目錄下,執行如下命令:?

//創建一個有三個分區和三個副本,名為zhuoye的主題
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 ?--topic zhuoye?

四、生產者代碼

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "zhuoye";@Overridepublic void queryECSMetricInfo() {//發送到kafka的消息集合,因為使用了多線程,并且在多線程中往該集合進行添加操作,所以需要線程安全的List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//獲取上次查詢時間Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {//查詢出所有的運行中的實例List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定義計數器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍歷查詢for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {//獲取內網流出帶寬,并將結果封裝到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error("獲取ECS的指標數據-多線程處理任務異常!", e);} finally {latch.countDown();}});}//等待任務執行完畢latch.await();//將最終的消息集合發送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}} catch (Exception e) {flag = false;log.error("獲取ECS的指標數據失敗", e);}//更新記錄上次查詢時間if (flag) {QueryTimeRecord queryTimeRecord = new QueryTimeRecord();queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //開始時間往前推1分鐘queryTimeRecordMapper.updateByBelongId(queryTimeRecord);}}

這個時候,如果你想看有沒有把消息發送到kafka的指定主題可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye?

五、消費者代碼 ?

@Slf4j
@Component
public class KafkaConsumer {// 消費監聽@KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//處理數據,存入openTsDb.................................ack.acknowledge();//手動提交}catch (Exception e){log.error("kafa-topic【zhuoye】消費阿里云指標源消息【失敗】");log.error(e.getMessage());}}
}

六、常用的KafKa的命令

//創建主題
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 ?--topic zhuoye
//查看kafka是否接收對應的消息
?kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
// 修改kafka-topic分區數
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
// 查看topic分區數
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
// 查看用戶組消費情況
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/17348.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/17348.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/17348.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

山東大學軟件學院項目實訓-創新實訓-基于大模型的旅游平臺(二十一)- 微服務(1)

微服務 1.認識微服務 SpringCloud底層是依賴于SpringBoot的&#xff0c;并且有版本的兼容關系&#xff0c;如下&#xff1a; 2. 服務拆分 需求 &#xff1a; 把訂單信息和用戶信息一起返回 從訂單模塊向用戶模塊發起遠程調用 &#xff0c; 把查到的結果一起返回 步驟 &…

多態(難的起飛)

注意 virtual關鍵字&#xff1a; 1、可以修飾原函數&#xff0c;為了完成虛函數的重寫&#xff0c;滿足多態的條件之一 2、可以菱形繼承中&#xff0c;去完成虛繼承&#xff0c;解決數據冗余和二義性 兩個地方使用了同一個關鍵字&#xff0c;但是它們互相一點關系都沒有 虛函…

JAVASE總結一

1、 2、引用也可以是成員變量&#xff08;實例變量&#xff09;&#xff0c;也可以是局部變量&#xff1b;引用數據類型&#xff0c;引用&#xff0c; 我們是通過引用去訪問JVM堆內存當中的java對象&#xff0c;引用保存了java對象的內存地址&#xff0c;指向了JVM堆內存當中…

ESP32 - Micropython ESP-IDF 雙線教程 脈寬調制(PWM)(1)

ESP32 - Micropython ESP-IDF 雙線教程 脈寬調制&#xff08;PWM&#xff09; PWM 的基本原理PWM 的應用PWM 的優點PWM 的實現方式ESP32-micropython 中的 PWM 功能使用 micropython 控制 PWM 的代碼示例代碼介紹 ESP32-IDF 中的 PWM 功能1. 初始化配置函數2. 引腳綁定函數3. 占…

常見算法200個(5):快速排序(快排)

JS實現快速排序 1.快速排序思路&#xff1a; 選擇數組中的一個值作為基準&#xff0c;將數組中小于該值的數置于該數之前&#xff0c;大于該值的數置于該數之后&#xff0c;接著對該數前后的兩個數組進行重復操作直至排序完成。 2.代碼實現&#xff1a; function quick(arr)…

使用 Snort 進行入侵檢測

使用 Snort 進行入侵檢測 Snort 是一種流行的開源入侵檢測系統。您可以在http://www.snort.org/上獲取它。Snort 分析流量并嘗試檢測和記錄可疑活動。Snort 還能夠根據其所做的分析發送警報。 Snort 安裝 在本課中&#xff0c;我們將從源代碼安裝。此外&#xff0c;我們不會安…

2024 前端面試每日1小時

三日 1. 如何理解Vue的模板編譯原理 Vue的模板編譯實際就是將模板字符串通過解析、優化和代碼生成等步驟轉換為渲染函數的過程。這個過程中&#xff0c;AST扮演了非常重要的角色&#xff0c;它用樹形結構描述了模板的內容和結構&#xff0c;是編譯過程的核心數據結構&#xff…

MySQL——適合不適合創建索引的情況

那些情況適合創建索引 1、字段的數值具有唯一性的限制 索引本身可以起到約束的作用&#xff0c;比如唯一索引、主鍵索引都是可以起到唯一性約束的&#xff0c;因此在我們的數據表中&#xff0c;如果某個字段是唯一性的&#xff0c;就可以直接創建唯一性索引&#xff0c;或者主…

Nodejs 爬蟲 案例

1.安裝&#xff1a; npm install cheerio npm install axios2.介紹&#xff1a; 2.1 cheerio 特點和用途描述&#xff1a; HTML解析和操作&#xff1a;Cheerio 可以將 HTML 字符串加載到內存中&#xff0c;并將其轉換為一個可操作的 DOM 樹結構&#xff0c;從而可以方便地對…

AURIX TC3xx單片機介紹-啟動過程介紹1

從各個域控制器硬件解決方案來看,MPU可能來自多個供應商,有瑞薩,有NXP等,但對于MCU來說,基本都采用英飛凌TC3xx。 今天我們就來看一下TC3xx的啟動過程,主要包含如下內容: uC上電過程中,會經過一個上電時序,從復位狀態“脫離”出來;Boot Firmware是復位后第一個執行的…

使用 Effect 同步-09

有些組件需要與外部系統同步。例如&#xff0c;你可能希望根據 React state 控制非 React 組件、設置服務器連接或在組件出現在屏幕上時發送分析日志。Effects 會在渲染后運行一些代碼&#xff0c;以便可以將組件與 React 之外的某些系統同步。 簡單理解&#xff0c;就是需要操…

Python實現對Word文檔內容出現“重復標題”進行自動去重(4)

前言 本文是該專欄的第4篇,后面會持續分享Python辦公自動化干貨知識,記得關注。 在本專欄上一篇文章《Python實現對Word文檔內容出現“重復標題”進行自動去重(3)》中,筆者有詳細介紹使用python對word文檔內容的目標文本進行自動去重。只不過本文要介紹的“去重方法”與上…

計算機專業必考之計算機指令設計格式

計算機指令設計格式 例題&#xff1a; 1.設相對尋址的轉移指令占3個字節&#xff0c;第一字節為操作碼&#xff0c;第二&#xff0c;第三字節為相對偏移量&#xff0c; 數據在存儲器以低地址為字地址的存放方式。 每當CPU從存儲器取出一個字節時候&#xff0c;自動完成&…

正點原子[第二期]Linux之ARM(MX6U)裸機篇學習筆記-24.1,2 SPI驅動實驗-SPI協議介紹

前言&#xff1a; 本文是根據嗶哩嗶哩網站上“正點原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸機篇”視頻的學習筆記&#xff0c;在這里會記錄下正點原子 I.MX6ULL 開發板的配套視頻教程所作的實驗和學習筆記內容。本文大量引用了正點原子教學視頻和鏈接中的內容。…

計算機組成原理易混淆知識點總結(持續更新)

目錄 1.機器字長&#xff0c;存儲字長與指令字長 2.指令周期,機器周期,時鐘周期 3.CPI,IPS,MIPS 4.翻譯程序和匯編程序 5.計算機體系結構和計算機組成的區別和聯系 6.基準程序執行得越快說明機器的性能越好嗎? 1.機器字長&#xff0c;存儲字長與指令字長 不同的機器三者…

AI智能體|扣子Coze文生圖功能接入微信公眾號

大家好&#xff0c;我是無界生長。 AI智能體&#xff5c;扣子Coze文生圖功能接入微信公眾號本文分享了如何將Coze平臺的文生圖功能接入微信公眾號的詳細操作流程&#xff0c;包括創建圖像流、創建并配置Bot、設置提示詞和開場白、調試、發布等步驟。如果看完還沒學會的話&…

網頁圖片加載慢的求解指南

網頁/圖片加載慢的求解指南 一、前言與問題描述 今天剛換上華為的HUAWEI AX3 Pro New&#xff0c;連上WIFI后測速雖然比平時慢&#xff0c;但是也不算太離譜&#xff0c;如下圖所示&#xff1a; 估計讀者們有也和作者一樣&#xff0c;還沒意識到事情的嚴重性&#x1f601;。 …

08Django項目--用戶管理系統--查(前后端)

對應視頻鏈接點擊直達 TOC 一些朋友加我Q反饋&#xff0c;希望有每個階段的完整項目代碼&#xff0c;那從今天開始&#xff0c;我會上傳完整的項目代碼。 用戶管理&#xff0c;簡而言之就是用戶的增刪改查。 08項目點擊下載&#xff0c;可直接運行&#xff08;含數據庫&…

PHP框架 Laravel

現在因為公司需求&#xff0c;需要新開一個Laravel框架的項目&#xff0c;毫無疑問&#xff0c;我又被借調過去了&#xff0c;最近老是被借調&#xff0c;有點陰郁&#xff0c;不過反觀來看&#xff0c;這也是好事&#xff0c;又可以復習和鞏固一下自己的知識點&#xff0c;接下…

大數據開發面試題【Spark篇】

115、Spark的任務執行流程 driver和executor&#xff0c;結構式一主多從模式&#xff0c; driver&#xff1a;spark的驅動節點&#xff0c;用于執行spark任務中的main方法&#xff0c;負責實際代碼的執行工作&#xff1b;主要負責&#xff1a;將代碼邏輯轉換為任務、在executo…