springboot kafka 提高拉取數量

文章目錄

  • 背景
  • 問題復現
  • 解決問題
  • 原理分析
    • fetch.min.bytes
    • fetch.max.wait.ms
    • 源碼分析
      • ReplicaManager#fetchMessages

背景

開發過程中,使用kafka批量消費,發現拉取數量一直為1,如何提高批量拉取數量,記錄下踩坑記錄。

問題復現

  • kafka maven依賴
		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
  • 配置消費者
@Configuration
public class KafkaBlukConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.max-poll-records:30}")private Integer maxPollRecords;@Value("${spring.kafka.consumer.groupId:group1}")private String group;/*** 消費者配置信息*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}/*** 消費者批量?程*/@Beanpublic KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//設置為批量消費,每個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.setBatchListener(true);return factory;}
}
  • 消費端代碼

@Component
public class KafkaBatchConsumer {private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")public void consume(List<ConsumerRecord<String, String>> record) throws Exception {log.info("KafkaBatchConsumer recode size : {} ", record.size());}}
  • 使用yml配置生產者
spring:kafka:bootstrap-servers: 192.168.56.112:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
  • 使用生產者發送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {// 自定義的主題名稱public static final String TOPIC_NAME = "topic2";private KafkaTemplate<String, String> kafkaTemplate;/*** http://localhost:8080/kafka/send?msg=a* @param msg*/@RequestMapping("/send")public String send(@RequestParam("msg") String msg) {log.info("準備發送消息為:{}", msg);// 1.發送消息ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {// 2.發送失敗的處理log.error("生產者 發送消息失敗:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> stringObjectSendResult) {// 3.發送成功的處理log.info("生產者 發送消息成功:" + stringObjectSendResult.toString());}});return "接口調用成功";}
}
  • 發送消息,觀察消費者批量消費情況
http://localhost:9999/kafka/send?msg=a

多次調用發現如下:

在這里插入圖片描述
發現拉取消息的大小始終為1

解決問題

  • 添加下面兩行代碼
@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);################ 添加下面兩行 ###########props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);######################################props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}
  • 再次發送消息,觀察消費情況

在這里插入圖片描述
可以看到批量消費成功。

原理分析

fetch.min.bytes

消費者從服務器獲取記錄的最小字節數,broker 收到消費者拉取數據的請求的時候,如果可用數據量小于設置的值,那么 broker 將會等待有足夠可用的數據的時候才返回給消費者,這樣可以降低消費者和 broker 的工作負載。如果消費者的數量比較多,把該屬性的值設置得大一點可以降低 broker 的工作負載。

fetch.max.wait.ms

如果 Kafka 僅僅參考 fetch.min.bytes 參數的要求,那么有可能會因為獲取不到足夠大小的消息而一直阻塞等待,從而無法發送響應給 Consumer,顯然這是不合理的。fetch.max.wait.ms 參數用于指定 等待 FetchResponse 的最長時間,服務端根據此時間決定何時進行響應,默認值為 500(ms)。如果 Kafka 中沒有足夠多的消息而滿足不了 fetch.min.bytes 參數的要求,那么最終會等待 500ms 再響應消費者請求。這個參數的設定需要參考 Consumer 與 Kafka 之間的延遲大小,如果業務應用對延遲敏感,那么可以適當調小這個參數。

源碼分析

ReplicaManager#fetchMessages

/*** 能夠立即返回給客戶端的4種情況* 1. fetch請求沒有大于0的wait時間,參考fetch.max.wait.ms設置* 2. fetch請求要拉取的分區為空* 3. 根據fetch.min.bytes的設置,有足夠的數據返回* 4. 出現異常*/if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {// fetchPartitionData是一個TopicPartition -> FetchPartitionData 的map集合val fetchPartitionData = logReadResults.map { case (tp, result) =>tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,result.lastStableOffset, result.info.abortedTransactions)}// 調用響應回調函數responseCallback(fetchPartitionData)}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/20118.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/20118.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/20118.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

攻防對抗少丟分,愛加密幫您筑起第二防線

應用程序通常處理和存儲大量的敏感數據&#xff0c;如用戶個人信息、財務信息、商業數據、國家數據等&#xff0c;用戶量越大的應用程序&#xff0c;其需要存儲和保護的用戶數據越多。因此應用層長期是攻擊方的核心目標&#xff0c;傳統應用安全依靠防火墻(FireWall)、入侵檢測…

1.7 協議層次和服務模型

協議層次 網絡是一個復雜的系統! ? 網絡功能繁雜&#xff1a;數字信號的物理信 號承載、點到點、路由、rdt、進程區分、應用等 ?現實來看&#xff0c;網絡的許多構成元素和設備: ? 主機 ? 路由器 ? 各種媒體的鏈路 ? 應用 ? 協議 ? 硬件, 軟件 Q:如何組織和實現這個…

Linux上實現ssh免密通訊

Linux上實現ssh免密通訊 1.SSH互信原理2.SSH所需的RPM包3.兩臺機器實現互信4.常見問題及處理 1.SSH互信原理 SSH&#xff08;Secure Shell&#xff09;是一種安全的傳輸協議&#xff0c;它能讓Linux系統中的服務器和客戶端之間進行安全可靠的通訊。 SSH使用加密的傳輸方式&…

iOS組件化 方案 實現

iOS組件化 組件化的原因現在流行的組件化方案方案一、url-block &#xff08;基于 URL Router&#xff09;方案二、protocol調用方式解讀 方案三、target-action調用方式解讀 gitHub代碼鏈接參考 組件化的原因 模塊間解耦模塊重用提高團隊協作開發效率單元測試 當項目App處于…

網絡原理-四

一、續 當窗口大小為0,意味著緩沖區滿了,此時發送方,就因該暫停發送,發送方會周期性的除法 " 窗口探測包 " ,并不攜帶載荷,這樣的包對于業務不產生影響,只是為了觸發ACK,一旦查詢出來的結果是非0,緩沖區右有空間了,發送方就可以繼續發送. 二、擁塞控制 要限制發送方…

一步一步寫線程之十三隊列間的消息通知

一、線程和分布式的通信 隨著技術的不斷發展&#xff0c;多線程和分布式通信愈發的普及。那么在這種場景下的如何進行數據的通信&#xff0c;便成為了一個非常典型的問題。無論是多線程還是分布式&#xff0c;其實其抽象出來的通信機制都是類似的。或者說換句話&#xff0c;多…

java檢測字符串是否包含數字和字母

在Java中&#xff0c;要檢測一個字符串是否同時包含數字和字母&#xff0c;我們可以使用正則表達式&#xff08;regex&#xff09;或者通過遍歷字符串并檢查每個字符來實現。以下是兩種方法的詳細代碼示例&#xff1a; 1.方法一&#xff1a;使用正則表達式 import java.util.…

【AI+知識庫問答】沉浸式體驗了解 AI知識庫問答fastGPT

之前寫過一篇文章 【AI本地知識庫】個人整理的幾種常見本地知識庫技術方案 &#xff0c; 由于當時主要是針對AI本地知識庫&#xff0c; 所以沒列fastGPT。 最近經常刷到fastGPT&#xff0c;這里單獨水一篇。 FastGPT 是一個基于 LLM 大語言模型的知識庫問答系統&#xff0c;…

Github 2024-06-01 開源項目日報Top10

根據Github Trendings的統計,今日(2024-06-01統計)共有10個項目上榜。根據開發語言中項目的數量,匯總情況如下: 開發語言項目數量Python項目5Jupyter Notebook項目2TypeScript項目1Go項目1Shell項目1Lua項目1Kong:云原生API網關與AI能力 創建周期:3482 天開發語言:Lua協議…

如何確保績效目標執行到位?

很多企業在實施績效過程中&#xff0c;盡管制定好了績效目標&#xff0c;但是沒有執行下去&#xff0c;管理者將原因歸咎于“員工低效”、“體制機制”等問題&#xff0c;那么在人力資源管理方面&#xff0c;企業應該如何確保制定的績效目標執行到位&#xff1f;如何提高低效能…

云原生架構相關技術_4.服務網格

1.技術特點 服務網格&#xff08;ServiceMesh&#xff09;是分布式應用在微服務軟件架構之上發展起來的新技術&#xff0c;旨在將那些微服務間的連接、安全、流量控制和可觀測等通用功能下沉為平臺基礎設施&#xff0c;實現應用與平臺基礎設施的解耦。這個解耦意味著開發者無需…

React@16.x(14)context 舉例 - Form 表單

目錄 1&#xff0c;目標2&#xff0c;實現2.1&#xff0c;index.js2.2&#xff0c;context.js2.2&#xff0c;Form.Input2.3&#xff0c;Form.Button 3&#xff0c;使用 1&#xff0c;目標 上篇文章說到&#xff0c;context 上下文一般用于第3方組件庫&#xff0c;因為使用場景…

Chisel入門——在windows下vscode搭建|部署Scala2.13.3開發環境|用Chisel點亮FPGA小燈等實驗

文章目錄 前言一、vscode搭建scala開發環境1.1 安裝Scala官方插件1.2 創建hello_world.scala文件1.3 確認java的版本(博主使用的是1.8)1.4 下載Scala Windows版本的二進制文件1.5 配置環境變量1.6 交互模式測試一下1.7 vscode運行scala 二、windows安裝sbt2.1 下載sbt2.2 設置環…

函數遞歸及具體例子(持續更新)

遞歸就是函數自己調用自己 求n的階乘 n! n * (n - 1)! 直到n為1或者0的時候為止 舉個例子 int Fun(int n) {if (n < 0){return 1;}else{return n * Fun(n - 1);} }int main() {int n 0;scanf("%d", &n);int ret Fun(n);printf("%d\n", ret…

安裝Kubernetes v3 ----以docker的方式部署

以docker的方式部署 docker run -d \ --restartunless-stopped \ --namekuboard \ -p 80:80/tcp \ -p 10081:10081/tcp \ -e KUBOARD_ENDPOINT"http://192.168.136.55:80" \ -e KUBOARD_AGENT_SERVER_TCP_PORT"10081" \ -v /root/kuboard-data:/data \ e…

springboot中抽象類無法注入到ioc容器

1、背景 在寫代碼時&#xff0c;發現service接口有兩個實現類&#xff0c;并且兩個實現類中沒有對類名重命名&#xff0c;屬性注入的時候也沒有使用byName或Qualifier&#xff0c;正確情況下會發生多實現報錯的問題&#xff0c;以前對這個問題進行解析過。 2、調試過程 我想…

【設計模式】創建型-建造者模式

前言 在面向對象的軟件開發中&#xff0c;構建復雜對象時經常會遇到許多挑戰。一種常見的解決方案是使用設計模式&#xff0c;其中建造者模式是一個強大而靈活的選擇。本文將深入探討建造者模式的原理、結構、優點以及如何在實際項目中應用它。 一、復雜的對象 public class…

飛凌嵌入式FET3568/3568J-C核心板現已適配OpenHarmony4.1

近日&#xff0c;飛凌嵌入式為FET3568/3568J-C核心板適配了OpenHarmony4.1系統&#xff0c;新系統的加持使核心板在兼容性、穩定性與安全性等方面都得到進一步提升&#xff0c;不僅為FET3568/3568J-C核心板賦予了更強大的功能&#xff0c;也為開發者們提供了更加廣闊的創新空間…

每日一練編程題:今天是【接口,多態】

設計程序 : 電腦類的屬性USB接口數組 : 有3個usb插口電腦類的功能 : 通過接口插入外設 (u盤,麥克風,鍵盤等) addUSB(USB usb) { }開機 要求: 電腦開機前,先啟動外設關機 要求: 電腦關機前,先關閉外設 外設類(u盤,麥克風,鍵盤等) 功能 : 啟動 關閉 USB接口 定義usb設備的統一…

python多種方式 保留小數點位數(附Demo)

目錄 前言1. 字符串格式2. round函數3. Decimal模塊4. numpy庫5. Demo 前言 在Python中&#xff0c;保留小數點后特定位數可以通過多種方式實現 以下是幾種常見的方法&#xff0c;并附上相應的代碼示例&#xff1a; 使用字符串格式化&#xff08;String Formatting&#xff…