SpringBoot基礎Kafka示例

這里將生產者和消費者放在一個應用中

使用的Boot3.4.3

引入Kafka依賴

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置


spring:application:name: kafka-1#kafka連接地址kafka:bootstrap-servers: 127.0.0.1:9092#配置生產者producer:#消息發送失敗重試次數retries: 0#一個批次可以使用內存的大小batch-size: 16384#一個批次消息數量buffer-memory: 33554432#鍵的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer#值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:#是否自動提交enable-auto-commit: false#自動提交的頻率auto-commit-interval: 1000#earliest	從分區的最早偏移量開始消費	需要消費所有歷史消息  latest	從分區的最新偏移量開始消費,忽略歷史消息	只關心新消息#none	如果沒有有效的偏移量,拋出異常	嚴格要求偏移量必須存在#exception spring-kafka不支持auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:#用于配置消費者如何處理消息的確認  ack配置方式  這里指定由消費者手動提交偏移量#Acknowledgment.acknowledge() 方法來提交偏移量ack-mode: MANUAL_IMMEDIATEconcurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3server:port: 8099

生產者示例,一般可能是一個MQTT接收消息入口

package com.hrui.kafka1.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author hrui* @date 2025/3/10 14:56*/
@RestController
public class EventProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/sendMessage")public String sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);return "Message sent to topic '" + topic + "': " + message;}@RequestMapping("/sendMessage2")public String sendMessage2() {//通過構建器模式創建Message對象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();kafkaTemplate.send(message);return "Message sent to topic";}}

消費者示例

注意:如果配置了手動提交ack,那么

主要目的不僅僅是避免重復消費,而是為了確保消息的可靠處理和偏移量(offset)的正確提交。它可以避免重復消費,但更重要的是保證消息不會丟失,并且在消息處理失敗時能夠重新消費。

package com.hrui.kafka1.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author hrui* @date 2025/3/10 15:57*/
@Component
public class EventConsumer {@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")public void onMessage(ConsumerRecord<String,String> message){System.out.println("接收到消息1:"+message.value());}@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")public void onMessage(String message){System.out.println("接收到消息2:"+message);}@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.GROUP_ID) String groupId) {try {System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);// 處理消息邏輯// ...} catch (Exception e) {// 處理異常,記錄日志System.err.println("處理消息失敗: " + e.getMessage());// 可以根據業務需求決定是否重新拋出異常}finally {// 手動提交偏移量ack.acknowledge();}}
}

生產者可選擇異步或者同步發送消息

生產者發送消息有同步異步之說 那么消費者在消費消息時候 有沒有同步異步之說呢???

在 Kafka 消費者中,消費消息的方式本質上是由 Kafka 的設計決定的,而不是由消費者代碼顯式控制的。Kafka 消費者在消費消息時,通常是以拉取(poll)的方式從 Kafka 服務器獲取消息,然后處理這些消息。從這個角度來看,消費者的消費行為是同步的,因為消費者需要主動調用?poll?方法來獲取消息。

然而,消費者的消息處理邏輯可以是同步異步的,具體取決于業務實現。以下是對消費者消費消息的同步和異步行為的詳細分析:

?消費者的同步消費

在默認情況下,Kafka 消費者的消費行為是同步的,即:

  • 消費者通過?poll?方法從 Kafka 拉取一批消息。

  • 消費者逐條處理這些消息。

  • 每條消息處理完成后,消費者提交偏移量(offset)。

  • 消費者繼續調用?poll?方法獲取下一批消息。

特點:
  • 消息處理是順序的,即一條消息處理完成后才會處理下一條消息。

  • 如果某條消息處理時間較長,會影響后續消息的處理速度。

  • 適合消息處理邏輯簡單、處理時間較短的場景。

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {try {System.out.println("接收到消息:" + message.value());// 同步處理消息邏輯processMessage(message);} catch (Exception e) {System.err.println("處理消息失敗: " + e.getMessage());} finally {ack.acknowledge(); // 手動提交偏移量}
}private void processMessage(ConsumerRecord<String, String> message) {// 模擬消息處理邏輯try {Thread.sleep(1000); // 假設處理一條消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

2. 消費者的異步消費

在某些場景下,消費者可能需要以異步的方式處理消息,即:

  • 消費者通過?poll?方法拉取一批消息。

  • 將每條消息提交到一個線程池或異步任務中處理。

  • 消費者繼續調用?poll?方法獲取下一批消息,而不等待上一條消息處理完成。

特點:
  • 消息處理是并發的,可以提高消息處理的吞吐量。

  • 需要額外的線程池或異步任務管理機制。

  • 適合消息處理邏輯復雜、處理時間較長的場景。

示例代碼:

@Autowired
private ExecutorService executorService; // 注入線程池@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {if (!StringUtils.hasText(message.value())) {ack.acknowledge();return;}// 提交異步任務處理消息executorService.submit(() -> {try {System.out.println("接收到消息:" + message.value());processMessage(message); // 異步處理消息} catch (Exception e) {System.err.println("處理消息失敗: " + e.getMessage());} finally {ack.acknowledge(); // 手動提交偏移量}});
}private void processMessage(ConsumerRecord<String, String> message) {// 模擬消息處理邏輯try {Thread.sleep(1000); // 假設處理一條消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

同步代碼示例

@RequestMapping("/sendMessage2")public String sendMessage2(){//通過構建器模式創建Message對象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);try {//阻塞等待拿結果SendResult<String, String> sendResult = send.get();System.out.println("說明消息發送成功,如果不成功會拋出異常");} catch (Exception e) {throw new RuntimeException(e);}return "Message sent to topic";}

異步注冊回調的方式

 @RequestMapping("/sendMessage2")public String sendMessage2(){//通過構建器模式創建Message對象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);//非阻塞  異步 注冊回調異步通知send.thenAccept(result -> {System.out.println("消息發送成功");}).exceptionally(e->{System.out.println("發送失敗");e.printStackTrace();return null;});return "Message sent to topic";}

如果需要發送的不是String類型?

那么要發送的不是String類型

KafkaTemplate<String,Object> kafkaTemplate;

一般來說可以專成JSON字符串發送

在引入spring-kafka的時候? ? ?KafkaAutoConfiguration中? 配置了KafkaTemplate

Kafka<Object,Object>

如果需要用KafkaTemplate發送對象的時候

默認用的String序列化? ?會報錯? ?除非將對象轉為JSON字符串(一般可以這么做)

如果用對象的話? ?改成JsonSerializer? 這樣自動轉JSON字符串

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/71999.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/71999.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/71999.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

API調試工具的無解困境:白名單、動態IP與平臺設計問題

引言 你是否曾經在開發中遇到過這樣的尷尬情形&#xff1a;你打開了平臺的API調試工具&#xff0c;準備一番操作&#xff0c;結果卻發現根本無法連接到平臺&#xff1f;別急&#xff0c;問題出在調試工具本身。今天我們要吐槽的就是那些神奇的開放平臺API調試工具&#xff0c;…

多方安全計算(MPC)電子拍賣系統

目錄 一、前言二、多方安全計算(MPC)與電子拍賣系統概述2.1 多方安全計算(MPC)的基本概念2.2 電子拍賣系統背景與需求三、MPC電子拍賣系統設計原理3.1 系統總體架構3.2 電子拍賣中的安全協議3.3 數學與算法證明四、數據加解密模塊設計五、GPU加速與系統性能優化六、GUI設計與系…

【Linux篇】初識Linux指令(上篇)

Linux命令世界&#xff1a;從新手到高手的必備指南 一 Linux發展與歷史1.1 Linux起源與發展1.2 Linux與Windows操作系統對比 二 Linux常用操作指令2.1 ls命令 - “List”&#xff08;列出文件)2.2 pwd指令- "打印當前工作目錄"2.3 cd指令 - “Change Directory”&…

編程視界:C++命名空間

目錄 命名空間 為什么要使用命名空間 什么是命名空間 命名空間的使用方式 關鍵點總結 命名空間的嵌套使用 匿名命名空間 跨模塊調用問題 命名空間可以多次定義 總結 首先從C的hello,world程序入手&#xff0c;來認識一下C語言 #include <iostream> using name…

Redux 和 MobX 高頻面試題

Redux 和 MobX 是 React 生態中的兩大狀態管理方案&#xff0c;在面試中常涉及 原理、使用方式、對比、最佳實踐 等方面。以下是 高頻面試題 詳細答案&#xff0c;助你輕松應對面試&#xff01;&#x1f680; &#x1f525; Redux 部分 1. Redux 是什么&#xff1f;為什么需要…

Excel 保護工作簿:它能解決哪些問題?如何正確使用?

在日常辦公中&#xff0c;Excel 表格常常涉及多人協作、重要數據保護&#xff0c;甚至是避免誤操作的情況。這時候&#xff0c;“保護工作簿”功能就能派上用場。它能有效防止他人修改表結構、刪除工作表&#xff0c;甚至可以設置密碼&#xff0c;確保數據的完整性和安全性。今…

Android Retrofit 框架注解定義與解析模塊深度剖析(一)

一、引言 在現代 Android 和 Java 開發中&#xff0c;網絡請求是不可或缺的一部分。Retrofit 作為 Square 公司開源的一款強大的類型安全的 HTTP 客戶端&#xff0c;憑借其簡潔易用的 API 和高效的性能&#xff0c;在開發者社區中廣受歡迎。Retrofit 的核心特性之一便是通過注…

C# Enumerable類 之 數據分組

總目錄 前言 在 C# 中&#xff0c;System.Linq.Enumerable 類是 LINQ&#xff08;Language Integrated Query&#xff09;的核心組成部分&#xff0c;它提供了一系列靜態方法&#xff0c;用于操作實現了 IEnumerable 接口的集合。通過這些方法&#xff0c;我們可以輕松地對集合…

推理模型對SQL理解能力的評測:DeepSeek r1、GPT-4o、Kimi k1.5和Claude 3.7 Sonnet

引言 隨著大型語言模型&#xff08;LLMs&#xff09;在技術領域的應用日益廣泛&#xff0c;評估這些模型在特定技術任務上的能力變得越來越重要。本研究聚焦于四款領先的推理模型——DeepSeek r1、GPT-4o、Kimi k1.5和Claude 3.7 Sonnet在SQL理解與分析方面的能力&#xff0c;…

IDEA接入阿里云百煉中免費的通義千問[2025版]

安裝deepseek 上一篇文章IDEA安裝deepseek最新教程2025中說明了怎么用idea安裝codeGPT插件&#xff0c;并接入DeepSeek&#xff0c;無奈接入的官方api已經不能使用了&#xff0c;所以我們嘗試從其他地方接入 阿里云百煉https://bailian.console.aliyun.com/ 阿里云百煉?是阿…

實施一套先進的智能攝像頭服務系統。

一、項目背景 隨著物聯網、人工智能和大數據技術的飛速發展&#xff0c;智能攝像頭已成為家庭、企業以及公共安全領域的重要設備。其便捷、高效、智能的特點&#xff0c;使得市場需求日益增長。為了滿足用戶對智能監控的多樣化需求&#xff0c;提供更加全面、可靠的監控服務&a…

linux自啟動服務

在Linux環境中&#xff0c;systemd是一個系統和服務管理器&#xff0c;它為每個服務使用.service文件進行配置。systemctl是用于控制系統服務的主要工具。本文將詳細介紹如何使用systemctl來管理vsftpd服務&#xff0c;以及如何設置服務自啟動。 使用Systemd設置自啟動服務 創…

010-Catch2

Catch2 一、框架簡介 Catch2 是一個基于 C 的現代化單元測試框架&#xff0c;支持 TDD&#xff08;測試驅動開發&#xff09;和 BDD&#xff08;行為驅動開發&#xff09;模式。其核心優勢在于&#xff1a; 單頭文件設計&#xff1a;v2.x 版本僅需包含 catch.hpp 即可使用自然…

數字人分身開發指南:從概念到實戰

一、什么是數字人分身&#xff1f; 想象一下&#xff0c;在電腦或手機屏幕里&#xff0c;一個能跟你聊天、回答問題&#xff0c;甚至還能做表情的虛擬角色。這就是數字人分身&#xff0c;它用上了人工智能技術&#xff0c;讓機器也能像人一樣交流。無論是在線客服、網絡主播還…

Pixelmator Pro for Mac 專業圖像處理軟件【媲美PS的修圖】

介紹 Pixelmator Pro&#xff0c;是一款非常強大、美觀且易于使用的圖像編輯器&#xff0c;專為 Mac 設計。采用單窗口界面、基于機器學習的智能圖像編輯、自動水平檢測&#xff0c;智能快速選擇及更好的修復工具等功能優點。許多非破壞性的專業編輯工具可讓您進行最佳的照片處…

LiveGBS流媒體平臺GB/T28181常見問題-視頻流安全控制HTTP接口鑒權勾選流地址鑒權后401Unauthorized如何播放調用接口流地址校驗

LiveGBS流媒體平臺GB/T28181常見問題頻流安全控制HTTP接口鑒權勾選流地址鑒權后401Unauthorized如何播放調用接口流地址校驗&#xff1f; 1、安全控制1.1、HTTP接口鑒權1.2、流地址鑒權 2、401 Unauthorized2.1、攜帶token調用接口2.1.1、獲取鑒權token2.1.2、調用其它接口2.1.…

C++設計模式-抽象工廠模式:從原理、適用場景、使用方法,常見問題和解決方案深度解析

一、模式基本概念 1.1 定義與核心思想 抽象工廠模式&#xff08;Abstract Factory Pattern&#xff09;是創建型設計模式的集大成者&#xff0c;它通過提供統一的接口來創建多個相互關聯或依賴的對象族&#xff0c;而無需指定具體類。其核心思想體現在兩個維度&#xff1a; …

【prompt實戰】知乎問題解答專家

本文原創作者&#xff1a;姚瑞南 AI-agent 大模型運營專家&#xff0c;先后任職于美團、獵聘等中大廠AI訓練專家和智能運營專家崗&#xff1b;多年人工智能行業智能產品運營及大模型落地經驗&#xff0c;擁有AI外呼方向國家專利與PMP項目管理證書。&#xff08;轉載需經授權&am…

數據結構第八節:紅黑樹(初階)

【本節要點】 紅黑樹概念紅黑樹性質紅黑樹結點定義紅黑樹結構紅黑樹插入操作的分析 一、紅黑樹的概念與性質 1.1 紅黑樹的概念 紅黑樹 &#xff0c;是一種 二叉搜索樹 &#xff0c;但 在每個結點上增加一個存儲位表示結點的顏色&#xff0c;可以是 Red和 Black 。 通過對 任何…

Spring Boot3.3.X整合Mybatis-Plus

前提說明&#xff1a; 項目的springboot版本為&#xff1a;3.3.2 需要整合的mybatis-plus版本&#xff1a;3.5.7 廢話不多說&#xff0c;開始造吧 1.準備好數據庫和表 2.配置全局文件application.properties或者是application.yml&#xff08;配置mapper的映射文件路徑&am…