目錄
一、前言
二、Kafka 介紹
2.1 什么是 Apache Kafka
2.2 Kafka 核心概念與架構
2.3 Kafka 為什么如此強大
2.4 Kafka 在微服務領域的應用場景
三、Docker 部署Kakfa服務
3.1 環境準備
3.2 Docker部署Kafka操作過程
3.2.1 創建docker網絡
3.2.2 啟動zookeeper容器
3.2.3 啟動kafka容器
3.3 kafka使用效果驗證
四、SpringBoot整合Kafka完整過程
4.1 前置準備
4.1.1 環境依賴
4.1.2 導入核心依賴
4.1.3 添加配置文件
4.2 代碼整合過程
4.2.1 增加消息傳遞對象類
4.2.2 創建發送消息的工具類
4.2.3 創建消費者
4.2.4 增加一個操作topic的工具類
4.2.5 創建測試接口
4.2.6 效果驗證
五、寫在文末
一、前言
kafka在眾多的領域中都有著廣泛的使用。作為一款久經考驗性能強勁的消息中間件,在大數據、微服務、電商、金融等眾多領域的IT系統中承擔著重要的角色。利用kafka的高吞吐、高性能等特點,應用程序很容易進行適合高并發的架構拓展設計,為架構優化、系統性能提升、應用程序解耦等場景提供了有力的支撐。在微服務領域,kafka的應用,可以讓微服務的設計能夠應對更多復雜的業務場景,本文以SpringBoot為例,詳細介紹如何在SpringBoot的微服務項目中集成和使用kafka。
二、Kafka 介紹
2.1 什么是 Apache Kafka
Apache Kafka 最初由 LinkedIn 開發,并于 2011 年開源,現已發展成為一款開源的分布式事件流平臺。它的核心設計目標是能夠高效地處理實時數據流,具備高吞吐量、可擴展性、持久性和容錯性。官網:Apache Kafka
你可以把它理解為一個高度耐用、永不丟失的“消息隊列”,但它的能力遠不止于此。它更像是一個中央神經系統,用于連接不同應用程序、系統和數據源,讓數據能夠以流的形式在其中可靠地流動。
2.2 Kafka 核心概念與架構
要理解 Kafka,首先需要了解幾個核心概念:
-
Topic(主題):數據的類別或Feed名稱。消息總是被發布到特定的 Topic,好比數據庫中的表。
-
Producer(生產者):向 Topic 發布消息的客戶端應用程序。
-
Consumer(消費者):訂閱 Topic 并處理發布的消息的客戶端應用程序。
-
Broker(代理):一個 Kafka 服務器就是一個 Broker。一個 Kafka 集群由多個 Broker 組成,以實現高可用和負載均衡。
-
Partition(分區):每個 Topic 可以被分成多個 Partition。分區是 Kafka 實現水平擴展和并行處理的基礎。消息在分區內是有序的(但跨分區不保證順序)。
-
Consumer Group(消費者組):由多個消費者實例組成,共同消費一個 Topic。同一個分區只會被分配給同一個消費者組內的一個消費者,從而實現“負載均衡”和“Scale-Out”(橫向擴展)的消費模式。
-
Offset(偏移量):消息在分區中的唯一標識。消費者通過管理 Offset 來追蹤自己消費到了哪里,即使重啟也不會丟失位置。
2.3 Kafka 為什么如此強大
-
高吞吐量:即使使用普通的硬件,也能支持每秒數十萬甚至百萬級的消息處理。
-
可擴展性:通過簡單地增加 Broker 和分區,可以輕松擴展集群,處理更大的數據量。
-
持久性與可靠性:消息被持久化到磁盤,并且通過副本機制(Replication)在多臺服務器上進行備份,防止數據丟失。
-
實時性:消息產生后立刻可供消費,延遲極低,是真正的實時流處理平臺。
2.4 Kafka 在微服務領域的應用場景
在微服務架構中,服務被拆分為多個小型、獨立的單元。這些服務之間需要通信和協作,而 Kafka 正是實現這種松耦合、異步通信的理想選擇。具體來說,Kafka 可以在下面的場景中使用。
1)服務間異步通信(解耦)
-
場景:訂單服務創建訂單后,需要通知庫存服務扣減庫存、通知用戶服務發送短信、通知分析服務更新統計數據。
-
傳統問題:如果使用同步 HTTP 調用(如 REST),訂單服務需要等待所有調用成功后才能返回,導致響應慢,且任何一個下游服務故障都會導致整個操作失敗(緊耦合)。
-
Kafka 方案:訂單服務只需將一條
OrderCreated
事件發送到 Kafka 的orders
Topic,然后就可以立即返回響應。庫存、用戶、分析等服務作為消費者,各自獨立地從該 Topic 拉取消息并進行處理。實現了服務的徹底解耦:訂單服務不關心誰消費、何時消費、消費是否成功。
2)事件溯源(Event Sourcing)與 CQRS
-
事件溯源:不存儲對象的當前狀態,而是存儲所有改變狀態的事件序列。Kafka 的持久化日志特性使其成為存儲這些事件的完美“事件存儲”。
-
CQRS(命令查詢職責分離):寫模型(命令端)在處理完命令后,將領域事件發布到 Kafka。讀模型(查詢端)訂閱這些事件,并更新自己的物化視圖(如 Elasticsearch、Redis 中的查詢專用數據)。這極大地提高了系統的查詢性能和靈活性。
3)流處理與實時數據管道
使用 Kafka Streams 或 ksqlDB 這樣的流處理庫,可以直接在微服務中構建復雜的實時數據處理邏輯。
場景:實時監控用戶點擊流、實時計算儀表盤、實時風控、實時推薦。
4)日志聚合(Log Aggregation)
將多個微服務的日志集中收集到 Kafka 中,然后再導入到 ELK(Elasticsearch, Logstash, Kibana)或 Splunk 等中央日志系統中進行分析和查詢。Kafka 作為緩沖層,可以應對日志量的突發高峰,防止沖垮后端的日志存儲系統。
5)小結:
在 Java 微服務架構中,Apache Kafka 遠不止一個消息隊列。它還具備下面多種角色:
-
服務的粘合劑:以事件驅動的方式連接孤立的微服務,實現高度解耦和彈性。
-
實時數據流的中心樞紐:所有重要的業務事件都流經此處,為構建實時應用提供數據基礎。
-
流處理平臺:允許開發者直接在微服務中編寫實時數據處理邏輯。
其與 Spring Boot 等主流 Java 框架的無縫集成,使得它成為構建現代、可擴展、高響應的 Java 微服務系統時不可或缺的基礎設施。選擇 Kafka,意味著為你的微服務系統選擇了面向未來的架構模式。
三、Docker 部署Kakfa服務
為了后面在工程中對接與使用Kafka,需要提前準備一個Kafka的服務,下面使用docker快速部署一個Kakfa,參考下面的過程完成基于Docker環境部署kafka的操作流程。
3.1 環境準備
服務器推薦:2核4G(至少),提前在服務器安裝好docker環境
3.2 Docker部署Kafka操作過程
3.2.1 創建docker網絡
搭配zookeeper進行使用,這個是部署kafka比較經典的方式,為了更好的讓kafka與zookeeper交互,提前創建一個docker網絡
docker network create kafka-net
3.2.2 啟動zookeeper容器
使用下面的命令啟動zookeeper容器
docker run -d \--name zookeeper_01 \--network kafka-net \-p 12179:2181 \-e ZOO_TICK_TIME=2000 \zookeeper:latest
參數解釋:
-
-d
: 后臺運行容器。 -
--name
: 為容器指定一個名稱。 -
--network
: 加入創建的kafka-net
網絡。 -
-p 12179:2181
: 將容器的12179
端口映射到宿主機的 2181 端口。 -
-e ZOO_TICK_TIME=2000
: 設置 ZooKeeper 的基本時間單位(毫秒)。
3.2.3 啟動kafka容器
使用下面的命令啟動kafka容器
docker run -d \
--name kafka_01 \
-p 19091:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=服務器公網IP:12179 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服務器公網IP:19091 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
wurstmeister/kafka:latest
docker 參數說明:
-
KAFKA_ADVERTISED_LISTENERS
: 非常重要。Broker 發布給客戶端(生產者、消費者)的連接地址。如果客戶端在宿主機外,需替換localhost
為宿主機 IP。 -
KAFKA_LISTENERS
: Broker 實際監聽的地址和協議,0.0.0.0
表示監聽所有網絡接口。 -
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
: 設置內部偏移量主題的副本因子,單機設為 1 即可。
3.3 kafka使用效果驗證
kafka的服務搭建完成之后,接下來驗證下是否可以先通過客戶端操作命令正常使用topic進行收發消息。
1)進入 Kafka 容器:
docker exec -it kafka_01 /bin/bash
2)創建一個topic
kafka-topics.sh --create --zookeeper 公網IP:12179 --replication-factor 1 --partitions 1 --topic test-topic
3)啟動生產者窗口
開啟一個生產者窗口,嘗試往上面的topic中發送消息
kafka-console-producer.sh --broker-list 公網IP:19091 --topic test-topic
看到下面的效果說明生產端連接上了
4)啟動消費者窗口
開啟一個新的消費者窗口,嘗試從上面的topic中接收消息
kafka-console-consumer.sh --bootstrap-server 公網IP:19091 --topic test-topic --from-beginning
看到下面的效果說明消費端接收消息就緒了
5)發送消息
接下來從生產者窗口發送一條消息,可以看到消息能夠正常的發送出去,同時消費端也能接收到消息
四、SpringBoot整合Kafka完整過程
接下來通過案例操作詳細介紹下如何在springboot項目中整合與使用kafka。
4.1 前置準備
4.1.1 環境依賴
首先確保你的開發環境滿足以下要求:
-
Java 17+ (Spring Boot 3 需要 Java 17 或更高版本)
-
Apache Kafka (本地安裝或使用 Docker 容器)
-
Maven 或 Gradle 構建工具
4.1.2 導入核心依賴
提前創建一個springboot工程,并導入如下核心依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>mcp-client</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.3</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Mysql Connector --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>3.0.3</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version><exclusions><exclusion><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.github.ulisesbocchio</groupId><artifactId>jasypt-spring-boot-starter</artifactId><version>3.0.5</version> <!-- 建議使用最新版本 --></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><finalName>${project.artifactId}</finalName><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
4.1.3 添加配置文件
在工程的yml配置文件中增加下面有關kafka的信息
server:port: 8082spring:kafka:bootstrap-servers: IP:9092#生產者配置producer:retries: 3batch-size: 16384 # 批量處理大小buffer-memory: 33554432 # 生產者緩沖內存大小acks: all # 消息確認機制key-serializer: org.apache.kafka.common.serialization.StringSerializer#value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # JSON類型的序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer # 字符串類型的序列化#消費者配置consumer:group-id: consumer-group # 消費者組IDauto-offset-reset: earliest # 當無初始offset或offset失效時的處理方式enable-auto-commit: false # 關閉自動提交偏移量key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # JSON類型的反序列化value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 字符串類型的序列化#需要搭配對象類型的序列化一起使用#properties:#spring.json.trusted.packages: "*"listener:ack-mode: MANUAL_IMMEDIATE # 手動立即提交偏移量:cite[4]mybatis-plus:# 不支持多包, 如有需要可在注解配置 或 提升掃包等級# 例如 com.**.**.mappermapperPackage: com.congge.mapper# 對應的 XML 文件位置mapperLocations: classpath*:mapper/**/*Mapper.xml# 實體掃描,多個package用逗號或者分號分隔typeAliasesPackage: com.congge.entityglobal-config:dbConfig:# 主鍵類型# AUTO 自增 NONE 空 INPUT 用戶輸入 ASSIGN_ID 雪花 ASSIGN_UUID 唯一 UUID# 如需改為自增 需要將數據庫表全部設置為自增idType: ASSIGN_ID# 邏輯已刪除值(默認為 1)logic-delete-value: 1# 邏輯未刪除值(默認為 0)logic-not-delete-value: 0
4.2 代碼整合過程
參考下面的操作過程完成代碼的整合。
4.2.1 增加消息傳遞對象類
自定義一個消息對象,用于承載某些復雜場景下對傳遞消息的要求
package com.congge.kafka;import java.time.LocalDateTime;public class MessageData {private Long id;private String content;private LocalDateTime timestamp;// 構造方法public MessageData() {}public MessageData(Long id, String content, LocalDateTime timestamp) {this.id = id;this.content = content;this.timestamp = timestamp;}// Getter和Setter方法public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public LocalDateTime getTimestamp() {return timestamp;}public void setTimestamp(LocalDateTime timestamp) {this.timestamp = timestamp;}@Overridepublic String toString() {return "MessageData{" +"id=" + id +", content='" + content + '\'' +", timestamp=" + timestamp +'}';}}
4.2.2 創建發送消息的工具類
創建一個工具類用于發送消息,為了后續使用方便,該類作為spring bean配置到IOC容器中,其他類需要使用的時候直接注入即可。
-
該類中定義了多個發送消息的方法,可用于發送不同類型的消息,比如比較常用的字符串消息,對象消息等,可以根據實際的業務場景選型使用;
package com.congge.utils;import com.congge.kafka.MessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;@Service
public class KafkaProducerService {private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);private static final String TOPIC_NAME = "test-topic";private final KafkaTemplate<String, Object> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 發送字符串消息*/public void sendStringMessage(String message) {CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, message);future.whenComplete((result, ex) -> {if (ex == null) {logger.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());} else {logger.error("Unable to send message=[{}] due to: {}", message, ex.getMessage());}});}/*** 發送對象消息(JSON格式)*/public void sendObjectMessage(Long id, String content) {MessageData messageData = new MessageData(id, content, LocalDateTime.now());CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, messageData);future.whenComplete((result, ex) -> {if (ex == null) {logger.info("Sent message=[{}] with offset=[{}]", messageData, result.getRecordMetadata().offset());} else {logger.error("Unable to send message=[{}] due to: {}", messageData, ex.getMessage());}});}/*** 發送帶鍵的消息*/public void sendMessageWithKey(String key, String message) {CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, key, message);future.whenComplete((result, ex) -> {if (ex == null) {logger.info("Sent message=[{}] with key=[{}] and offset=[{}]",message, key, result.getRecordMetadata().offset());} else {logger.error("Unable to send message=[{}] with key=[{}] due to: {}",message, key, ex.getMessage());}});}
}
4.2.3 創建消費者
創建一個服務類,專門用于消費 Kafka 主題中的消息
-
在實際開發中,建議不同的業務使用各自的監聽器的類,避免業務重度耦合
package com.congge.utils;import com.congge.kafka.MessageData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);/*** 消費字符串消息*/@KafkaListener(topics = "test-topic", groupId = "consumer-group")public void consumeStringMessage(String message,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,@Header(KafkaHeaders.OFFSET) long offset) {try {logger.info("Received string message: [{}] from topic: {}, partition: {}, offset: {}",message, topic, partition, offset);// 業務處理邏輯processMessage(message);// 手動提交偏移量acknowledgment.acknowledge();logger.info("Acknowledged message from topic: {}, partition: {}, offset: {}",topic, partition, offset);} catch (Exception e) {logger.error("Error processing message: {}", message, e);// 可根據業務需求決定是否重試或將消息發送到DLT(死信主題)}}/*** 消費對象消息(JSON格式)*/@KafkaListener(topics = "example-topic", groupId = "object-consumer-group")public void consumeObjectMessage(MessageData messageData,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,@Header(KafkaHeaders.OFFSET) long offset) {try {logger.info("Received object message: [{}] from topic: {}, partition: {}, offset: {}",messageData, topic, partition, offset);// 業務處理邏輯processObjectMessage(messageData);// 手動提交偏移量acknowledgment.acknowledge();logger.info("Acknowledged message from topic: {}, partition: {}, offset: {}",topic, partition, offset);} catch (Exception e) {logger.error("Error processing message: {}", messageData, e);}}/*** 消費帶鍵的消息*/@KafkaListener(topics = "example-topic", groupId = "key-consumer-group")public void consumeMessageWithKey(ConsumerRecord<String, String> record,Acknowledgment acknowledgment) {try {logger.info("Received message with key: [{}], value: [{}] from topic: {}, partition: {}, offset: {}",record.key(), record.value(), record.topic(), record.partition(), record.offset());// 業務處理邏輯processMessageWithKey(record.key(), record.value());// 手動提交偏移量acknowledgment.acknowledge();logger.info("Acknowledged message with key: [{}] from topic: {}, partition: {}, offset: {}",record.key(), record.topic(), record.partition(), record.offset());} catch (Exception e) {logger.error("Error processing message with key: {}", record.key(), e);}}private void processMessage(String message) {// 實現你的業務邏輯logger.info("Processing message: {}", message);}private void processObjectMessage(MessageData messageData) {// 實現你的業務邏輯logger.info("Processing object message: {}", messageData);}private void processMessageWithKey(String key, String value) {// 根據鍵處理消息的邏輯logger.info("Processing message with key: {} and value: {}", key, value);}
}
4.2.4 增加一個操作topic的工具類
增加一個操作topic的類,用于手動創建topic
package com.congge.kafka;import org.apache.kafka.clients.admin.*;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;@Service
public class KafkaTopicService {private final AdminClient adminClient;public KafkaTopicService(KafkaAdmin kafkaAdmin) {this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());}/*** 創建單個Topic*/public void createTopic(String topicName, int partitions, short replicationFactor) {NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);createTopic(newTopic);}/*** 創建Topic(使用NewTopic對象)*/public void createTopic(NewTopic newTopic) {CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));try {result.all().get(); // 等待創建完成} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to create topic: " + newTopic.name(), e);}}/*** 檢查Topic是否存在*/public boolean topicExists(String topicName) {try {ListTopicsResult topics = adminClient.listTopics();Set<String> topicNames = topics.names().get();return topicNames.contains(topicName);} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to check topic existence: " + topicName, e);}}/*** 獲取Topic詳情*/public TopicDescription getTopicDescription(String topicName) {try {Map<String, TopicDescription> descriptions =adminClient.describeTopics(Collections.singleton(topicName)).all().get();return descriptions.get(topicName);} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to get topic description: " + topicName, e);}}/*** 獲取所有Topic列表*/public Set<String> listAllTopics() {try {ListTopicsResult topics = adminClient.listTopics();return topics.names().get();} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to list topics", e);}}/*** 安全關閉AdminClient*/public void close() {if (adminClient != null) {adminClient.close();}}
}
4.2.5 創建測試接口
為了方便驗證效果,這里創建一個接口,通過接口發送消息到kafka的topic中,然后通過監聽器到topic中的消息就算成功。參考下面的代碼。
package com.congge.kafka;import com.congge.utils.KafkaProducerService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/api/kafka")
public class KafkaController {private final KafkaProducerService kafkaProducerService;private final KafkaTopicService topicService;public KafkaController(KafkaProducerService kafkaProducerService,KafkaTopicService topicService) {this.kafkaProducerService = kafkaProducerService;this.topicService = topicService;}//localhost:8082/api/kafka/send-string?message=test producer@GetMapping("/send-string")public String sendStringMessage(@RequestParam String message) {kafkaProducerService.sendStringMessage(message);return "String message sent: " + message;}//localhost:8082/api/kafka/send-object?id=1&content=test producer@GetMapping("/send-object")public String sendObjectMessage(@RequestParam Long id, @RequestParam String content) {kafkaProducerService.sendObjectMessage(id, content);return "Object message sent with ID: " + id + " and content: " + content;}@GetMapping("/send-with-key")public String sendMessageWithKey(@RequestParam String key, @RequestParam String message) {kafkaProducerService.sendMessageWithKey(key, message);return "Message with key sent: " + key + " - " + message;}/*** 創建Topic* localhost:8082/api/kafka/create-topic?topicName=eva&partitions=1&replicationFactor=2**/@GetMapping("/create-topic")public ResponseEntity<String> createTopic(@RequestParam String topicName,@RequestParam(defaultValue = "1") int partitions,@RequestParam(defaultValue = "1") short replicationFactor) {if (topicService.topicExists(topicName)) {return ResponseEntity.badRequest().body("Topic already exists: " + topicName);}try {topicService.createTopic(topicName, partitions, replicationFactor);return ResponseEntity.ok("Topic created successfully: " + topicName);} catch (Exception e) {return ResponseEntity.internalServerError().body("Failed to create topic: " + e.getMessage());}}
}
4.2.6 效果驗證
1)創建一個測試使用的topic
如果你已經提前通過命令行工具創建過topic了的話,就不用執行接口創建了,這里創建一個名為zcy-test的topic,調用上面的創建接口
2)發送字符串消息
調用發消息接口,發送成功后
然后在監聽器中監聽到了,實際業務中,你要處理的業務邏輯就在監聽器中進行處理
3)發送對象消息
在實際業務中,為了更方便的對消息進行處理,發送對象消息也是很常用的一種方式,參考下面的接口示例
然后在基于對象方式的監聽器中就可以看到了
五、寫在文末
本文比較詳細的介紹了如何在SpringBoot集成和使用Kakfa,并通過案例詳細演示了其使用過程,更深入的技術點有興趣的同學可以基于此繼續深入研究,本篇到此結束,感謝觀看。