[Java實戰]Spring Boot整合Kafka:高吞吐量消息系統實戰(二十七)

[Java實戰]Spring Boot整合Kafka:高吞吐量消息系統實戰(二十七)

一、引言

Apache Kafka作為一款高吞吐量、低延遲的分布式消息隊列系統,廣泛應用于實時數據處理、日志收集和事件驅動架構。結合Spring Boot的自動化配置能力,可以快速搭建高性能消息系統。本文將從環境搭建、代碼實現、原理分析到測試優化,全面解析Spring Boot與Kafka的整合實踐。

二、環境準備

1. Kafka安裝與啟動

  1. 下載Kafka:從Apache Kafka官網下載最新版本(推薦3.x+)。
  2. 啟動Zookeeper(Kafka依賴):
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  3. 啟動Kafka服務
    bin/kafka-server-start.sh config/server.properties
    

2. 創建Topic

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

說明:手動創建Topic可指定分區數(如3),提升并發處理能力。

三、環境準備(docker)

1. 使用Docker快速啟動Kafka

通過Docker可以快速部署Kafka服務,無需手動安裝依賴,步驟如下:

  1. 創建docker-compose.yml文件
    在項目根目錄下新建文件,內容如下:
    version: '3'
    services:zookeeper:image: docker.1ms.run/confluentinc/cp-zookeeper:7.4.0ports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: docker.1ms.run/confluentinc/cp-kafka:7.4.0ports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.231.132:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"  # 禁止自動創建Topicdepends_on:- zookeeper
    

    關鍵配置說明

    • KAFKA_ADVERTISED_LISTENERS: 確保客戶端能通過localhost:9092訪問Kafka。
  • KAFKA_AUTO_CREATE_TOPICS_ENABLE: 設為false避免自動創建Topic,推薦手動控制。
  1. 啟動Kafka服務
    執行以下命令啟動服務:
    docker-compose up -d#停掉
    #docker-compose down
    

2. 創建Topic

通過Docker執行命令創建Topic:

docker exec -it kafka-kafka-1 kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

注意

  • kafka-kafka-1為容器名稱(根據實際名稱調整)。
  • --partitions 3指定分區數,提升并發處理能力。

3.安裝成功截圖

在這里插入圖片描述

四、Spring Boot項目搭建

1. 添加依賴

pom.xml中引入Spring Kafka:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件

application.yml配置Kafka連接及序列化方式:

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

關鍵參數auto-offset-reset: earliest確保消費者從最早消息開始消費。

五、代碼實現

1. 生產者配置

@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;// 發送消息(支持回調)public void sendMessage(String topic, String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.addCallback(result -> {System.out.println("發送成功: " + result.getRecordMetadata().offset());}, ex -> {System.out.println("發送失敗: " + ex.getMessage());});}
}

高級特性:回調機制可監控消息發送狀態。

2. 消費者配置

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("接收到消息: " + message);// 業務處理邏輯}
}

批量消費:通過設置spring.kafka.consumer.max-poll-records可支持批量處理。

3.測試結果

KafkaController編寫:

@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducer;@PostMapping("/send")public ResponseEntity<String> sendMs(@RequestBody String request) {kafkaProducer.sendMessage("my-topic","你好");return ResponseEntity.ok("ok");}
}

測試結果:

在這里插入圖片描述

六、原理分析

1. Spring Kafka核心組件

  • KafkaTemplate:封裝生產者操作,支持異步發送和事務管理。
  • @KafkaListener:基于監聽器模式,自動創建消費者并訂閱Topic。
  • ConsumerFactory/ProducerFactory:工廠類管理Kafka客戶端配置。

2. 高吞吐量優化

  • 生產者端:調整batch.size(批次大小)和linger.ms(等待時間)提升批量發送效率。
  • 消費者端:增加分區數、配置多線程消費(ConcurrentKafkaListenerContainerFactory)。

七、高級特性

1. 自定義分區策略

實現Partitioner接口,指定消息路由規則:

public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定義分區邏輯(如按Key哈希)return key.hashCode() % cluster.partitionCountForTopic(topic);}
}

配置文件中指定分區器:

spring:kafka:producer:properties:partitioner.class: com.example.CustomPartitioner

2. 事務支持

通過KafkaTransactionManager實現事務消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendInTransaction() {kafkaTemplate.executeInTransaction(operations -> {operations.send("topic1", "Message1");operations.send("topic2", "Message2");return null;});
}

八、測試步驟

1. 單元測試(使用嵌入式Kafka)

添加測試依賴:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope>
</dependency>

編寫測試類:

@SpringBootTest
@EmbeddedKafka(topics = "test-topic")
public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void testSendAndReceive() {kafkaTemplate.send("test-topic", "Hello Kafka");// 通過監聽器驗證消息接收}
}

說明:嵌入式Kafka無需外部服務,適合CI/CD環境。

九、總結

本文從環境搭建到代碼實現,結合Spring Boot與Kafka的高吞吐量特性,實現了消息系統的快速開發。通過自定義分區、事務支持和批量消費等高級功能,可進一步優化系統性能。實際應用中需根據業務場景調整參數,并借助監控工具(如Kafka Manager)持續優化。

參考文檔

  • Spring Kafka官方文檔
  • Apache Kafka架構解析

希望本教程對您有幫助,請點贊??收藏?關注支持!歡迎在評論區留言交流技術細節!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/906397.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/906397.shtml
英文地址,請注明出處:http://en.pswp.cn/news/906397.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Kotlin Multiplatform--04:經驗總結(持續更新)

Kotlin Multiplatform--04&#xff1a;經驗總結&#xff08;持續更新&#xff09; 引言 引言 本章用來記載筆者開發過程中的一些經驗總結 一、Ktor設置Header 在官方文檔中&#xff0c;想要設置Header的示例代碼如下&#xff1a; client.get("https://ktor.io&qu…

在 Ubuntu 系統中,將 JAR 包安裝為服務

在 Ubuntu 系統中&#xff0c;將 JAR 包安裝為服務可以通過 systemd 來實現。以下是詳細的操作步驟&#xff1a; 準備工作 確保 JAR 文件路徑和 Java 運行時環境已準備好。驗證 Java 是否可用&#xff1a; java -version創建 systemd 服務文件 systemd 的服務文件通常位于 …

電商項目-商品微服務-品牌管理微服務開發

一、功能分析 品牌管理微服務包括&#xff1a; &#xff08;1&#xff09;查詢全部列表數據 &#xff08;2&#xff09;根據ID查詢實體數據 &#xff08;3&#xff09;增加 &#xff08;4&#xff09;修改 &#xff08;5&#xff09;刪除 &#xff08;6&#xff09;分頁…

Spring Boot開發—— 整合Lucene構建輕量級毫秒級響應的全文檢索引擎

文章目錄 一、為什么選擇 Lucene?輕量級搜索的底層密碼二、核心原理:Lucene 的倒排索引2.1 倒排索引:速度之源2.2 段合并優化策略三、Spring Boot集成Lucene實戰3.1 依賴配置3.2 實體與索引設計3.3 核心索引服務(含異常處理)3.4 使用示例(測試類)四、高級優化技巧4.1 索…

SpringBootDay1|面試題

目錄 一、springboot框架 1、什么是springboot 2、Spring Boot的主要優點 3、springboot核心注解 4、定義banner&#xff08;springboot的logo&#xff09; 5、springboot配置文件 6、springboot 整合 jdbc 二、面試題 1&#xff09;springmvc的作用 ?編輯 2&#x…

jQuery Ajax中dataType 和 content-type 參數的作用詳解

jQuery Ajax中dataType與contentType參數解析 一、核心概念對比 參數作用對象數據類型默認值dataType響應數據預期接收的數據格式jQuery自動判斷&#xff08;根據響應頭MIME類型&#xff09;contentType請求數據發送數據的編碼格式application/x-www-form-urlencoded 二、da…

幾款常用的虛擬串口模擬器

幾款常用的虛擬串口模擬器&#xff08;Virtual Serial Port Emulator&#xff09;&#xff0c;適用于 Windows 系統&#xff0c;可用于開發和調試串口通信應用&#xff1a; 1. com0com (開源免費) 特點&#xff1a; 完全開源免費&#xff0c;無功能限制。 可創建多個虛擬串口…

LLM筆記(六)線性代數

公式速查表 1. 向量與矩陣&#xff1a;表示、轉換與知識存儲的基礎 向量表示 (Vectors): 語義的載體 在LLM中&#xff0c;向量 x ∈ R d \mathbf{x}\in\mathbb{R}^d x∈Rd 是信息的基本單元&#xff0c;承載著豐富的語義信息&#xff1a; 詞嵌入向量 (Word Embeddings)&am…

[特殊字符] Word2Vec:將詞映射到高維空間,它到底能解決什么問題?

一、在 Word2Vec 之前,我們怎么處理語言? 在 Word2Vec 出現之前,自然語言處理更多是“工程方法”,例如字符串匹配、關鍵詞提取、正則規則...。但這些表示通常缺乏語義,詞與詞之間看不出任何聯系以及非常淺顯。當然,技術沒有好壞,只有適合的場景。例如: 關鍵詞匹配非常…

棧和隊列的模擬實現

棧和隊列的模擬實現 容器適配器priority_queue(優先級隊列&#xff09;priority_queue的使用priority_queue的模擬實現&#xff1a; 仿函數什么叫仿函數&#xff1f;需要自己實現仿函數的情況&#xff1a; 棧的模擬實現隊列的模擬實現deque&#xff08;vector和list的縫合怪&am…

idea本地debug斷點小技巧

idea本地debug斷點小技巧 簡單的設置斷點條件 斷點后&#xff0c;右鍵這個斷點&#xff0c;可以在 condition 中填寫能得出布爾的表達式 a 1 你如果寫如下&#xff0c;表示先給他賦值&#xff0c;然后斷住 a 2; true 斷點后設置某個變量的值 在 debug 區域可以設置變量…

Oracle中如何解決FREE BUFFER WAITS

基于性能上的考慮&#xff0c;服務器進程在掃描LRU主列的同時&#xff0c;會將臟塊移至LRU-W列&#xff0c;如果發現沒有足夠可用&#xff08;可替換&#xff09;的BUFFER CACHE&#xff0c;進程并不會無止盡地掃描整條LRU主列&#xff0c;而是在掃描到某個閥值&#xff08;該閥…

Git命令使用全攻略:從創建分支到合并的完整流程

Git命令使用全攻略&#xff1a;從創建分支到合并的完整流程 引言一、初始化項目與基礎配置1.1 克隆遠程倉庫1.2 查看當前分支狀態 二、創建與管理分支2.1 從main分支創建新功能分支2.2 查看分支列表2.3 提交代碼到新分支2.4 推送分支到GitHub 三、版本發布與標簽管理3.1 創建輕…

MATLAB跳動的愛心

520&#xff0c;一個會動的心~~~ function particleHeart2 % author : slandarer% 所需匿名函數 col1Func(n) repmat([255,158,196]./255,[n,1])repmat([-39,-81,-56]./255,[n,1]).*rand([n,1]); col2Func(n) repmat([118,156,216]./255,[n,1])repmat([137,99,39].*.1./255,[n,…

Go的單測gomock及覆蓋率命令

安裝gomock&#xff1a; go get github.com/golang/mock/gomockgo get github.com/golang/mock/mockgen 使用 mockgen 生成 mock 代碼: 參考 mockgen -sourceservice/user.go -destinationservice /mocks/mock_user_service.go -packagemocks go test -coverprofilecoverage.ou…

vue添加loading后修復頁面渲染問題

問題&#xff1a;想要通過選擇流程&#xff08;1&#xff09;后加載出角色信息&#xff08;2&#xff09; 選擇后無法展示經過排查&#xff0c;再調用接口給角色數組賦值后&#xff0c;頁面在接口調用完之前就已經渲染完成。接口是采用的異步加載解決&#xff1a;loadingRoles…

Python入門手冊:Python簡介,什么是Python

在當今數字化時代&#xff0c;編程語言猶如一把把神奇的鑰匙&#xff0c;能夠開啟通往技術世界的大門。而Python&#xff0c;無疑是其中最閃耀的一顆明星。今天&#xff0c;就讓我們一起走進Python的世界&#xff0c;從它的起源、應用領域以及優缺點三個方面&#xff0c;來全面…

用PyTorch在超大規模下訓練深度學習模型:并行策略全解析

我猜咱們每個人肯定都累壞了&#xff0c;天天追著 LLM 研究社區跑&#xff0c;感覺每天都冒出個新的最牛模型&#xff0c;把之前的基準都給打破了呢。要是你好奇為啥創新速度能這么快&#xff0c;那主要就是研究人員能夠在超大規模下訓練和驗證模型啦&#xff0c;這全靠并行計算…

提示工程(Prompt Engineering)應用技巧

Prompt&#xff08;提示&#xff09;就是用戶與大模型交互輸入的代稱。即我們給大模型的輸入稱為 Prompt&#xff0c;而大模型返回的輸出一般稱為 Completion。 Prompt 需要清晰明確地表達需求&#xff0c;提供充足上下文&#xff0c;使語言模型能夠準確理解我們的意圖。更長、…

[原創](現代Delphi 12指南):[macOS 64bit App開發]: 如何獲取目錄大小?

[作者] 常用網名: 豬頭三 出生日期: 1981.XX.XX 企鵝交流: 643439947 個人網站: 80x86匯編小站 編程生涯: 2001年~至今[共24年] 職業生涯: 22年 開發語言: C/C++、80x86ASM、Object Pascal、Objective-C、C#、R、Python、PHP、Perl、 開發工具: Visual Studio、Delphi、XCode、…