這里將生產者和消費者放在一個應用中
使用的Boot3.4.3
引入Kafka依賴
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>
yml配置
spring:application:name: kafka-1#kafka連接地址kafka:bootstrap-servers: 127.0.0.1:9092#配置生產者producer:#消息發送失敗重試次數retries: 0#一個批次可以使用內存的大小batch-size: 16384#一個批次消息數量buffer-memory: 33554432#鍵的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer#值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:#是否自動提交enable-auto-commit: false#自動提交的頻率auto-commit-interval: 1000#earliest 從分區的最早偏移量開始消費 需要消費所有歷史消息 latest 從分區的最新偏移量開始消費,忽略歷史消息 只關心新消息#none 如果沒有有效的偏移量,拋出異常 嚴格要求偏移量必須存在#exception spring-kafka不支持auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:#用于配置消費者如何處理消息的確認 ack配置方式 這里指定由消費者手動提交偏移量#Acknowledgment.acknowledge() 方法來提交偏移量ack-mode: MANUAL_IMMEDIATEconcurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3server:port: 8099
生產者示例,一般可能是一個MQTT接收消息入口
package com.hrui.kafka1.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author hrui* @date 2025/3/10 14:56*/
@RestController
public class EventProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/sendMessage")public String sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);return "Message sent to topic '" + topic + "': " + message;}@RequestMapping("/sendMessage2")public String sendMessage2() {//通過構建器模式創建Message對象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();kafkaTemplate.send(message);return "Message sent to topic";}}
消費者示例
注意:如果配置了手動提交ack,那么
主要目的不僅僅是避免重復消費,而是為了確保消息的可靠處理和偏移量(offset)的正確提交。它可以避免重復消費,但更重要的是保證消息不會丟失,并且在消息處理失敗時能夠重新消費。
package com.hrui.kafka1.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author hrui* @date 2025/3/10 15:57*/
@Component
public class EventConsumer {@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")public void onMessage(ConsumerRecord<String,String> message){System.out.println("接收到消息1:"+message.value());}@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")public void onMessage(String message){System.out.println("接收到消息2:"+message);}@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.GROUP_ID) String groupId) {try {System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);// 處理消息邏輯// ...} catch (Exception e) {// 處理異常,記錄日志System.err.println("處理消息失敗: " + e.getMessage());// 可以根據業務需求決定是否重新拋出異常}finally {// 手動提交偏移量ack.acknowledge();}}
}
生產者可選擇異步或者同步發送消息
生產者發送消息有同步異步之說 那么消費者在消費消息時候 有沒有同步異步之說呢???
在 Kafka 消費者中,消費消息的方式本質上是由 Kafka 的設計決定的,而不是由消費者代碼顯式控制的。Kafka 消費者在消費消息時,通常是以拉取(poll)的方式從 Kafka 服務器獲取消息,然后處理這些消息。從這個角度來看,消費者的消費行為是同步的,因為消費者需要主動調用?poll
?方法來獲取消息。
然而,消費者的消息處理邏輯可以是同步或異步的,具體取決于業務實現。以下是對消費者消費消息的同步和異步行為的詳細分析:
?消費者的同步消費
在默認情況下,Kafka 消費者的消費行為是同步的,即:
-
消費者通過?
poll
?方法從 Kafka 拉取一批消息。 -
消費者逐條處理這些消息。
-
每條消息處理完成后,消費者提交偏移量(offset)。
-
消費者繼續調用?
poll
?方法獲取下一批消息。
特點:
-
消息處理是順序的,即一條消息處理完成后才會處理下一條消息。
-
如果某條消息處理時間較長,會影響后續消息的處理速度。
-
適合消息處理邏輯簡單、處理時間較短的場景。
@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {try {System.out.println("接收到消息:" + message.value());// 同步處理消息邏輯processMessage(message);} catch (Exception e) {System.err.println("處理消息失敗: " + e.getMessage());} finally {ack.acknowledge(); // 手動提交偏移量}
}private void processMessage(ConsumerRecord<String, String> message) {// 模擬消息處理邏輯try {Thread.sleep(1000); // 假設處理一條消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}
2. 消費者的異步消費
在某些場景下,消費者可能需要以異步的方式處理消息,即:
-
消費者通過?
poll
?方法拉取一批消息。 -
將每條消息提交到一個線程池或異步任務中處理。
-
消費者繼續調用?
poll
?方法獲取下一批消息,而不等待上一條消息處理完成。
特點:
-
消息處理是并發的,可以提高消息處理的吞吐量。
-
需要額外的線程池或異步任務管理機制。
-
適合消息處理邏輯復雜、處理時間較長的場景。
示例代碼:
@Autowired
private ExecutorService executorService; // 注入線程池@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {if (!StringUtils.hasText(message.value())) {ack.acknowledge();return;}// 提交異步任務處理消息executorService.submit(() -> {try {System.out.println("接收到消息:" + message.value());processMessage(message); // 異步處理消息} catch (Exception e) {System.err.println("處理消息失敗: " + e.getMessage());} finally {ack.acknowledge(); // 手動提交偏移量}});
}private void processMessage(ConsumerRecord<String, String> message) {// 模擬消息處理邏輯try {Thread.sleep(1000); // 假設處理一條消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}
同步代碼示例
@RequestMapping("/sendMessage2")public String sendMessage2(){//通過構建器模式創建Message對象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);try {//阻塞等待拿結果SendResult<String, String> sendResult = send.get();System.out.println("說明消息發送成功,如果不成功會拋出異常");} catch (Exception e) {throw new RuntimeException(e);}return "Message sent to topic";}
異步注冊回調的方式
@RequestMapping("/sendMessage2")public String sendMessage2(){//通過構建器模式創建Message對象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);//非阻塞 異步 注冊回調異步通知send.thenAccept(result -> {System.out.println("消息發送成功");}).exceptionally(e->{System.out.println("發送失敗");e.printStackTrace();return null;});return "Message sent to topic";}
如果需要發送的不是String類型?
那么要發送的不是String類型
KafkaTemplate<String,Object> kafkaTemplate;
一般來說可以專成JSON字符串發送
在引入spring-kafka的時候? ? ?KafkaAutoConfiguration中? 配置了KafkaTemplate
Kafka<Object,Object>
如果需要用KafkaTemplate發送對象的時候
默認用的String序列化? ?會報錯? ?除非將對象轉為JSON字符串(一般可以這么做)
如果用對象的話? ?改成JsonSerializer? 這樣自動轉JSON字符串