Kafka系列之SpringBoot集成Kafka

本文介紹如何在springboot項目中集成kafka收發message。

pom依賴

springboot相關的依賴我們就不提了,和kafka相關的只依賴一個spring-kafka集成包

	<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

Kafka相關的yaml配置

spring:kafka:bootstrap-servers: 30.46.35.29:9092producer:retries: 3acks: -1batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4properties:linger.ms: 1'interceptor.classes': com.tencent.qidian.ma.commontools.trace.kafka.TracingProducerInterceptorconsumer:heartbeat-interval: 3000max-poll-records: 100enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 30000listener:concurrency: 3type: batchack-mode: manual_immediate

以下為相關配置的說明:

spring.kafka

  • spring.kafka.bootstrap-servers: 指定 Kafka 服務器的地址列表,格式為 host:port,多個地址使用逗號分隔。

spring.kafka.producer用于配置 Kafka 消費者相關屬性

spring.kafka.consumer用于配置 Kafka 消費者相關屬性

  • spring.kafka.consumer.enable-auto-commit的值為false表示關閉Kafka客戶端的自動提交offSet。
  • spring.kafka.consumer.max-poll-records的值為20表示在開啟了批量消費以后,每次從Kafka服務端拉取的數據最大條數為20。

在 Spring 中是使用 Kafka 監聽器來進行消息消費的,spring.kafka.listener用來配置監聽器的相關配置

  • spring.kafka.listener.type的值為batch表示開啟批量消費,默認值為single(單條)。
  • spring.kafka.listener.ack-mode的值為manual_immediate表示關閉Spring的自動提交offSet,我們需要在代碼中進行手動提交。spring.kafka.listener.ack-mode的取值有兩個比較常見的選項值MANUAL 和MANUAL_IMMEDIATE。MANUAL表示處理完業務后,手動調用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交。MANUAL_IMMEDIATE表示每次處理完業務,手動調用Acknowledgment.acknowledge()后立即提交。

生產者配置

1)通過@Configuration、@EnableKafka,聲明Config并且打開KafkaTemplate能力。

2)生成bean,@Bean

常見配置參考:

package com.somnus.config.kafka;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Resourceprivate KafkaProperties kafkaProperties;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProperties.getProducer().getBatchSize());props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getProperties().get("linger.ms"));props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProperties.getProducer().getBufferMemory());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getKeySerializer());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getValueSerializer());return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

消費端配置

1)通過@Configuration、@EnableKafka,聲明Config并且打開KafkaTemplate能力。

2)生成bean,@Bean

常見配置參考:

package com.tencent.qidian.ma.maaction.web.config.kafka;import jakarta.annotation.Resource;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;/*** KafkaBeanConfiguration*/
@Configuration
@EnableKafka
public class KafkaBeanConfiguration {@Resourceprivate ConsumerFactory consumerFactory;@Resourceprivate KafkaProperties kafkaProperties;@Bean(name = "kafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());factory.setConcurrency(kafkaProperties.getListener().getConcurrency());if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {factory.setBatchListener(true);}return factory;}// 此bean為了后續演示使用,參考消費演示中的containerFactory屬性配置@Bean(name = "tenThreadsKafkaListenerContainerFactory1")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>tenThreadsKafkaListenerContainerFactory1() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());factory.setConcurrency(10);if (kafkaProperties.getListener().getType().equals(Type.BATCH)) {factory.setBatchListener(true);}return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,kafkaProperties.getConsumer().getMaxPollRecords());propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getConsumer().getEnableAutoCommit());propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getConsumer().getProperties().get("session.timeout.ms"));propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getKeyDeserializer());propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getValueDeserializer());return propsMap;}}

SpringBoot 集成 KafkaTemplate 發送Kafka消息

@Resourceprivate ObjectMapper mapper;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;try {Order order = new Order();String message = mapper.writeValueAsString(order);CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("order", message);future.thenAccept(result -> {if (result.getRecordMetadata() != null) {log.debug("send message:{} with offset:{}", message, result.getRecordMetadata().offset());}}).exceptionally(exception -> {log.error("KafkaProducer send message failure,topic={},data={}", topic, message, exception);return null;});} catch (Exception e) {log.error("KafkaProducer send message exception,topic={},message={}", topic, message, e);}

SpringBoot 集成 @KafkaListener 消費Kafka消息

max.poll.interval.ms

默認為5分鐘
如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,會將其踢出消費組,將分區分配給別的consumer消費,觸發rebalance 。
如果你的消費者節點總是在重啟完不久就不消費了,可以考慮檢查改配置項或者優化你的消費者的消費速度等等。

max.poll.records

max-poll-records是Kafka consumer的一個配置參數,表示consumer一次從Kafka broker中拉取的最大消息數目,默認值為500條。在Kafka中,一個消費者組可以有多個consumer實例,每個consumer實例負責消費一個或多個partition的消息,每個consumer實例一次從broker中可以拉取一個或多個消息。

max-poll-records參數的作用就是控制每次拉取消息的最大數目,以實現消費弱化和控制內存資源的需求。

參考Kafka中的max-poll-records和listener.concurrency配置

注意:@KafkaListener注解中的concurrency會覆蓋消費者工廠中的concurrency,以下面代碼為例,即使kafkaListenerContainerFactory1中的并發數是3,但是最終生成的監聽器數量是2。

@Resourceprivate ObjectMapper mapper;@KafkaListener(id = "order_consumer",topics = "order",groupId = "g_order_consumer_group",//可配置containerFactory參數,使用指定的containerFactory,不配置默認使用名稱是kafkaListenerContainerFactory的bean//containerFactory = "kafkaListenerContainerFactory1",//concurrency = "2",properties = {"max.poll.interval.ms:300000", "max.poll.records:1"})// 可以只有ConsumerRecords<String, String> records參數。ack參數非必需,ack.acknowledge()是為了防消息丟失public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {String msg = record.value();log.info("Consume msg:{}", msg);try {Order order = mapper.readValue(val, Order.class);// 處理業務邏輯} catch (Exception e) {log.error("Consume failed, msg:{}", val, e);}}ack.acknowledge();}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/40019.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/40019.shtml
英文地址,請注明出處:http://en.pswp.cn/web/40019.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

STM32F1+HAL庫+FreeTOTS學習5——內核中斷管理及中斷控制函數

STM32F1HAL庫FreeTOTS學習5——中斷管理和臨界段代碼保護 中斷簡介中斷優先級寄存器拓展FreeRTOS中PendSV和Systick中斷優先級配置三個中斷屏蔽寄存器FreeRTOS中斷管理函數代碼驗證 上一期我們學習了FreeRTOS中任務掛起與恢復&#xff0c;在中斷服務程序中恢復任務過程中&#…

[Redis]哨兵機制

哨兵機制概念 在傳統主從復制機制中&#xff0c;會存在一些問題&#xff1a; 1. 主節點發生故障時&#xff0c;進行主備切換的過程是復雜的&#xff0c;需要人工參與&#xff0c;導致故障恢復時間無法保障。 2. 主節點可以將讀壓力分散出去&#xff0c;但寫壓力/存儲壓力是無法…

印章誰在管、誰用了、用在哪?契約鎖讓您打開手機一看便知

“印章都交給誰在管”、“哪些人能用”、“都有哪些業務在用”…這些既是管理者最關心的印章問題也是影響印章安全的關鍵要素。但是公司旗下分子公司那么多&#xff0c;各類公章、法人章、財務章、合同章一大堆&#xff0c;想“問”明白很難。 契約鎖電子簽及印控平臺推出“印章…

14-11 2024 年的 13 個 AI 趨勢

2024 年的 13 個 AI 趨勢 人工智能對環境的影響和平人工智能人工智能支持的問題解決和決策針對人工智能公司的訴訟2024 年美國總統大選與人工智能威脅人工智能、網絡犯罪和社會工程威脅人工智能治療孤獨與對人工智能的情感依賴人工智能影響者中國爭奪人工智能霸主地位人工智能…

一句話回答的前端面試題

該篇文章為一句話的答案&#xff0c;想看更詳細的面試題請看這篇>《前端面試題》 原型鏈&#xff1a; 實例與原型的鏈條&#xff0c;原型是prototype&#xff0c;鏈是__proto__&#xff0c;每個函數有一個原型對象&#xff0c;函數在創建時有一個默認屬性 prototype&#x…

YOLOv10全網最新創新點改進系列:融合GSConv+Slim Neck,雙改進、雙增強,替換特征融合層實現, 輕量化漲點改進策略,有效漲點神器!

YOLOv10全網最新創新點改進系列&#xff1a;融合GSConvSlim Neck&#xff0c;雙改進、雙增強&#xff0c;替換特征融合層實現&#xff0c; 輕量化漲點改進策略&#xff0c;有效漲點神器&#xff01; 所有改進代碼均經過實驗測試跑通&#xff01;截止發稿時YOLOv10已改進40&…

【數據結構】06.棧隊列

一、棧 1.1棧的概念及結構 棧&#xff1a;一種特殊的線性表&#xff0c;其只允許在固定的一端進行插入和刪除元素操作。進行數據插入和刪除操作的一端稱為棧頂&#xff0c;另一端稱為棧底。棧中的數據元素遵守后進先出LIFO&#xff08;Last In First Out)的原則。 壓棧&#…

FPGA就業方向以及主要工作

FPGA&#xff08;Field-Programmable Gate Array&#xff09;作為可編程邏輯器件&#xff0c;在多個行業和領域中都有廣泛的應用。具備FPGA技能的專業人士可以在多個方向上找到就業機會&#xff0c;以下是FPGA主要的就業方向及其對應的主要工作職責&#xff1a; 通信行業 職位…

LangChain終極內幕指南,學會langchain就看它了

1.概述 在人工智能迅速演進的時代&#xff0c;諸如Open AI的ChatGPT和Google的Bard等大型語言模型(LLMs)正徹底改變我們與技術互動的方式。這些技術巨頭和SaaS公司正在競相利用LLMs的威力&#xff0c;創造更為智能和實用的應用程序。 然而&#xff0c;真正的變革并非僅僅停留…

低壓電工精選歷年真題附答案

1.當電壓為5V時&#xff0c;導體的電阻值為5歐&#xff0c;那么當電阻兩端電壓為2V時&#xff0c;導體的電阻值為()歐。[單選題] A 、10B、5(正確答案) C、2 2.當電氣火災發生時&#xff0c;應首先切斷電源再滅火&#xff0c;但當電源無法切斷時&#xff0c;只能帶電滅火&…

Finding and exploting an unused API endpoint

Using 0$ account buy a piece of lether priced at $133 1、嘗試訪問api接口 大概率可能訪問不到,但是可以嘗試訪問下 /api/swagger/v1 /openapi.json 2、頁面功能點尋找 api send to Repeter 3、Find Supported HTTP請求 POST方法測試 通過測試得知支持GET方法和PATC…

C語言實現的人員管理系統(順序表版)

該系統具有以下主要功能&#xff1a; 添加人員信息&#xff1a;在有空間的前提下&#xff0c;用戶輸入人員的工號、姓名、性別、聯系電話和 QQ 號等信息&#xff0c;系統會自動檢查編號的唯一性&#xff0c;確保不重復。查找人員信息&#xff1a;提供按工號和姓名兩種查找方式…

av_read_frame 代碼研究

------------------------------------------------------------ author: hjjdebug date: 2024年 07月 05日 星期五 11:02:51 CST av_read_frame 代碼研究 ------------------------------------------------------------ 有人只標注一層,標注一層太膚淺了.不能了解底層之精妙…

Lianwei 安全周報|2024.07.01

新的一周又開始了&#xff0c;以下是本周「Lianwei周報」&#xff0c;我們總結推薦了本周的政策/標準/指南最新動態、熱點資訊和安全事件&#xff0c;保證大家不錯過本周的每一個重點&#xff01; 政策/標準/指南最新動態 01 出于安全考慮&#xff0c;拜登下令禁用卡巴斯基殺毒…

【康復學習--LeetCode每日一題】3115. 質數的最大距離

題目&#xff1a; 給你一個整數數組 nums。 返回兩個&#xff08;不一定不同的&#xff09;質數在 nums 中 下標 的 最大距離。 示例 1&#xff1a; 輸入&#xff1a; nums [4,2,9,5,3] 輸出&#xff1a; 3 解釋&#xff1a; nums[1]、nums[3] 和 nums[4] 是質數。因此答案是…

SpringBoot各類數量限制及超出后拋出的異常

前言 在使用SpringBoot開發接口時&#xff0c;動不動的就發生各種超過默認值的限制&#xff0c;這里總結了下SpringBoot默認限制的設置以及可能會發生的異常&#xff0c;便于問題的排查和快速修改默認值。 配置項配置項說明默認值超過大小后拋出的異常spring.servlet.multipa…

系統管理(System Keeping):全新迭代,優化您的開發體驗

隨著科技的不斷進步和用戶需求的日益增長&#xff0c;系統管理&#xff08;System Keeping&#xff09;不斷進行迭代更新&#xff0c;致力于為用戶帶來更加高效、便捷的開發體驗。本次全新迭代&#xff0c;不僅在界面與交互上進行了革新&#xff0c;更在功能整合、個性化與安全…

ECOLOGY9重置系統管理員密碼

ECOLOGY9系統管理員密碼忘記需要重置&#xff1a; 1、KB2110之后版本加了防篡改邏輯&#xff0c;數據庫中初始話密碼需要將hashdata、signdata更新為空&#xff0c;執行如下語句初始化 update HrmResourceManager set password ‘C4CA4238A0B923820DCC509A6F75849B’,salt‘’…

Android --- Service

出自于此&#xff0c;寫得很清楚。關于Android Service真正的完全詳解&#xff0c;你需要知道的一切_android service-CSDN博客 出自【zejian的博客】 什么是Service? Service(服務)是一個一種可以在后臺執行長時間運行操作而沒有用戶界面的應用組件。 服務可由其他應用組件…

萬字長文|關于 OpenAI 接口開發你應該知道的一切

這篇文章中個人結合自己的實踐經驗把 OpenAI 官方文檔解讀一遍。但是原文檔涉及內容眾多&#xff0c;包括微調&#xff0c;嵌入&#xff08;Embeddings&#xff09;等眾多主題&#xff0c;我這里重點挑選自己開發高頻使用到的&#xff0c;需要詳細了解的可以自行前往官網閱讀。…