【微服務】SpringBoot 整合Kafka 項目實戰操作詳解

目錄

一、前言

二、Kafka 介紹

2.1 什么是 Apache Kafka

2.2 Kafka 核心概念與架構

2.3 Kafka 為什么如此強大

2.4 Kafka 在微服務領域的應用場景

三、Docker 部署Kakfa服務

3.1 環境準備

3.2 Docker部署Kafka操作過程

3.2.1 創建docker網絡

3.2.2 啟動zookeeper容器

3.2.3 啟動kafka容器

3.3 kafka使用效果驗證

四、SpringBoot整合Kafka完整過程

4.1 前置準備

4.1.1 環境依賴

4.1.2 導入核心依賴

4.1.3 添加配置文件

4.2 代碼整合過程

4.2.1 增加消息傳遞對象類

4.2.2 創建發送消息的工具類

4.2.3 創建消費者

4.2.4 增加一個操作topic的工具類

4.2.5 創建測試接口

4.2.6 效果驗證

五、寫在文末


一、前言

kafka在眾多的領域中都有著廣泛的使用。作為一款久經考驗性能強勁的消息中間件,在大數據、微服務、電商、金融等眾多領域的IT系統中承擔著重要的角色。利用kafka的高吞吐、高性能等特點,應用程序很容易進行適合高并發的架構拓展設計,為架構優化、系統性能提升、應用程序解耦等場景提供了有力的支撐。在微服務領域,kafka的應用,可以讓微服務的設計能夠應對更多復雜的業務場景,本文以SpringBoot為例,詳細介紹如何在SpringBoot的微服務項目中集成和使用kafka。

二、Kafka 介紹

2.1 什么是 Apache Kafka

Apache Kafka 最初由 LinkedIn 開發,并于 2011 年開源,現已發展成為一款開源的分布式事件流平臺。它的核心設計目標是能夠高效地處理實時數據流,具備高吞吐量、可擴展性、持久性和容錯性。官網:Apache Kafka

你可以把它理解為一個高度耐用、永不丟失的“消息隊列”,但它的能力遠不止于此。它更像是一個中央神經系統,用于連接不同應用程序、系統和數據源,讓數據能夠以流的形式在其中可靠地流動。

2.2 Kafka 核心概念與架構

要理解 Kafka,首先需要了解幾個核心概念:

  1. Topic(主題):數據的類別或Feed名稱。消息總是被發布到特定的 Topic,好比數據庫中的表。

  2. Producer(生產者):向 Topic 發布消息的客戶端應用程序。

  3. Consumer(消費者):訂閱 Topic 并處理發布的消息的客戶端應用程序。

  4. Broker(代理):一個 Kafka 服務器就是一個 Broker。一個 Kafka 集群由多個 Broker 組成,以實現高可用和負載均衡。

  5. Partition(分區):每個 Topic 可以被分成多個 Partition。分區是 Kafka 實現水平擴展和并行處理的基礎。消息在分區內是有序的(但跨分區不保證順序)。

  6. Consumer Group(消費者組):由多個消費者實例組成,共同消費一個 Topic。同一個分區只會被分配給同一個消費者組內的一個消費者,從而實現“負載均衡”和“Scale-Out”(橫向擴展)的消費模式。

  7. Offset(偏移量):消息在分區中的唯一標識。消費者通過管理 Offset 來追蹤自己消費到了哪里,即使重啟也不會丟失位置。

2.3 Kafka 為什么如此強大

  • 高吞吐量:即使使用普通的硬件,也能支持每秒數十萬甚至百萬級的消息處理。

  • 可擴展性:通過簡單地增加 Broker 和分區,可以輕松擴展集群,處理更大的數據量。

  • 持久性與可靠性:消息被持久化到磁盤,并且通過副本機制(Replication)在多臺服務器上進行備份,防止數據丟失。

  • 實時性:消息產生后立刻可供消費,延遲極低,是真正的實時流處理平臺。

2.4 Kafka 在微服務領域的應用場景

在微服務架構中,服務被拆分為多個小型、獨立的單元。這些服務之間需要通信和協作,而 Kafka 正是實現這種松耦合、異步通信的理想選擇。具體來說,Kafka 可以在下面的場景中使用。

1)服務間異步通信(解耦)

  • 場景:訂單服務創建訂單后,需要通知庫存服務扣減庫存、通知用戶服務發送短信、通知分析服務更新統計數據。

  • 傳統問題:如果使用同步 HTTP 調用(如 REST),訂單服務需要等待所有調用成功后才能返回,導致響應慢,且任何一個下游服務故障都會導致整個操作失敗(緊耦合)。

  • Kafka 方案:訂單服務只需將一條 OrderCreated 事件發送到 Kafka 的 orders Topic,然后就可以立即返回響應。庫存、用戶、分析等服務作為消費者,各自獨立地從該 Topic 拉取消息并進行處理。實現了服務的徹底解耦:訂單服務不關心誰消費、何時消費、消費是否成功。

2)事件溯源(Event Sourcing)與 CQRS

  • 事件溯源:不存儲對象的當前狀態,而是存儲所有改變狀態的事件序列。Kafka 的持久化日志特性使其成為存儲這些事件的完美“事件存儲”。

  • CQRS(命令查詢職責分離):寫模型(命令端)在處理完命令后,將領域事件發布到 Kafka。讀模型(查詢端)訂閱這些事件,并更新自己的物化視圖(如 Elasticsearch、Redis 中的查詢專用數據)。這極大地提高了系統的查詢性能和靈活性。

3)流處理與實時數據管道

使用 Kafka Streams 或 ksqlDB 這樣的流處理庫,可以直接在微服務中構建復雜的實時數據處理邏輯。

場景:實時監控用戶點擊流、實時計算儀表盤、實時風控、實時推薦。

4)日志聚合(Log Aggregation)

將多個微服務的日志集中收集到 Kafka 中,然后再導入到 ELK(Elasticsearch, Logstash, Kibana)或 Splunk 等中央日志系統中進行分析和查詢。Kafka 作為緩沖層,可以應對日志量的突發高峰,防止沖垮后端的日志存儲系統。

5)小結:

在 Java 微服務架構中,Apache Kafka 遠不止一個消息隊列。它還具備下面多種角色:

  • 服務的粘合劑:以事件驅動的方式連接孤立的微服務,實現高度解耦和彈性。

  • 實時數據流的中心樞紐:所有重要的業務事件都流經此處,為構建實時應用提供數據基礎。

  • 流處理平臺:允許開發者直接在微服務中編寫實時數據處理邏輯。

其與 Spring Boot 等主流 Java 框架的無縫集成,使得它成為構建現代、可擴展、高響應的 Java 微服務系統時不可或缺的基礎設施。選擇 Kafka,意味著為你的微服務系統選擇了面向未來的架構模式。

三、Docker 部署Kakfa服務

為了后面在工程中對接與使用Kafka,需要提前準備一個Kafka的服務,下面使用docker快速部署一個Kakfa,參考下面的過程完成基于Docker環境部署kafka的操作流程。

3.1 環境準備

服務器推薦:2核4G(至少),提前在服務器安裝好docker環境

3.2 Docker部署Kafka操作過程

3.2.1 創建docker網絡

搭配zookeeper進行使用,這個是部署kafka比較經典的方式,為了更好的讓kafka與zookeeper交互,提前創建一個docker網絡

docker network create kafka-net

3.2.2 啟動zookeeper容器

使用下面的命令啟動zookeeper容器

docker run -d \--name zookeeper_01 \--network kafka-net \-p 12179:2181 \-e ZOO_TICK_TIME=2000 \zookeeper:latest

參數解釋:

  • -d: 后臺運行容器。

  • --name: 為容器指定一個名稱。

  • --network: 加入創建的 kafka-net 網絡。

  • -p 12179:2181: 將容器的 12179端口映射到宿主機的 2181 端口。

  • -e ZOO_TICK_TIME=2000: 設置 ZooKeeper 的基本時間單位(毫秒)。

3.2.3 啟動kafka容器

使用下面的命令啟動kafka容器

docker run -d \
--name kafka_01 \
-p 19091:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=服務器公網IP:12179 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服務器公網IP:19091 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
wurstmeister/kafka:latest

docker 參數說明:

  • KAFKA_ADVERTISED_LISTENERS: 非常重要。Broker 發布給客戶端(生產者、消費者)的連接地址。如果客戶端在宿主機外,需替換 localhost 為宿主機 IP。

  • KAFKA_LISTENERS: Broker 實際監聽的地址和協議,0.0.0.0 表示監聽所有網絡接口。

  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 設置內部偏移量主題的副本因子,單機設為 1 即可。

3.3 kafka使用效果驗證

kafka的服務搭建完成之后,接下來驗證下是否可以先通過客戶端操作命令正常使用topic進行收發消息。

1)進入 Kafka 容器:

docker exec -it kafka_01 /bin/bash

2)創建一個topic

kafka-topics.sh --create --zookeeper 公網IP:12179 --replication-factor 1 --partitions 1 --topic test-topic

3)啟動生產者窗口

開啟一個生產者窗口,嘗試往上面的topic中發送消息

kafka-console-producer.sh --broker-list 公網IP:19091 --topic test-topic

看到下面的效果說明生產端連接上了

4)啟動消費者窗口

開啟一個新的消費者窗口,嘗試從上面的topic中接收消息

kafka-console-consumer.sh --bootstrap-server 公網IP:19091 --topic test-topic --from-beginning

看到下面的效果說明消費端接收消息就緒了

5)發送消息

接下來從生產者窗口發送一條消息,可以看到消息能夠正常的發送出去,同時消費端也能接收到消息

四、SpringBoot整合Kafka完整過程

接下來通過案例操作詳細介紹下如何在springboot項目中整合與使用kafka。

4.1 前置準備

4.1.1 環境依賴

首先確保你的開發環境滿足以下要求:

  • Java 17+ (Spring Boot 3 需要 Java 17 或更高版本)

  • Apache Kafka (本地安裝或使用 Docker 容器)

  • Maven 或 Gradle 構建工具

4.1.2 導入核心依賴

提前創建一個springboot工程,并導入如下核心依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>mcp-client</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.3</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Mysql Connector --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>3.0.3</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version><exclusions><exclusion><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifactId><version>3.0.5</version> <!-- 建議使用最新版本 --></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><finalName>${project.artifactId}</finalName><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

4.1.3 添加配置文件

在工程的yml配置文件中增加下面有關kafka的信息

server:port: 8082spring:kafka:bootstrap-servers: IP:9092#生產者配置producer:retries: 3batch-size: 16384 # 批量處理大小buffer-memory: 33554432 # 生產者緩沖內存大小acks: all # 消息確認機制key-serializer: org.apache.kafka.common.serialization.StringSerializer#value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # JSON類型的序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer  # 字符串類型的序列化#消費者配置consumer:group-id: consumer-group # 消費者組IDauto-offset-reset: earliest # 當無初始offset或offset失效時的處理方式enable-auto-commit: false # 關閉自動提交偏移量key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # JSON類型的反序列化value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    # 字符串類型的序列化#需要搭配對象類型的序列化一起使用#properties:#spring.json.trusted.packages: "*"listener:ack-mode: MANUAL_IMMEDIATE # 手動立即提交偏移量:cite[4]mybatis-plus:# 不支持多包, 如有需要可在注解配置 或 提升掃包等級# 例如 com.**.**.mappermapperPackage: com.congge.mapper# 對應的 XML 文件位置mapperLocations: classpath*:mapper/**/*Mapper.xml# 實體掃描,多個package用逗號或者分號分隔typeAliasesPackage: com.congge.entityglobal-config:dbConfig:# 主鍵類型# AUTO 自增 NONE 空 INPUT 用戶輸入 ASSIGN_ID 雪花 ASSIGN_UUID 唯一 UUID# 如需改為自增 需要將數據庫表全部設置為自增idType: ASSIGN_ID# 邏輯已刪除值(默認為 1)logic-delete-value: 1# 邏輯未刪除值(默認為 0)logic-not-delete-value: 0

4.2 代碼整合過程

參考下面的操作過程完成代碼的整合。

4.2.1 增加消息傳遞對象類

自定義一個消息對象,用于承載某些復雜場景下對傳遞消息的要求

package com.congge.kafka;import java.time.LocalDateTime;public class MessageData {private Long id;private String content;private LocalDateTime timestamp;// 構造方法public MessageData() {}public MessageData(Long id, String content, LocalDateTime timestamp) {this.id = id;this.content = content;this.timestamp = timestamp;}// Getter和Setter方法public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public LocalDateTime getTimestamp() {return timestamp;}public void setTimestamp(LocalDateTime timestamp) {this.timestamp = timestamp;}@Overridepublic String toString() {return "MessageData{" +"id=" + id +", content='" + content + '\'' +", timestamp=" + timestamp +'}';}}

4.2.2 創建發送消息的工具類

創建一個工具類用于發送消息,為了后續使用方便,該類作為spring bean配置到IOC容器中,其他類需要使用的時候直接注入即可。

  • 該類中定義了多個發送消息的方法,可用于發送不同類型的消息,比如比較常用的字符串消息,對象消息等,可以根據實際的業務場景選型使用;

package com.congge.utils;import com.congge.kafka.MessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;@Service
public class KafkaProducerService {private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);private static final String TOPIC_NAME = "test-topic";private final KafkaTemplate<String, Object> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 發送字符串消息*/public void sendStringMessage(String message) {CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, message);future.whenComplete((result, ex) -> {if (ex == null) {logger.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());} else {logger.error("Unable to send message=[{}] due to: {}", message, ex.getMessage());}});}/*** 發送對象消息(JSON格式)*/public void sendObjectMessage(Long id, String content) {MessageData messageData = new MessageData(id, content, LocalDateTime.now());CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, messageData);future.whenComplete((result, ex) -> {if (ex == null) {logger.info("Sent message=[{}] with offset=[{}]", messageData, result.getRecordMetadata().offset());} else {logger.error("Unable to send message=[{}] due to: {}", messageData, ex.getMessage());}});}/*** 發送帶鍵的消息*/public void sendMessageWithKey(String key, String message) {CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, key, message);future.whenComplete((result, ex) -> {if (ex == null) {logger.info("Sent message=[{}] with key=[{}] and offset=[{}]",message, key, result.getRecordMetadata().offset());} else {logger.error("Unable to send message=[{}] with key=[{}] due to: {}",message, key, ex.getMessage());}});}
}

4.2.3 創建消費者

創建一個服務類,專門用于消費 Kafka 主題中的消息

  • 在實際開發中,建議不同的業務使用各自的監聽器的類,避免業務重度耦合

package com.congge.utils;import com.congge.kafka.MessageData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);/*** 消費字符串消息*/@KafkaListener(topics = "test-topic", groupId = "consumer-group")public void consumeStringMessage(String message,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,@Header(KafkaHeaders.OFFSET) long offset) {try {logger.info("Received string message: [{}] from topic: {}, partition: {}, offset: {}",message, topic, partition, offset);// 業務處理邏輯processMessage(message);// 手動提交偏移量acknowledgment.acknowledge();logger.info("Acknowledged message from topic: {}, partition: {}, offset: {}",topic, partition, offset);} catch (Exception e) {logger.error("Error processing message: {}", message, e);// 可根據業務需求決定是否重試或將消息發送到DLT(死信主題)}}/*** 消費對象消息(JSON格式)*/@KafkaListener(topics = "example-topic", groupId = "object-consumer-group")public void consumeObjectMessage(MessageData messageData,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,@Header(KafkaHeaders.OFFSET) long offset) {try {logger.info("Received object message: [{}] from topic: {}, partition: {}, offset: {}",messageData, topic, partition, offset);// 業務處理邏輯processObjectMessage(messageData);// 手動提交偏移量acknowledgment.acknowledge();logger.info("Acknowledged message from topic: {}, partition: {}, offset: {}",topic, partition, offset);} catch (Exception e) {logger.error("Error processing message: {}", messageData, e);}}/*** 消費帶鍵的消息*/@KafkaListener(topics = "example-topic", groupId = "key-consumer-group")public void consumeMessageWithKey(ConsumerRecord<String, String> record,Acknowledgment acknowledgment) {try {logger.info("Received message with key: [{}], value: [{}] from topic: {}, partition: {}, offset: {}",record.key(), record.value(), record.topic(), record.partition(), record.offset());// 業務處理邏輯processMessageWithKey(record.key(), record.value());// 手動提交偏移量acknowledgment.acknowledge();logger.info("Acknowledged message with key: [{}] from topic: {}, partition: {}, offset: {}",record.key(), record.topic(), record.partition(), record.offset());} catch (Exception e) {logger.error("Error processing message with key: {}", record.key(), e);}}private void processMessage(String message) {// 實現你的業務邏輯logger.info("Processing message: {}", message);}private void processObjectMessage(MessageData messageData) {// 實現你的業務邏輯logger.info("Processing object message: {}", messageData);}private void processMessageWithKey(String key, String value) {// 根據鍵處理消息的邏輯logger.info("Processing message with key: {} and value: {}", key, value);}
}

4.2.4 增加一個操作topic的工具類

增加一個操作topic的類,用于手動創建topic

package com.congge.kafka;import org.apache.kafka.clients.admin.*;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;@Service
public class KafkaTopicService {private final AdminClient adminClient;public KafkaTopicService(KafkaAdmin kafkaAdmin) {this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());}/*** 創建單個Topic*/public void createTopic(String topicName, int partitions, short replicationFactor) {NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);createTopic(newTopic);}/*** 創建Topic(使用NewTopic對象)*/public void createTopic(NewTopic newTopic) {CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));try {result.all().get(); // 等待創建完成} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to create topic: " + newTopic.name(), e);}}/*** 檢查Topic是否存在*/public boolean topicExists(String topicName) {try {ListTopicsResult topics = adminClient.listTopics();Set<String> topicNames = topics.names().get();return topicNames.contains(topicName);} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to check topic existence: " + topicName, e);}}/*** 獲取Topic詳情*/public TopicDescription getTopicDescription(String topicName) {try {Map<String, TopicDescription> descriptions =adminClient.describeTopics(Collections.singleton(topicName)).all().get();return descriptions.get(topicName);} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to get topic description: " + topicName, e);}}/*** 獲取所有Topic列表*/public Set<String> listAllTopics() {try {ListTopicsResult topics = adminClient.listTopics();return topics.names().get();} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to list topics", e);}}/*** 安全關閉AdminClient*/public void close() {if (adminClient != null) {adminClient.close();}}
}

4.2.5 創建測試接口

為了方便驗證效果,這里創建一個接口,通過接口發送消息到kafka的topic中,然后通過監聽器到topic中的消息就算成功。參考下面的代碼。

package com.congge.kafka;import com.congge.utils.KafkaProducerService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/api/kafka")
public class KafkaController {private final KafkaProducerService kafkaProducerService;private final KafkaTopicService topicService;public KafkaController(KafkaProducerService kafkaProducerService,KafkaTopicService topicService) {this.kafkaProducerService = kafkaProducerService;this.topicService = topicService;}//localhost:8082/api/kafka/send-string?message=test producer@GetMapping("/send-string")public String sendStringMessage(@RequestParam String message) {kafkaProducerService.sendStringMessage(message);return "String message sent: " + message;}//localhost:8082/api/kafka/send-object?id=1&content=test producer@GetMapping("/send-object")public String sendObjectMessage(@RequestParam Long id, @RequestParam String content) {kafkaProducerService.sendObjectMessage(id, content);return "Object message sent with ID: " + id + " and content: " + content;}@GetMapping("/send-with-key")public String sendMessageWithKey(@RequestParam String key, @RequestParam String message) {kafkaProducerService.sendMessageWithKey(key, message);return "Message with key sent: " + key + " - " + message;}/*** 創建Topic* localhost:8082/api/kafka/create-topic?topicName=eva&partitions=1&replicationFactor=2**/@GetMapping("/create-topic")public ResponseEntity<String> createTopic(@RequestParam String topicName,@RequestParam(defaultValue = "1") int partitions,@RequestParam(defaultValue = "1") short replicationFactor) {if (topicService.topicExists(topicName)) {return ResponseEntity.badRequest().body("Topic already exists: " + topicName);}try {topicService.createTopic(topicName, partitions, replicationFactor);return ResponseEntity.ok("Topic created successfully: " + topicName);} catch (Exception e) {return ResponseEntity.internalServerError().body("Failed to create topic: " + e.getMessage());}}
}

4.2.6 效果驗證

1)創建一個測試使用的topic

如果你已經提前通過命令行工具創建過topic了的話,就不用執行接口創建了,這里創建一個名為zcy-test的topic,調用上面的創建接口

2)發送字符串消息

調用發消息接口,發送成功后

然后在監聽器中監聽到了,實際業務中,你要處理的業務邏輯就在監聽器中進行處理

3)發送對象消息

在實際業務中,為了更方便的對消息進行處理,發送對象消息也是很常用的一種方式,參考下面的接口示例

然后在基于對象方式的監聽器中就可以看到了

五、寫在文末

本文比較詳細的介紹了如何在SpringBoot集成和使用Kakfa,并通過案例詳細演示了其使用過程,更深入的技術點有興趣的同學可以基于此繼續深入研究,本篇到此結束,感謝觀看。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/96894.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/96894.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/96894.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

多樓層室內定位可視化 Demo(A*路徑避障)

<!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <title>多樓層室內定位可視化 Demo&#xff08;A*避障&#xff09;</title> <style>body { margin: 0; overflow: hidden; }#layerControls { p…

vue2+jessibuca播放h265視頻(能播h264)

文檔地址&#xff1a;http://jessibuca.monibuca.com/api.html#background 1,文件放在public中 2,在html中引入 3&#xff0c;子組件 <template><div :id"container id"></div> </template><script> export default {props: [url,…

Docker命令大全:從基礎到高級實戰指南

Docker命令大全&#xff1a;從基礎到高級實戰指南 Docker作為現代容器化技術的核心工具&#xff0c;其命令體系是開發運維的必備技能。本文將系統整理常用命令&#xff0c;助您高效管理容器生態。一、基礎命令篇 1. 鏡像管理 # 拉取鏡像 $ docker pull nginx:latest# 查看本地鏡…

不鄰排列:如何優雅地避開“數字CP“

排列組合奇妙冒險&#xff1a;如何優雅地避開"數字CP"&#xff1f; ——容斥原理教你破解連續數對排列難題 &#x1f4dc; 問題描述 題目&#xff1a;求1,2,3,4,5,6,7,81,2,3,4,5,6,7,81,2,3,4,5,6,7,8的排列個數&#xff0c;使得排列中不出現連續的12,23,34,45,56,6…

S7-200 SMART PLC 安全全指南:配置、漏洞解析與復現防護

在工業自動化領域&#xff0c;PLC&#xff08;可編程邏輯控制器&#xff09;作為核心控制單元&#xff0c;其安全性直接關系到生產系統的穩定運行與數據安全。西門子 S7-200 SMART 系列 PLC 憑借高性價比、易用性等優勢&#xff0c;廣泛應用于中小型自動化項目。但實際使用中&a…

【計算機網絡 | 第14篇】應用層協議

文章目錄 應用層協議的核心定義&#xff1a;“通信合同”的關鍵內容&#x1f95d;應用層協議的分類&#xff1a;公共標準 vs 專有協議&#x1f9fe;公共標準協議專有協議 應用層協議與網絡應用的關系&#x1f914;案例1&#xff1a;Web應用案例2&#xff1a;Netflix視頻服務 應…

小迪web自用筆記33

再次提到預編譯&#xff0c;不會改變固定邏輯。id等于什么的只能更換頁面。過濾器&#xff1a;代碼一旦執行在頁面中&#xff0c;就會執行&#xff0c;xss跨站。Js的特性是顯示在頁面中之后開始執行&#xff0c;那個代碼是打印過后然后再渲染。是的&#xff0c;核心是**“打印&…

Zynq開發實踐(FPGA之第一個vivado工程)

【 聲明&#xff1a;版權所有&#xff0c;歡迎轉載&#xff0c;請勿用于商業用途。 聯系信箱&#xff1a;feixiaoxing 163.com】數字電路設計&#xff0c;如果僅僅是寫寫代碼&#xff0c;做做verilog仿真&#xff0c;那么其實是不需要轉移到fpga上面的。這就好比是算法工程師&a…

【Selenium】Selenium 測試失敗排查:一次元素定位超時的完整解決之旅

Selenium 測試失敗排查:一次元素定位超時的完整解決之旅 在自動化測試過程中,我們經常會遇到元素定位超時的問題。本文記錄了一次完整的 Selenium TimeoutException 排查過程,從問題發現到最終解決,涵蓋了各種常見陷阱和解決方案。 問題背景 測試用例在執行過程中失敗,…

32.網絡基礎概念(二)

局域網網絡傳輸流程圖兩臺主機在同一個局域網&#xff0c;是否能夠直接通信&#xff1f;以太網原理舉例&#xff1a;上課&#xff0c;老師點名小王讓他站起來回答問題。教室里的其他人是可以聽見的&#xff0c;為什么其他人不響應&#xff1f;因為老師叫的是小王&#xff0c;和…

【高并發內存池】六、三種緩存的回收內存過程

文章目錄前言Ⅰ. thread cache的內存回收Ⅱ. central cache的內存回收Ⅲ. page cache的內存回收前言 ? 前面我們將內存的申請流程都走通了&#xff0c;現在就是內存回收的過程&#xff0c;主要是從 thread cache 開始&#xff0c;一層一層往下回收&#xff0c;因為我們調用的…

DeerFlow 實踐:華為IPD流程的評審智能體設計

目錄 一、項目背景與目標 二、IPD 流程關鍵評審點與 TR 點解析 &#xff08;一&#xff09;4 個關鍵評審點 &#xff08;二&#xff09;6 個 TR 點 三、評審智能體詳細設計與協作機制 機制設計核心原則 &#xff08;一&#xff09;概念評審&#xff08;CDCP&#xff09;…

【ubuntu】ubuntu中找不到串口設備問題排查

ubuntu中找不到串口問題排查1. 檢查設備識別情況2. 檢查并安裝驅動3. 檢查內核消息4. 禁用brltty服務1. 停止并禁用 brltty 服務2. 完全移除 brltty 包3. 重啟系統或重新插拔設備5.輸出結果問題&#xff1a;虛擬機ubuntu中&#xff0c;已經顯示串口設備連接成功&#xff0c;但是…

Unity 性能優化 之 靜態資源優化 (音頻 | 模型 | 紋理 | 動畫)

Unity 之 性能優化 -- 靜態資源優化參考性能指標靜態資源資源工作流程資源分類原理小結Audio 實戰優化建議模型導入工作流程DCC中模型導出.DCC中Mesh生產規范模型導出檢查流程模型優化建議紋理優化紋理基礎概念紋理類型紋理大小紋理顏色空間紋理壓縮紋理圖集紋理過濾紋理Mipmap…

GitHub 熱榜項目 - 日榜(2025-09-13)

GitHub 熱榜項目 - 日榜(2025-09-13) 生成于&#xff1a;2025-09-13 統計摘要 共發現熱門項目&#xff1a;18 個 榜單類型&#xff1a;日榜 本期熱點趨勢總結 本期GitHub熱榜項目呈現三大技術熱點&#xff1a;AI開發工具化&#xff08;如GenKit、ROMA多智能體框架&#xff…

Pytest 常見問題及其解決方案

常見問題及解決方案 1. 測試通過了,但覆蓋率不達標 現象: 雖然所有測試都通過了,但覆蓋率報告顯示某些代碼沒有被覆蓋。 解決方案: 檢查覆蓋率配置:確保 .coveragerc 或 pytest.ini 中正確設置了要分析的源代碼路徑。 使用標記(markers)排除測試文件本身:避免測試代…

直擊3D內容創作痛點-火山引擎多媒體實驗室首次主持SIGGRAPH Workshop,用前沿技術降低沉浸式內容生成門檻

當3D、VR技術在游戲、教育、醫療、文化領域遍地開花&#xff0c;“內容短缺”卻成了制約行業爆發的關鍵瓶頸——傳統3D/4D創作不僅耗時耗力、依賴專業技能&#xff0c;還難以適配消費級設備&#xff0c;讓許多創作者望而卻步。近日&#xff0c;由火山引擎多媒體實驗室聯合領域頂…

華為基本命令

我們使用的是華為官方的模擬器eNSP 一、華為設備的模式 華為的設備有兩種模式&#xff1a; 用戶視圖和系統視圖 用戶視圖只能讀取&#xff0c;或者進行一些基礎查詢 系統視圖能對設備和接口進行一些配置管理&#xff0c;和一些高級操作 在“用戶視圖”下使用system-view系統可…

2025.9.14英語紅寶書【必背16-20】

單詞組合 中文速記句子 英文句子 confine, misery, necessitate, negotiate, preach, precaution, precision, stretch 病人被 confine(限制) 在床上,感受 misery(痛苦),情況 necessitate(需要) 醫生 negotiate(商討),牧師 preach(布道) 并提醒 precaution(預防)…

HUST-STAR電控組視覺任務

視覺任務 注意&#xff1a;視覺部分建議采用 python 完成&#xff0c;下面教程也大多針對 python。其原因在于 python 配置相應環境更為輕松&#xff0c;且內置庫較為豐富&#xff0c;屬于初學者友好類型。沒接觸過 python 也不必擔心&#xff0c;它的大體邏輯與 C 相近&#…