引言:
- 本文總字數:約 9800 字
- 預計閱讀時間:40 分鐘
為什么 Kafka 是高吞吐場景的首選?
在當今的分布式系統中,消息隊列已成為不可或缺的基礎設施。面對不同的業務場景,選擇合適的消息隊列至關重要。目前主流的消息中間件中,Kafka 以其獨特的設計脫穎而出:
- 超高吞吐量:單機可輕松處理每秒數十萬條消息
- 持久化存儲:基于磁盤的高效存儲機制,支持海量消息堆積
- 水平擴展:通過分區機制實現無縫擴展
- 流處理能力:內置流處理 API,支持復雜的數據轉換和處理
根據 Apache Kafka 官方數據,Kafka 在全球財富 100 強公司中被廣泛采用,包括 Netflix、Uber、LinkedIn 等,處理著每天 PB 級別的數據。其發布 - 訂閱模式和日志存儲特性,使其特別適合日志收集、事件溯源、實時分析等場景。
本文將帶你全面掌握 SpringBoot 與 Kafka 的整合方案,從環境搭建到高級特性,從代碼實現到性能調優,讓你既能理解底層原理,又能解決實際開發中的各種問題。
一、Kafka 核心概念與架構
1.1 核心概念解析
Kafka 的核心概念包括:
- Producer:消息生產者,負責向 Kafka 發送消息
- Consumer:消息消費者,負責從 Kafka 讀取消息
- Broker:Kafka 服務器節點,一個 Kafka 集群由多個 Broker 組成
- Topic:主題,消息的分類名稱,生產者向主題發送消息,消費者從主題讀取消息
- Partition:分區,每個主題可以分為多個分區,分區是 Kafka 并行處理的基本單位
- Replica:副本,為保證數據可靠性,每個分區可以有多個副本
- Leader:主副本,每個分區有一個主副本,負責處理讀寫請求
- Follower:從副本,同步主副本的數據,主副本故障時可升級為主副本
- Consumer Group:消費者組,多個消費者可以組成一個消費者組,共同消費一個主題的消息
- Offset:偏移量,每個分區中的消息都有一個唯一的偏移量,用于標識消息在分區中的位置
1.2 架構原理
Kafka 的整體架構如圖所示:
消息流轉流程:
- 生產者將消息發送到指定主題
- 消息被分配到主題的一個分區中(可通過分區策略指定)
- 分區的主副本負責接收并存儲消息,同時從副本同步數據
- 消費者組中的消費者從分區讀取消息,每個分區只能被消費者組中的一個消費者消費
- 消費者通過偏移量記錄自己的消費位置
根據 Kafka 官方文檔(Apache Kafka),這種架構設計使得 Kafka 具有極高的吞吐量和可靠性,能夠滿足大規模數據處理的需求。
1.3 分區與消費者組機制
分區是 Kafka 實現高吞吐量的關鍵機制:
- 每個分區是一個有序的、不可變的消息序列
- 消息被追加到分區的末尾,類似日志文件
- 分區可以分布在不同的 Broker 上,實現負載均衡
消費者組機制則實現了消息的并行消費:
- 每個消費者組獨立消費主題的所有消息
- 同一個消費者組中的消費者共享消費負載
- 每個分區只能被消費者組中的一個消費者消費
- 消費者數量不應超過分區數量,否則多余的消費者將處于空閑狀態
分區與消費者組的關系如圖所示:
二、環境搭建
2.1 安裝 Kafka
我們采用最新穩定版 Kafka 3.6.1 進行安裝,步驟如下:
- 安裝 Java 環境(Kafka 依賴 Java):
# 對于Ubuntu/Debian
sudo apt-get update
sudo apt-get install openjdk-17-jdk# 對于CentOS/RHEL
sudo yum install java-17-openjdk
- 下載并解壓 Kafka:
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
- 啟動 ZooKeeper(Kafka 依賴 ZooKeeper 管理元數據):
# 后臺啟動ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- 啟動 Kafka Broker:
# 后臺啟動Kafka
bin/kafka-server-start.sh -daemon config/server.properties
- 創建測試主題:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
- 查看主題列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2.2 安裝 Docker 方式(推薦)
使用 Docker Compose 安裝 Kafka 更加簡單快捷:
創建 docker-compose.yml 文件:
version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:7.5.0environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000ports:- "2181:2181"kafka:image: confluentinc/cp-kafka:7.5.0depends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
啟動服務:
docker-compose up -d
三、SpringBoot 集成 Kafka 基礎
3.1 創建項目并添加依賴
我們使用 SpringBoot 3.2.0(最新穩定版)來創建項目,首先在 pom.xml 中添加必要的依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/></parent><groupId>com.jam</groupId><artifactId>springboot-kafka-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-kafka-demo</name><description>SpringBoot集成Kafka示例項目</description><properties><java.version>17</java.version><lombok.version>1.18.30</lombok.version><commons-lang3.version>3.14.0</commons-lang3.version><mybatis-plus.version>3.5.5</mybatis-plus.version><mysql-connector.version>8.2.0</mysql-connector.version><springdoc.version>2.1.0</springdoc.version><kafka.version>3.6.1</kafka.version></properties><dependencies><!-- SpringBoot核心依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Kafka依賴 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${kafka.version}</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- 工具類 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>${commons-lang3.version}</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><!-- MySQL驅動 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>${mysql-connector.version}</version><scope>runtime</scope></dependency><!-- Swagger3 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>${springdoc.version}</version></dependency><!-- 測試依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>${kafka.version}</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
3.2 配置 Kafka
在 application.yml 中添加 Kafka 的配置:
spring:application:name: springboot-kafka-demodatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/kafka_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootkafka:# Kafka集群地址bootstrap-servers: localhost:9092# 生產者配置producer:# 消息key的序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer# 消息value的序列化器value-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 批次大小,當批次滿了之后才會發送batch-size: 16384# 緩沖區大小buffer-memory: 33554432# 消息確認機制:0-不需要確認,1-只需要leader確認,all-所有副本都需要確認acks: all# 重試次數retries: 3# 重試間隔時間retry-backoff-ms: 1000# 消費者配置consumer:# 消息key的反序列化器key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消息value的反序列化器value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 消費者組IDgroup-id: default-group# 自動偏移量重置策略:earliest-從頭開始消費,latest-從最新的開始消費,none-如果沒有偏移量則拋出異常auto-offset-reset: earliest# 是否自動提交偏移量enable-auto-commit: false# 自動提交偏移量的間隔時間auto-commit-interval: 1000# 指定JsonDeserializer反序列化的目標類properties:spring:json:trusted:packages: com.jam.entity# 監聽器配置listener:# 消息確認模式:manual-手動確認,auto-自動確認ack-mode: manual_immediate# 并發消費者數量concurrency: 3# 批量消費配置batch-listener: false# 每次拉取的記錄數consumer:max-poll-records: 500# 重試配置retry:# 是否啟用重試enabled: true# 初始重試間隔時間initial-interval: 1000# 最大重試間隔時間max-interval: 10000# 重試乘數multiplier: 2# 最大重試次數max-attempts: 3mybatis-plus:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.jam.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImplspringdoc:api-docs:path: /api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: methodserver:port: 8081
3.3 創建 Kafka 常量配置類
創建常量類,定義 Kafka 相關的常量:
package com.jam.config;/*** Kafka常量配置類* 定義Kafka主題名稱、消費者組等常量** @author 果醬*/
public class KafkaConstant {/*** 普通消息主題*/public static final String NORMAL_TOPIC = "normal_topic";/*** 分區消息主題*/public static final String PARTITION_TOPIC = "partition_topic";/*** 事務消息主題*/public static final String TRANSACTIONAL_TOPIC = "transactional_topic";/*** 死信主題*/public static final String DEAD_LETTER_TOPIC = "dead_letter_topic";/*** 普通消費者組*/public static final String NORMAL_CONSUMER_GROUP = "normal_consumer_group";/*** 分區消費者組*/public static final String PARTITION_CONSUMER_GROUP = "partition_consumer_group";/*** 事務消費者組*/public static final String TRANSACTIONAL_CONSUMER_GROUP = "transactional_consumer_group";/*** 死信消費者組*/public static final String DEAD_LETTER_CONSUMER_GROUP = "dead_letter_consumer_group";/*** 事務ID前綴*/public static final String TRANSACTION_ID_PREFIX = "tx-";
}
3.4 創建消息實體類
創建一個通用的消息實體類,用于封裝發送的消息內容:
package com.jam.entity;import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;/*** 消息實體類* 用于封裝發送到Kafka的消息內容** @author 果醬*/
@Data
public class MessageEntity implements Serializable {/*** 消息ID*/private String messageId;/*** 消息內容*/private String content;/*** 業務類型*/private String businessType;/*** 業務ID,用于分區策略*/private String businessId;/*** 創建時間*/private LocalDateTime createTime;/*** 擴展字段,用于存儲額外信息*/private String extra;
}
3.5 創建 Kafka 配置類
創建配置類,配置 Kafka 生產者、消費者、分區策略等:
package com.jam.config;import com.jam.entity.MessageEntity;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.*;/*** Kafka配置類* 配置Kafka主題、生產者、消費者等** @author 果醬*/
@Configuration
public class KafkaConfig {/*** 創建普通消息主題* 3個分區,1個副本** @return 普通消息主題*/@Beanpublic NewTopic normalTopic() {// 參數:主題名稱、分區數、副本數return new NewTopic(KafkaConstant.NORMAL_TOPIC, 3, (short) 1);}/*** 創建分區消息主題* 5個分區,1個副本** @return 分區消息主題*/@Beanpublic NewTopic partitionTopic() {return new NewTopic(KafkaConstant.PARTITION_TOPIC, 5, (short) 1);}/*** 創建事務消息主題* 3個分區,1個副本** @return 事務消息主題*/@Beanpublic NewTopic transactionalTopic() {return new NewTopic(KafkaConstant.TRANSACTIONAL_TOPIC, 3, (short) 1);}/*** 創建死信主題* 1個分區,1個副本** @return 死信主題*/@Beanpublic NewTopic deadLetterTopic() {return new NewTopic(KafkaConstant.DEAD_LETTER_TOPIC, 1, (short) 1);}/*** 配置事務生產者工廠** @return 事務生產者工廠*/@Beanpublic ProducerFactory<String, MessageEntity> transactionalProducerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);configProps.put(ACKS_CONFIG, "all");configProps.put(RETRIES_CONFIG, 3);configProps.put(BATCH_SIZE_CONFIG, 16384);configProps.put(BUFFER_MEMORY_CONFIG, 33554432);// 配置事務ID前綴configProps.put(TRANSACTIONAL_ID_CONFIG, KafkaConstant.TRANSACTION_ID_PREFIX);DefaultKafkaProducerFactory<String, MessageEntity> factory = new DefaultKafkaProducerFactory<>(configProps);// 開啟事務支持factory.transactionCapable();return factory;}/*** 配置事務Kafka模板** @return 事務Kafka模板*/@Beanpublic KafkaTemplate<String, MessageEntity> transactionalKafkaTemplate() {return new KafkaTemplate<>(transactionalProducerFactory());}/*** 配置Kafka事務管理器** @return Kafka事務管理器*/@Beanpublic KafkaTransactionManager<String, MessageEntity> kafkaTransactionManager() {return new KafkaTransactionManager<>(transactionalProducerFactory());}
}
3.6 創建分區策略類
創建自定義的分區策略,根據業務 ID 將消息發送到指定分區:
package com.jam.config;import com.jam.entity.MessageEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;/*** 自定義Kafka分區策略* 根據業務ID將消息發送到指定分區,確保相同業務ID的消息在同一分區** @author 果醬*/
public class BusinessIdPartitioner implements Partitioner {/*** 計算分區號** @param topic 主題名稱* @param key 消息鍵* @param keyBytes 消息鍵的字節數組* @param value 消息值* @param valueBytes 消息值的字節數組* @param cluster Kafka集群信息* @return 分區號*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 獲取主題的所有分區List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 如果消息值不是MessageEntity類型,拋出異常if (!(value instanceof MessageEntity)) {throw new InvalidRecordException("消息必須是MessageEntity類型");}MessageEntity message = (MessageEntity) value;String businessId = message.getBusinessId();// 如果業務ID為空,使用默認分區策略if (StringUtils.isBlank(businessId)) {if (keyBytes == null) {// 使用隨機分區return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;} else {// 使用key計算分區return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}// 根據業務ID計算分區,確保相同業務ID的消息在同一分區return Math.abs(businessId.hashCode()) % numPartitions;}/*** 關閉分區器*/@Overridepublic void close() {// 關閉資源(如果有的話)}/*** 配置分區器** @param configs 配置參數*/@Overridepublic void configure(Map<String, ?> configs) {// 讀取配置參數(如果有的話)}
}
3.7 創建消息生產者服務
創建消息生產者服務,封裝發送消息的各種方法:
package com.jam.service;import com.jam.config.KafkaConstant;
import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;/*** Kafka消息生產者服務* 負責向Kafka發送各種類型的消息** @author 果醬*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {/*** Kafka模板類,提供發送消息的各種方法*/private final KafkaTemplate<String, MessageEntity> kafkaTemplate;/*** 事務Kafka模板類,用于發送事務消息*/private final KafkaTemplate<String, MessageEntity> transactionalKafkaTemplate;/*** 消息軌跡服務*/private final MessageTraceService messageTraceService;/*** 發送普通消息** @param topic 主題名稱* @param message 消息實體*/public void sendMessage(String topic, MessageEntity message) {// 參數校驗StringUtils.hasText(topic, "主題名稱不能為空");Objects.requireNonNull(message, "消息實體不能為空");// 確保消息ID和創建時間不為空if (StringUtils.isBlank(message.getMessageId())) {message.setMessageId(UUID.randomUUID().toString());}if (message.getCreateTime() == null) {message.setCreateTime(LocalDateTime.now());}// 記錄消息發送前的軌跡messageTraceService.recordBeforeSend(message, topic);log.info("發送Kafka消息,主題:{},消息ID:{},業務類型:{}",topic, message.getMessageId(), message.getBusinessType());// 發送消息ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(topic, message.getMessageId(), message);// 處理發送結果future.addCallback(new ListenableFutureCallback<>() {@Overridepublic void onSuccess(SendResult<String, MessageEntity> result) {log.info("Kafka消息發送成功,主題:{},消息ID:{},分區:{},偏移量:{}",topic, message.getMessageId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());// 記錄消息發送成功的軌跡messageTraceService.recordSendSuccess(message.getMessageId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {log.error("Kafka消息發送失敗,主題:{},消息ID:{}",topic, message.getMessageId(), ex);// 記錄消息發送失敗的軌跡messageTraceService.recordSendFailure(message.getMessageId(), ex.getMessage());}});}/*** 發送分區消息** @param message 消息實體*/public void sendPartitionMessage(MessageEntity message) {// 參數校驗Objects.requireNonNull(message, "消息實體不能為空");StringUtils.hasText(message.getBusinessId(), "業務ID不能為空");// 確保消息ID和創建時間不為空if (StringUtils.isBlank(message.getMessageId())) {message.setMessageId(UUID.randomUUID().toString());}if (message.getCreateTime() == null) {message.setCreateTime(LocalDateTime.now());}String topic = KafkaConstant.PARTITION_TOPIC;// 記錄消息發送前的軌跡messageTraceService.recordBeforeSend(message, topic);log.info("發送Kafka分區消息,主題:{},消息ID:{},業務ID:{},業務類型:{}",topic, message.getMessageId(), message.getBusinessId(), message.getBusinessType());// 發送消息,使用業務ID作為key,配合自定義分區策略ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(topic, message.getBusinessId(), message);// 處理發送結果future.addCallback(new ListenableFutureCallback<>() {@Overridepublic void onSuccess(SendResult<String, MessageEntity> result) {log.info("Kafka分區消息發送成功,主題:{},消息ID:{},業務ID:{},分區:{},偏移量:{}",topic, message.getMessageId(), message.getBusinessId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());// 記錄消息發送成功的軌跡messageTraceService.recordSendSuccess(message.getMessageId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {log.error("Kafka分區消息發送失敗,主題:{},消息ID:{},業務ID:{}",topic, message.getMessageId(), message.getBusinessId(), ex);// 記錄消息發送失敗的軌跡messageTraceService.recordSendFailure(message.getMessageId(), ex.getMessage());}});}/*** 發送事務消息** @param message 消息實體*/@Transactional(rollbackFor = Exception.class)public void sendTransactionalMessage(MessageEntity message) {// 參數校驗Objects.requireNonNull(message, "消息實體不能為空");// 確保消息ID和創建時間不為空if (StringUtils.isBlank(message.getMessageId())) {message.setMessageId(UUID.randomUUID().toString());}if (message.getCreateTime() == null) {message.setCreateTime(LocalDateTime.now());}String topic = KafkaConstant.TRANSACTIONAL_TOPIC;// 記錄消息發送前的軌跡messageTraceService.recordBeforeSend(message, topic);log.info("發送Kafka事務消息,主題:{},消息ID:{},業務類型:{}",topic, message.getMessageId(), message.getBusinessType());// 開始事務transactionalKafkaTemplate.executeInTransaction(kafkaOperations -> {// 發送消息SendResult<String, MessageEntity> result = kafkaOperations.send(topic, message.getMessageId(), message).get();log.info("Kafka事務消息發送成功,主題:{},消息ID:{},分區:{},偏移量:{}",topic, message.getMessageId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());// 記錄消息發送成功的軌跡messageTraceService.recordSendSuccess(message.getMessageId(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());// 這里可以添加數據庫操作等其他事務操作return result;});}/*** 創建消息實體** @param content 消息內容* @param businessType 業務類型* @param businessId 業務ID* @param extra 額外信息* @return 消息實體*/public MessageEntity createMessageEntity(String content, String businessType, String businessId, String extra) {MessageEntity message = new MessageEntity();message.setMessageId(UUID.randomUUID().toString());message.setContent(content);message.setBusinessType(businessType);message.setBusinessId(businessId);message.setCreateTime(LocalDateTime.now());message.setExtra(extra);return message;}
}
3.8 創建消息消費者服務
創建消息消費者服務,使用 @KafkaListener 注解消費消息:
package com.jam.service;import com.jam.config.KafkaConstant;
import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Objects;/*** Kafka消息消費者服務* 負責從Kafka接收并處理消息** @author 果醬*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumerService {/*** 消息軌跡服務*/private final MessageTraceService messageTraceService;/*** 消費普通消息** @param record 消息記錄* @param acknowledgment 確認對象* @param topic 主題名稱* @param partition 分區號* @param offset 偏移量*/@KafkaListener(topics = KafkaConstant.NORMAL_TOPIC, groupId = KafkaConstant.NORMAL_CONSUMER_GROUP)public void consumeNormalMessage(ConsumerRecord<String, MessageEntity> record,Acknowledgment acknowledgment,@Header("kafka_receivedTopic") String topic,@Header("kafka_receivedPartitionId") int partition,@Header("kafka_offset") long offset) {MessageEntity message = record.value();Objects.requireNonNull(message, "消息內容不能為空");log.info("接收到普通消息,主題:{},分區:{},偏移量:{},消息ID:{},業務類型:{}",topic, partition, offset, message.getMessageId(), message.getBusinessType());try {// 處理消息的業務邏輯processMessage(message);// 記錄消費成功軌跡messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);// 手動確認消息acknowledgment.acknowledge();log.info("普通消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());} catch (Exception e) {// 記錄消費失敗軌跡messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());log.error("普通消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);// 手動確認消息(將失敗消息標記為已消費,避免無限重試)// 如果需要將消息發送到死信隊列,可以不確認并配置死信轉發acknowledgment.acknowledge();}}/*** 消費分區消息** @param record 消息記錄* @param acknowledgment 確認對象* @param topic 主題名稱* @param partition 分區號* @param offset 偏移量*/@KafkaListener(topics = KafkaConstant.PARTITION_TOPIC, groupId = KafkaConstant.PARTITION_CONSUMER_GROUP)public void consumePartitionMessage(ConsumerRecord<String, MessageEntity> record,Acknowledgment acknowledgment,@Header("kafka_receivedTopic") String topic,@Header("kafka_receivedPartitionId") int partition,@Header("kafka_offset") long offset) {MessageEntity message = record.value();Objects.requireNonNull(message, "消息內容不能為空");log.info("接收到分區消息,主題:{},分區:{},偏移量:{},消息ID:{},業務ID:{},業務類型:{}",topic, partition, offset, message.getMessageId(), message.getBusinessId(), message.getBusinessType());try {// 處理消息的業務邏輯processMessage(message);// 記錄消費成功軌跡messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);// 手動確認消息acknowledgment.acknowledge();log.info("分區消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());} catch (Exception e) {// 記錄消費失敗軌跡messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());log.error("分區消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);acknowledgment.acknowledge();}}/*** 消費事務消息** @param record 消息記錄* @param acknowledgment 確認對象* @param topic 主題名稱* @param partition 分區號* @param offset 偏移量*/@Transactional(rollbackFor = Exception.class)@KafkaListener(topics = KafkaConstant.TRANSACTIONAL_TOPIC, groupId = KafkaConstant.TRANSACTIONAL_CONSUMER_GROUP)public void consumeTransactionalMessage(ConsumerRecord<String, MessageEntity> record,Acknowledgment acknowledgment,@Header("kafka_receivedTopic") String topic,@Header("kafka_receivedPartitionId") int partition,@Header("kafka_offset") long offset) {MessageEntity message = record.value();Objects.requireNonNull(message, "消息內容不能為空");log.info("接收到事務消息,主題:{},分區:{},偏移量:{},消息ID:{},業務類型:{}",topic, partition, offset, message.getMessageId(), message.getBusinessType());try {// 處理消息的業務邏輯processMessage(message);// 這里可以添加數據庫操作等其他事務操作// 記錄消費成功軌跡messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);// 手動確認消息acknowledgment.acknowledge();log.info("事務消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());} catch (Exception e) {// 記錄消費失敗軌跡messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());log.error("事務消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);// 事務會回滾,消息不會被確認,將被重新消費}}/*** 消費死信消息** @param record 消息記錄* @param acknowledgment 確認對象* @param topic 主題名稱* @param partition 分區號* @param offset 偏移量*/@KafkaListener(topics = KafkaConstant.DEAD_LETTER_TOPIC, groupId = KafkaConstant.DEAD_LETTER_CONSUMER_GROUP)public void consumeDeadLetterMessage(ConsumerRecord<String, MessageEntity> record,Acknowledgment acknowledgment,@Header("kafka_receivedTopic") String topic,@Header("kafka_receivedPartitionId") int partition,@Header("kafka_offset") long offset) {MessageEntity message = record.value();Objects.requireNonNull(message, "消息內容不能為空");log.error("接收到死信消息,主題:{},分區:{},偏移量:{},消息ID:{},業務類型:{}",topic, partition, offset, message.getMessageId(), message.getBusinessType());try {// 處理死信消息的業務邏輯,通常需要人工干預processDeadLetterMessage(message);// 記錄消費成功軌跡messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);// 手動確認消息acknowledgment.acknowledge();log.info("死信消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());} catch (Exception e) {// 記錄消費失敗軌跡messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());log.error("死信消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);acknowledgment.acknowledge();}}/*** 處理消息的業務邏輯** @param message 要處理的消息*/private void processMessage(MessageEntity message) {// 根據業務類型處理不同的消息String businessType = message.getBusinessType();if ("ORDER_CREATE".equals(businessType)) {// 處理訂單創建消息processOrderCreateMessage(message);} else if ("USER_REGISTER".equals(businessType)) {// 處理用戶注冊消息processUserRegisterMessage(message);} else {// 處理未知類型消息log.warn("收到未知類型的消息,消息ID:{},業務類型:{}",message.getMessageId(), businessType);}}/*** 處理死信消息** @param message 死信消息*/private void processDeadLetterMessage(MessageEntity message) {log.info("處理死信消息,消息ID:{},內容:{}",message.getMessageId(), message.getContent());// 實際業務處理邏輯,如記錄到數據庫等待人工處理}/*** 處理訂單創建消息** @param message 訂單創建消息*/private void processOrderCreateMessage(MessageEntity message) {log.info("處理訂單創建消息,消息ID:{},訂單信息:{}",message.getMessageId(), message.getContent());// 實際業務處理邏輯...}/*** 處理用戶注冊消息** @param message 用戶注冊消息*/private void processUserRegisterMessage(MessageEntity message) {log.info("處理用戶注冊消息,消息ID:{},用戶信息:{}",message.getMessageId(), message.getContent());// 實際業務處理邏輯...}
}
3.9 創建消息軌跡服務
為了跟蹤消息的整個生命周期,創建消息軌跡服務:
package com.jam.service;import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import com.jam.mapper.MessageTraceMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Objects;/*** 消息軌跡服務* 記錄消息的發送和消費軌跡** @author 果醬*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageTraceService {private final MessageTraceMapper messageTraceMapper;/*** 記錄消息發送前的軌跡** @param message 消息實體* @param topic 主題* @return 消息軌跡ID*/@Transactional(rollbackFor = Exception.class)public Long recordBeforeSend(MessageEntity message, String topic) {Objects.requireNonNull(message, "消息實體不能為空");StringUtils.hasText(message.getMessageId(), "消息ID不能為空");StringUtils.hasText(topic, "主題不能為空");MessageTrace trace = new MessageTrace();trace.setMessageId(message.getMessageId());trace.setTopic(topic);trace.setBusinessType(message.getBusinessType());trace.setBusinessId(message.getBusinessId());trace.setContent(message.getContent());trace.setSendStatus(0); // 待發送trace.setCreateTime(LocalDateTime.now());trace.setUpdateTime(LocalDateTime.now());messageTraceMapper.insert(trace);log.info("記錄消息發送前軌跡,消息ID:{},軌跡ID:{}", message.getMessageId(), trace.getId());return trace.getId();}/*** 記錄消息發送成功的軌跡** @param messageId 消息ID* @param partition 分區* @param offset 偏移量*/@Transactional(rollbackFor = Exception.class)public void recordSendSuccess(String messageId, int partition, long offset) {StringUtils.hasText(messageId, "消息ID不能為空");MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);if (trace == null) {log.warn("未找到消息軌跡,消息ID:{}", messageId);return;}trace.setSendTime(LocalDateTime.now());trace.setSendStatus(1); // 發送成功trace.setPartition(partition);trace.setOffset(offset);trace.setUpdateTime(LocalDateTime.now());messageTraceMapper.updateById(trace);log.info("記錄消息發送成功軌跡,消息ID:{}", messageId);}/*** 記錄消息發送失敗的軌跡** @param messageId 消息ID* @param errorMsg 錯誤信息*/@Transactional(rollbackFor = Exception.class)public void recordSendFailure(String messageId, String errorMsg) {StringUtils.hasText(messageId, "消息ID不能為空");StringUtils.hasText(errorMsg, "錯誤信息不能為空");MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);if (trace == null) {log.warn("未找到消息軌跡,消息ID:{}", messageId);return;}trace.setSendTime(LocalDateTime.now());trace.setSendStatus(2); // 發送失敗trace.setSendErrorMsg(errorMsg);trace.setUpdateTime(LocalDateTime.now());messageTraceMapper.updateById(trace);log.info("記錄消息發送失敗軌跡,消息ID:{}", messageId);}/*** 記錄消息消費成功的軌跡** @param messageId 消息ID* @param partition 分區* @param offset 偏移量*/@Transactional(rollbackFor = Exception.class)public void recordConsumeSuccess(String messageId, int partition, long offset) {StringUtils.hasText(messageId, "消息ID不能為空");MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);if (trace == null) {log.warn("未找到消息軌跡,消息ID:{}", messageId);return;}trace.setConsumeTime(LocalDateTime.now());trace.setConsumeStatus(1); // 消費成功trace.setConsumePartition(partition);trace.setConsumeOffset(offset);trace.setUpdateTime(LocalDateTime.now());messageTraceMapper.updateById(trace);log.info("記錄消息消費成功軌跡,消息ID:{}", messageId);}/*** 記錄消息消費失敗的軌跡** @param messageId 消息ID* @param partition 分區* @param offset 偏移量* @param errorMsg 錯誤信息*/@Transactional(rollbackFor = Exception.class)public void recordConsumeFailure(String messageId, int partition, long offset, String errorMsg) {StringUtils.hasText(messageId, "消息ID不能為空");StringUtils.hasText(errorMsg, "錯誤信息不能為空");MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);if (trace == null) {log.warn("未找到消息軌跡,消息ID:{}", messageId);return;}trace.setConsumeTime(LocalDateTime.now());trace.setConsumeStatus(2); // 消費失敗trace.setConsumePartition(partition);trace.setConsumeOffset(offset);trace.setConsumeErrorMsg(errorMsg);trace.setUpdateTime(LocalDateTime.now());messageTraceMapper.updateById(trace);log.info("記錄消息消費失敗軌跡,消息ID:{}", messageId);}
}
3.10 創建控制器
創建一個控制器,用于測試消息發送功能:
package com.jam.controller;import com.jam.config.KafkaConstant;
import com.jam.entity.MessageEntity;
import com.jam.service.KafkaProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** Kafka消息測試控制器* 提供API接口用于測試Kafka消息發送功能** @author 果醬*/
@Slf4j
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
@Tag(name = "Kafka消息測試接口", description = "用于測試Kafka消息發送的API接口")
public class KafkaMessageController {/*** Kafka消息生產者服務*/private final KafkaProducerService kafkaProducerService;/*** 發送普通消息** @param content 消息內容* @param businessType 業務類型* @param businessId 業務ID* @param extra 額外信息* @return 響應信息*/@PostMapping("/normal")@Operation(summary = "發送普通消息", description = "發送到普通主題的消息")public ResponseEntity<String> sendNormalMessage(@Parameter(description = "消息內容", required = true)@RequestParam String content,@Parameter(description = "業務類型")@RequestParam(required = false) String businessType,@Parameter(description = "業務ID")@RequestParam(required = false) String businessId,@Parameter(description = "額外信息")@RequestParam(required = false) String extra) {log.info("接收到發送普通消息請求");MessageEntity message = kafkaProducerService.createMessageEntity(content, businessType, businessId, extra);kafkaProducerService.sendMessage(KafkaConstant.NORMAL_TOPIC, message);return ResponseEntity.ok("普通消息發送成功,消息ID:" + message.getMessageId());}/*** 發送分區消息** @param content 消息內容* @param businessType 業務類型* @param businessId 業務ID(用于分區)* @param extra 額外信息* @return 響應信息*/@PostMapping("/partition")@Operation(summary = "發送分區消息", description = "發送到分區主題的消息,相同業務ID的消息會被發送到同一分區")public ResponseEntity<String> sendPartitionMessage(@Parameter(description = "消息內容", required = true)@RequestParam String content,@Parameter(description = "業務類型")@RequestParam(required = false) String businessType,@Parameter(description = "業務ID(用于分區)", required = true)@RequestParam String businessId,@Parameter(description = "額外信息")@RequestParam(required = false) String extra) {log.info("接收到發送分區消息請求,業務ID:{}", businessId);MessageEntity message = kafkaProducerService.createMessageEntity(content, businessType, businessId, extra);kafkaProducerService.sendPartitionMessage(message);return ResponseEntity.ok("分區消息發送成功,消息ID:" + message.getMessageId());}/*** 發送事務消息** @param content 消息內容* @param businessType 業務類型* @param businessId 業務ID* @param extra 額外信息* @return 響應信息*/@PostMapping("/transactional")@Operation(summary = "發送事務消息", description = "發送到事務主題的消息,支持事務特性")public ResponseEntity<String> sendTransactionalMessage(@Parameter(description = "消息內容", required = true)@RequestParam String content,@Parameter(description = "業務類型")@RequestParam(required = false) String businessType,@Parameter(description = "業務ID")@RequestParam(required = false) String businessId,@Parameter(description = "額外信息")@RequestParam(required = false) String extra) {log.info("接收到發送事務消息請求");MessageEntity message = kafkaProducerService.createMessageEntity(content, businessType, businessId, extra);kafkaProducerService.sendTransactionalMessage(message);return ResponseEntity.ok("事務消息發送成功,消息ID:" + message.getMessageId());}
}
3.11 創建啟動類
package com.jam;import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** SpringBoot應用啟動類** @author 果醬*/
@SpringBootApplication
@MapperScan("com.jam.mapper")
@OpenAPIDefinition(info = @Info(title = "SpringBoot集成Kafka示例項目",version = "1.0",description = "SpringBoot集成Kafka的示例項目,包含各種消息發送和消費的示例")
)
public class SpringbootKafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(SpringbootKafkaDemoApplication.class, args);}
}
3.12 創建消息軌跡相關實體和數據庫表
消息軌跡實體類:
package com.jam.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;/*** 消息軌跡實體類* 記錄Kafka消息的發送和消費情況** @author 果醬*/
@Data
@TableName("t_message_trace")
public class MessageTrace {/*** 主鍵ID*/@TableId(type = IdType.AUTO)private Long id;/*** 消息ID*/private String messageId;/*** 主題*/private String topic;/*** 分區*/private Integer partition;/*** 偏移量*/private Long offset;/*** 業務類型*/private String businessType;/*** 業務ID*/private String businessId;/*** 消息內容*/private String content;/*** 發送時間*/private LocalDateTime sendTime;/*** 發送狀態:0-待發送,1-發送成功,2-發送失敗*/private Integer sendStatus;/*** 發送錯誤信息*/private String sendErrorMsg;/*** 消費時間*/private LocalDateTime consumeTime;/*** 消費分區*/private Integer consumePartition;/*** 消費偏移量*/private Long consumeOffset;/*** 消費狀態:0-待消費,1-消費成功,2-消費失敗*/private Integer consumeStatus;/*** 消費錯誤信息*/private String consumeErrorMsg;/*** 創建時間*/private LocalDateTime createTime;/*** 更新時間*/private LocalDateTime updateTime;
}
消息軌跡 Mapper 接口:
package com.jam.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.entity.MessageTrace;
import org.apache.ibatis.annotations.Param;/*** 消息軌跡Mapper** @author 果醬*/
public interface MessageTraceMapper extends BaseMapper<MessageTrace> {/*** 根據消息ID查詢消息軌跡** @param messageId 消息ID* @return 消息軌跡信息*/MessageTrace selectByMessageId(@Param("messageId") String messageId);
}
消息軌跡 Mapper XML 文件(resources/mapper/MessageTraceMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.mapper.MessageTraceMapper"><select id="selectByMessageId" parameterType="java.lang.String" resultType="com.jam.entity.MessageTrace">SELECT * FROM t_message_trace WHERE message_id = #{messageId}</select>
</mapper>
創建消息軌跡表的 SQL:
CREATE TABLE `t_message_trace` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',`message_id` varchar(64) NOT NULL COMMENT '消息ID',`topic` varchar(128) NOT NULL COMMENT '主題',`partition` int DEFAULT NULL COMMENT '分區',`offset` bigint DEFAULT NULL COMMENT '偏移量',`business_type` varchar(64) DEFAULT NULL COMMENT '業務類型',`business_id` varchar(64) DEFAULT NULL COMMENT '業務ID',`content` text COMMENT '消息內容',`send_time` datetime DEFAULT NULL COMMENT '發送時間',`send_status` tinyint DEFAULT NULL COMMENT '發送狀態:0-待發送,1-發送成功,2-發送失敗',`send_error_msg` text COMMENT '發送錯誤信息',`consume_time` datetime DEFAULT NULL COMMENT '消費時間',`consume_partition` int DEFAULT NULL COMMENT '消費分區',`consume_offset` bigint DEFAULT NULL COMMENT '消費偏移量',`consume_status` tinyint DEFAULT NULL COMMENT '消費狀態:0-待消費,1-消費成功,2-消費失敗',`consume_error_msg` text COMMENT '消費錯誤信息',`create_time` datetime NOT NULL COMMENT '創建時間',`update_time` datetime NOT NULL COMMENT '更新時間',PRIMARY KEY (`id`),UNIQUE KEY `uk_message_id` (`message_id`),KEY `idx_topic` (`topic`),KEY `idx_business_type` (`business_type`),KEY `idx_business_id` (`business_id`),KEY `idx_send_status` (`send_status`),KEY `idx_consume_status` (`consume_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Kafka消息軌跡表';
3.13 測試消息發送與消費
啟動應用程序后,可以通過以下方式測試消息發送與消費:
- 使用 Swagger UI 測試:訪問http://localhost:8081/swagger-ui.html,通過界面調用消息發送接口
- 使用 curl 命令測試:
# 發送普通消息
curl -X POST "http://localhost:8081/api/kafka/normal?content=Hello Kafka&businessType=TEST"# 發送分區消息
curl -X POST "http://localhost:8081/api/kafka/partition?content=Hello Partition&businessType=TEST&businessId=BUS123456"# 發送事務消息
curl -X POST "http://localhost:8081/api/kafka/transactional?content=Hello Transaction&businessType=TEST"
發送消息后,可以在控制臺看到生產者和消費者的日志輸出,證明消息已經成功發送和消費。
四、Kafka 高級特性
4.1 消息確認機制
Kafka 提供了靈活的消息確認機制,確保消息的可靠傳遞。
-
生產者確認機制:
通過 acks 參數控制生產者需要等待的確認數量:- acks=0:生產者不等待任何確認,直接發送下一條消息
- acks=1:生產者等待 leader 分區確認收到消息
- acks=all:生產者等待所有同步副本確認收到消息
-
消費者確認機制:
通過 ack-mode 參數控制消費者何時確認消息:- auto:自動確認,消費者收到消息后立即確認
- manual:手動確認,消費者處理完消息后調用 acknowledge () 方法確認
- manual_immediate:手動確認,確認后立即提交偏移量
消息確認流程:
根據 Kafka 官方文檔(Apache Kafka),對于需要高可靠性的場景,推薦使用 acks=all 和 manual 確認模式。
4.2 事務消息
Kafka 從 0.11 版本開始支持事務消息,確保消息的原子性:要么所有消息都被成功發送,要么都失敗。
事務消息的工作流程:
在前面的代碼中,我們已經實現了事務消息的發送:
- 配置了事務生產者工廠和事務 Kafka 模板
- 使用 @Transactional 注解或 executeInTransaction 方法開啟事務
- 在事務中可以混合發送消息和數據庫操作等
4.3 死信隊列
死信隊列(Dead Letter Queue)用于存儲無法被正常消費的消息。在 Kafka 中,可以通過以下方式實現死信隊列:
- 配置死信主題和死信消費者
- 在消費失敗時,手動將消息發送到死信主題
- 死信消費者專門處理死信消息
死信隊列的工作流程:
實現死信消息轉發的代碼示例:
/*** 轉發消息到死信隊列** @param message 消息實體* @param topic 原主題* @param partition 原分區* @param offset 原偏移量* @param errorMsg 錯誤信息*/
public void forwardToDeadLetterQueue(MessageEntity message, String topic, int partition, long offset, String errorMsg) {Objects.requireNonNull(message, "消息實體不能為空");StringUtils.hasText(topic, "主題不能為空");StringUtils.hasText(errorMsg, "錯誤信息不能為空");log.warn("將消息轉發到死信隊列,原主題:{},消息ID:{},錯誤信息:{}",topic, message.getMessageId(), errorMsg);// 創建死信消息,添加原消息的元數據MessageEntity deadLetterMessage = new MessageEntity();deadLetterMessage.setMessageId(UUID.randomUUID().toString());deadLetterMessage.setContent(JSON.toJSONString(message));deadLetterMessage.setBusinessType("DEAD_LETTER");deadLetterMessage.setBusinessId(message.getMessageId());deadLetterMessage.setCreateTime(LocalDateTime.now());deadLetterMessage.setExtra(String.format("原主題:%s,原分區:%d,原偏移量:%d,錯誤信息:%s",topic, partition, offset, errorMsg));// 發送到死信主題kafkaTemplate.send(KafkaConstant.DEAD_LETTER_TOPIC, deadLetterMessage.getMessageId(), deadLetterMessage);
}
4.4 消息冪等性
在分布式系統中,消息重復消費是不可避免的問題,因此需要保證消息消費的冪等性。常用的實現方式有:
- 基于數據庫唯一索引:
/*** 處理消息(冪等性保證)** @param message 消息實體*/
@Transactional(rollbackFor = Exception.class)
public void processMessageWithIdempotency(MessageEntity message) {String messageId = message.getMessageId();String businessType = message.getBusinessType();// 檢查消息是否已經處理過MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);if (trace != null && trace.getConsumeStatus() == 1) {log.info("消息已經處理過,消息ID:{}", messageId);return;}// 根據業務類型處理不同的消息if ("ORDER_CREATE".equals(businessType)) {// 處理訂單創建消息,使用訂單號作為唯一鍵String orderNo = message.getExtra();// 檢查訂單是否已經處理Order order = orderMapper.selectByOrderNo(orderNo);if (order != null) {log.info("訂單已經處理過,訂單號:{}", orderNo);return;}// 處理訂單業務邏輯// ...} else if ("USER_REGISTER".equals(businessType)) {// 處理用戶注冊消息,使用用戶ID作為唯一鍵// ...}
}
- 基于 Redis 的分布式鎖:
/*** 使用Redis分布式鎖保證冪等性** @param message 消息實體*/
public void processMessageWithRedisLock(MessageEntity message) {String messageId = message.getMessageId();String lockKey = "kafka:message:process:" + messageId;// 獲取分布式鎖,設置5分鐘過期時間Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES);if (Boolean.TRUE.equals(locked)) {try {// 檢查消息是否已經處理過MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);if (trace != null && trace.getConsumeStatus() == 1) {log.info("消息已經處理過,消息ID:{}", messageId);return;}// 處理消息業務邏輯processMessage(message);} finally {// 釋放鎖redisTemplate.delete(lockKey);}} else {log.info("消息正在處理中,消息ID:{}", messageId);}
}
五、Kafka 性能調優
為了讓 Kafka 在生產環境中發揮最佳性能,我們需要進行合理的調優。以下是一些關鍵的調優方向:
5.1 服務器調優
-
JVM 參數調優:
根據服務器內存大小合理配置 JVM 參數# 在kafka-server-start.sh中設置 export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M"
-
操作系統調優:
- 增加文件描述符限制
?# 在/etc/security/limits.conf中添加 * soft nofile 1000000 * hard nofile 1000000
- 調整網絡參數
# 在/etc/sysctl.conf中添加 net.core.rmem_default=134217728 net.core.rmem_max=134217728 net.core.wmem_default=134217728 net.core.wmem_max=134217728 net.ipv4.tcp_wmem=134217728 134217728 134217728 net.ipv4.tcp_rmem=134217728 134217728 134217728 net.ipv4.tcp_max_syn_backlog=8192 net.core.netdev_max_backlog=16384
-
Kafka 配置調優:
# server.properties # 日志刷新策略 log.flush.interval.messages=10000 log.flush.interval.ms=1000# 日志保留策略 log.retention.hours=72 log.retention.bytes=107374182400# 分區大小限制 log.segment.bytes=1073741824# I/O線程數 num.io.threads=8# 網絡線程數 num.network.threads=3# 分區副本同步線程數 num.replica.fetchers=2# 副本滯后閾值 replica.lag.time.max.ms=30000
5.2 生產者調優
-
批量發送:
配置合理的批次大小和 linger.ms 參數,實現批量發送spring:kafka:producer:# 批次大小,當批次滿了之后才會發送batch-size: 16384# linger.ms參數,即使批次未滿,達到該時間也會發送properties:linger.ms: 5
-
壓縮消息:
啟用消息壓縮,減少網絡傳輸和存儲開銷spring:kafka:producer:# 啟用消息壓縮,可選值:none, gzip, snappy, lz4, zstdproperties:compression.type: lz4
-
異步發送:
使用異步發送提高吞吐量,避免阻塞主線程 -
自定義分區策略:
根據業務特點實現自定義分區策略,均衡分區負載
5.3 消費者調優
-
消費線程池配置:
根據分區數量配置合理的消費者線程數spring:kafka:listener:# 并發消費者數量,建議等于分區數量concurrency: 3# 每次拉取的記錄數consumer:max-poll-records: 500
-
批量消費:
開啟批量消費提高消費效率
?spring:kafka:listener:# 開啟批量消費batch-listener: trueconsumer:# 批量消費需要設置為falseenable-auto-commit: false# 每次拉取的最大記錄數properties:max.poll.records: 500
批量消費代碼示例:
/*** 批量消費消息*/ @KafkaListener(topics = KafkaConstant.NORMAL_TOPIC, groupId = KafkaConstant.NORMAL_CONSUMER_GROUP) public void batchConsume(List<ConsumerRecord<String, MessageEntity>> records,Acknowledgment acknowledgment) {log.info("接收到批量消息,數量:{}", records.size());for (ConsumerRecord<String, MessageEntity> record : records) {MessageEntity message = record.value();if (message == null) {continue;}try {log.info("處理批量消息,主題:{},分區:{},偏移量:{},消息ID:{}",record.topic(), record.partition(), record.offset(), message.getMessageId());// 處理消息的業務邏輯processMessage(message);// 記錄消費成功軌跡messageTraceService.recordConsumeSuccess(message.getMessageId(),record.partition(), record.offset());} catch (Exception e) {// 記錄消費失敗軌跡messageTraceService.recordConsumeFailure(message.getMessageId(),record.partition(), record.offset(), e.getMessage());log.error("批量消息處理失敗,消息ID:{}", message.getMessageId(), e);// 轉發到死信隊列forwardToDeadLetterQueue(message, record.topic(), record.partition(), record.offset(), e.getMessage());}}// 手動確認所有消息acknowledgment.acknowledge();log.info("批量消息處理完成,數量:{}", records.size()); }
-
異步處理:
消費者接收到消息后,將消息放入線程池異步處理,快速確認消息,提高消費效率
5.4 主題和分區調優
-
合理設置分區數量:
分區數量是影響 Kafka 吞吐量的關鍵因素,一般建議:- 每個主題的分區數量 = 預期吞吐量 / 單分區吞吐量
- 單分區吞吐量:生產者約 500-1000 條 / 秒,消費者約 1000-2000 條 / 秒
-
合理設置副本數量:
- 副本數量越多,可靠性越高,但會降低吞吐量
- 生產環境建議設置為 2-3 個副本
-
清理策略:
根據業務需求設置合理的日志清理策略:- 按時間清理:log.retention.hours
- 按大小清理:log.retention.bytes
六、常見問題與解決方案
6.1 消息丟失問題
消息丟失可能發生在三個階段:生產階段、存儲階段和消費階段。
-
生產階段丟失:
- 解決方案:設置 acks=all,確保所有副本都收到消息
spring:kafka:producer:acks: allretries: 3
-
存儲階段丟失:
- 解決方案:設置合理的副本數量和同步策略
# server.properties # 最小同步副本數,應小于等于副本數 min.insync.replicas=2
-
消費階段丟失:
- 解決方案:使用手動確認模式,確保消息處理完成后再確認
spring:kafka:listener:ack-mode: manual_immediate
6.2 消息積壓問題
消息積壓通常是因為消費速度跟不上生產速度,解決方案如下:
-
優化消費邏輯:
- 減少單次消息處理時間
- 異步處理非關鍵流程
-
增加消費者數量:
- 水平擴展消費者實例
- 確保消費者數量不超過分區數量
-
臨時擴容:
- 對于突發流量,可以臨時啟動更多的消費者實例
-
消息遷移:
- 創建新的主題和消費者組,將積壓的消息遷移到新主題
/*** 遷移積壓消息*/ @Scheduled(fixedRate = 60000) public void migrateBacklogMessages() {String sourceTopic = "source_topic";String targetTopic = "backlog_topic";String consumerGroup = "backlog_migrate_group";log.info("開始遷移積壓消息,源主題:{},目標主題:{}", sourceTopic, targetTopic);// 創建臨時消費者DefaultKafkaConsumerFactory<String, MessageEntity> consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());try (KafkaConsumer<String, MessageEntity> consumer = (KafkaConsumer<String, MessageEntity>) consumerFactory.createConsumer(consumerGroup, new DefaultPrincipal("migrate-service"))) {// 訂閱源主題consumer.subscribe(Collections.singleton(sourceTopic));// 從最早的偏移量開始消費consumer.seekToBeginning(consumer.assignment());while (true) {ConsumerRecords<String, MessageEntity> records = consumer.poll(Duration.ofMillis(1000));if (records.isEmpty()) {break;}// 批量發送到目標主題List<ProducerRecord<String, MessageEntity>> producerRecords = new ArrayList<>();for (ConsumerRecord<String, MessageEntity> record : records) {producerRecords.add(new ProducerRecord<>(targetTopic, record.key(), record.value()));}// 批量發送kafkaTemplate.send(producerRecords);log.info("已遷移消息:{}條", producerRecords.size());// 手動提交偏移量consumer.commitSync();// 控制遷移速度,避免影響正常業務Thread.sleep(100);}} catch (Exception e) {log.error("遷移積壓消息失敗", e);}log.info("積壓消息遷移完成"); }
-
監控告警:
- 配置消息積壓監控和告警,及時發現問題
/*** 消息積壓監控*/ @Scheduled(fixedRate = 60000) // 每分鐘檢查一次 public void monitorMessageBacklog() {// 監控的主題和消費者組Map<String, String> monitorTopics = new HashMap<>();monitorTopics.put(KafkaConstant.NORMAL_TOPIC, KafkaConstant.NORMAL_CONSUMER_GROUP);monitorTopics.put(KafkaConstant.PARTITION_TOPIC, KafkaConstant.PARTITION_CONSUMER_GROUP);// 獲取KafkaAdminClienttry (AdminClient adminClient = AdminClient.create(kafkaProperties.buildAdminProperties())) {for (Map.Entry<String, String> entry : monitorTopics.entrySet()) {String topic = entry.getKey();String consumerGroup = entry.getValue();// 獲取消費者組的偏移量Map<TopicPartition, OffsetAndMetadata> committedOffsets = adminClient.listConsumerGroupOffsets(consumerGroup).partitionsToOffsetAndMetadata().get();// 獲取主題的最新偏移量Map<TopicPartition, Long> endOffsets = adminClient.listOffsets(committedOffsets.keySet()).all().get();// 計算每個分區的積壓數量for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : committedOffsets.entrySet()) {TopicPartition topicPartition = offsetEntry.getKey();long consumerOffset = offsetEntry.getValue().offset();long endOffset = endOffsets.get(topicPartition);long backlog = endOffset - consumerOffset;log.info("主題:{},分區:{},積壓消息數:{}", topic, topicPartition.partition(), backlog);// 如果積壓數量超過閾值,發送告警if (backlog > 10000) {log.warn("主題消息積壓嚴重,主題:{},分區:{},積壓消息數:{}", topic, topicPartition.partition(), backlog);// 發送告警通知(郵件、短信等)alertService.sendAlert("Kafka消息積壓告警", String.format("主題:%s,分區:%d,積壓消息數:%d", topic, topicPartition.partition(), backlog));}}}} catch (Exception e) {log.error("消息積壓監控失敗", e);} }
6.3 消息順序性問題
Kafka 中,單個分區的消息是有序的,但跨分區的消息無法保證順序。確保消息順序性的解決方案如下:
-
單分區:
- 所有消息都發送到同一個分區,保證全局有序
- 缺點:無法利用多分區的并行處理能力,吞吐量受限
-
按業務 ID 分區:
- 相同業務 ID 的消息發送到同一個分區,保證局部有序
- 優點:兼顧順序性和吞吐量
// 如前面實現的BusinessIdPartitioner
-
使用狀態機:
- 對于需要全局有序的場景,可以在消費端實現狀態機,處理亂序消息
七、總結
本文詳細介紹了 SpringBoot 集成 Kafka 的全過程,從基礎概念到高級特性,從代碼實現到性能調優,涵蓋了實際開發中可能遇到的各種場景。
Kafka 作為一款高性能的分布式消息系統,在大數據領域和實時流處理場景中有著廣泛的應用。合理使用 Kafka 可以幫助我們構建高吞吐、高可靠的分布式系統。
八、參考
- Kafka 核心概念與架構:參考 Kafka 官方文檔(Apache Kafka)
- SpringBoot 集成 Kafka:參考 Spring Kafka 官方文檔(Overview :: Spring Kafka)
- 消息確認機制:參考 Kafka 官方文檔的 "Producer Configs" 和 "Consumer Configs" 章節
- 事務消息:參考 Kafka 官方文檔的 "Transactions" 章節(Apache Kafka)
- 性能調優參數:參考 Kafka 官方文檔的 "Performance Tuning" 章節(Apache Kafka)
- 消息冪等性解決方案:參考 Spring 官方博客和《Kafka 權威指南》一書
- 消息丟失與積壓解決方案:參考 Kafka 官方文檔和 Confluent 博客(Confluent Blog | Tutorials, Tips, and News Updates)