Apche Kafka + Spring的消息監聽容器

目錄

  • 一、消息的接收
    • 1.1、消息監聽器
  • 二、消息監聽容器
    • 2.1、 實現方法
      • 2.1.1、KafkaMessageListenerContainer
        • 2.1.1.1、 基本概念
        • 2.1.1.2、如何使用 KafkaMessageListenerContainer
      • 2.1.2、ConcurrentMessageListenerContainer
    • 三、偏移
  • 四、監聽器容器自動啟動

一、消息的接收

消息的接收:可以通過配置MessageListenerContainer并提供消息偵聽器或使用@KafkaListener注釋來接收消息。本章我們主要說明通過配置MessageListenerContainer并提供消息偵聽器的方式接收消息。

1.1、消息監聽器

當使用消息監聽容器時,就必須提供一個監聽器來接收數據。目前有八個支持消息偵聽器的接口:

public interface MessageListener<K, V> { // 當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。void onMessage(ConsumerRecord<K, V> data);
}public interface AcknowledgingMessageListener<K, V> { // 當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { // 當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。提供對 Consumer 對象的訪問。void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);}public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { //當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。提供對 Consumer 對象的訪問。void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}public interface BatchMessageListener<K, V> { //當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。使用此接口時不支持 AckMode.RECORD,因為偵聽器會獲得完整的批次。void onMessage(List<ConsumerRecord<K, V>> data);}public interface BatchAcknowledgingMessageListener<K, V> { // 當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { // 當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。使用此接口時不支持 AckMode.RECORD,因為偵聽器會獲得完整的批次。提供對 Consumer 對象的訪問。void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);}public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { //當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。提供對 Consumer 對象的訪問。void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}

注意:1、 Consumer對象不是線程安全的;2、不應執行任何Consumer<?, ?>影響消費者位置和/或監聽器中已提交偏移量的方法;容器需要管理這些信息。

二、消息監聽容器

2.1、 實現方法

MessageListenerContainer 提供了兩種實現方式 :
1、KafkaMessageListenerContainer,
2、ConcurrentMessageListenerContainer

2.1.1、KafkaMessageListenerContainer

2.1.1.1、 基本概念

KafkaMessageListenerContainer在單個線程上接收來自所有主題或分區的所有消息。委托ConcurrentMessageListenerContainer給一個或多個KafkaMessageListenerContainer實例以提供多線程消費。

  • 從2.2.7版本開始,可以添加一個記錄攔截器(RecordInterceptor)監聽器容器;它將在調用偵聽器之前調用,以允許檢查或修改記錄。如果攔截器返回 null,則不會調用偵聽器。
  • 從版本 2.7 開始,它具有在偵聽器退出后(通常或通過拋出異常)調用的附加方法。
  • 批處理攔截器(BatchInterceptor)為批量監聽器(Batch Listeners)提供類似的功能。
  • 此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供對 Consumer<?, ?> 的訪問。 例如,這可以用于訪問攔截器中的消費者指標。
  • CompositeRecordInterceptor and CompositeBatchInterceptor可以調用多個攔截器。
  • 默認情況下,當使用事務時,攔截器在事務啟動后被調用。從版本 2.3.4 開始,可以設置偵聽器容器的 interceptBeforeTx 屬性在事務開始之前調用攔截器。
  • 從版本 2.3.8、2.4.6 開始,當并發大于 1 時 ConcurrentMessageListenerContainer 支持靜態成員資格。 group.instance.id 后綴為 -n ,起始n于1。這與增加 session.timeout.ms 的值 一起可用于減少重新平衡事件,例如,當應用程序實例重新啟動時。
  • 靜態成員資格是指在提高流應用程序、消費者組和其他構建在組再平衡協議之上的應用程序的可用性。再平衡協議依賴組協調器為組成員分配實體 ID。這些生成的 ID 是短暫的,并且會在成員重新啟動和重新加入時發生變化。對于基于消費者的應用程序,這種“動態成員資格”可能會導致在管理操作(例如代碼部署、配置更新和定期重新啟動)期間將大部分任務重新分配給不同的實例。對于大型狀態應用程序,洗牌任務在處理之前需要很長時間才能恢復其本地狀態,從而導致應用程序部分或完全不可用。受這一觀察的啟發,Kafka 的組管理協議允許組成員提供持久的實體 ID。根據這些 ID,組成員資格保持不變,因此不會觸發重新平衡。

同樣的,攔截器中不應該執行任何影響消費者的位置和/或提交的偏移量的方法,容器需要管理這些信息。

如果攔截器改變了記錄(通過創建新記錄),則topic、partition和offset必須保持不變,以避免意外的副作用,例如記錄丟失。

2.1.1.2、如何使用 KafkaMessageListenerContainer

  • KafkaMessageListenerContainer 構造函數

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
    

    該構造函數接收接收消費者工廠(ConsumerFactory)有關對象中主題和分區以及其他配置的信息。

  • 容器屬性(ContainerProperties)包含3個構造函數,下面我們一個一個介紹它們。
    1、以TopicPartitionOffset為參數

    public ContainerProperties(TopicPartitionOffset... topicPartitions)
    

    該構造函數采用一個主題分區偏移量(TopicPartitionOffset)參數數組來顯式指示容器要使用哪些分區(使用消費者assign()方法)并帶有可選的初始偏移量。默認情況下,正值是絕對偏移量,負值是相對于分區內當前最后一個偏移量。TopicPartitionOffset提供了一個帶有附加參數的構造函,boolean如果是true,則在容器啟動時相對于該消費者的當前位置初始偏移(正或負)。
    2、以String為參數

    public ContainerProperties(String... topics)
    

    該構造函數采用主題數組,Kafka 根據屬性分配分區group.id——在組中分配分區
    3、以Pattern為參數

    public ContainerProperties(Pattern topicPattern)
    

    該構造函數使用正則表達式Pattern來選擇主題。

  • 如何將監聽器分配給容器
    監聽器有了容器也有了,如何將監聽器分配給容器呢?。要將 MessageListener 分配給容器,可以在創建 Container 時使用 ContainerProps.setMessageListener 方法:

    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    containerProps.setMessageListener(new MessageListener<Integer, String>() {...
    });
    DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(consumerProps());
    KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
    

    要注意的是,在創建 DefaultKafkaConsumerFactory 時,使用僅接受上述屬性的構造函數意味著從配置中選取鍵和值反序列化器類。 或者,反序列化器實例可以傳遞到 DefaultKafkaConsumerFactory 構造函數以獲取鍵和/或值,在這種情況下,所有消費者共享相同的實例。 另一種選擇是提供Supplier(從版本2.3開始),它將用于為每個消費者獲取單獨的Deserializer實例:

     DefaultKafkaConsumerFactory<Integer, CustomValue> cf =new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new      CustomValueDeserializer());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
    

從版本 2.3.5 開始,引入了一個名為authorizationExceptionRetryInterval 的新容器屬性。 這會導致容器在從 KafkaConsumer 獲取任何 AuthorizationException 后重試獲取消息。 例如,當配置的用戶被拒絕讀取特定主題時,就會發生這種情況。 定義authorizationExceptionRetryInterval應該有助于應用程序在授予適當的權限后立即恢復。

2.1.2、ConcurrentMessageListenerContainer

ConcurrentMessageListenerContainer只有一個構造函數與構造函數類似 KafkaListenerContainer。

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

它有一個concurrency屬性,這個屬性的作用是創建幾個 KafkaMessageListenerContainer 實例。例如:container.setConcurrency(3) 創建三個 KafkaMessageListenerContainer 實例。

當監聽多個主題時,默認的分區分布可能不是我們所期望的。 例如,如果有 3 個主題,每個主題有 5 個分區,并且我們想要使用 concurrency=15,但是我們只會看到 5 個活動使用者,每個使用者從每個主題分配一個分區,而其他 10 個使用者處于空閑狀態。 這是因為默認的 Kafka PartitionAssignor 是 RangeAssignor。 對于這種情況,我們需要考慮使用 RoundRobinAssignor,它將分區分配給所有使用者。 然后,為每個消費者分配一個主題或分區。 我們可以在提供給DefaultKafkaConsumerFactory的屬性中設置partition.assignment.strategy消費者屬性來更改要更改PartitionAssignor。(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。
在springboot中可以這樣:
spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor

當使用 TopicPartitionOffset 配置容器屬性時,ConcurrentMessageListenerContainer 會在委托 KafkaMessageListenerContainer 實例之間分發 TopicPartitionOffset 實例。

假設提供了 6 個 TopicPartitionOffset 實例,并發度為 3; 每個容器有兩個分區。 對于五個
TopicPartitionOffset 實例,兩個容器獲得兩個分區,第三個容器獲得一個分區。 如果并發數大于TopicPartition的數量,則降低并發數,使每個容器獲得一個分區。

三、偏移

spring提供了幾個偏移選項, 如果 enable.auto.commit 消費者屬性為 true,Kafka會根據其配置自動提交偏移量。 如果為 false,則容器支持多種 AckMode 設置。 默認 AckMode 為 BATCH。

從版本 2.3 開始,框架將 enable.auto.commit 設置為 false,除非在配置中明確設置。以前,如果未設置該屬性,則使用 Kafka 默認值 (true)。

消費者 poll() 方法返回一個或多個 ConsumerRecord。 為每條記錄調用 MessageListener。 以下列表描述了容器對每個 AckMode 采取的操作(當未使用事務時):

  • RECORD:當偵聽器處理記錄后返回時提交偏移量。

  • BATCH:當 poll() 返回的所有記錄都已處理完畢時提交偏移量。

  • TIME:當 poll() 返回的所有記錄都處理完畢后,只要超過了自上次提交以來的 ackTime,就提交偏移量。

  • COUNT:當 poll() 返回的所有記錄都已處理完畢時,提交偏移量,只要自上次提交以來已收到 ackCount 條記錄。

  • COUNT_TIME:與 TIME 和 COUNT 類似,但如果任一條件為真,則執行提交。

  • MANUAL:消息偵聽器負責acknowledge() 確認。 之后,應用與 BATCH 相同的語義。

  • MANUAL_IMMEDIATE:當偵聽器調用 Acknowledgment.acknowledge() 方法時立即提交偏移量。

使用事務(transactions)時,偏移量將發送到事務,語義相當于 RECORD 或 BATCH,具體取決于偵聽器類型(記錄或批處理)。MANUAL 和 MANUAL_IMMEDIATE 要求偵聽器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。

根據syncCommits容器屬性,使用消費者上的commitSync()或commitAsync()方法。 默認情況下,syncCommits 為 true。

作者個人建議建議設置:ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 為 false。

從版本 2.3 開始,Acknowledgment 接口增加了兩個方法 nack(long sleep) 和 nack(int index, long sleep)。 第一個與記錄偵聽器一起使用,第二個與批處理偵聽器一起使用。 為偵聽器類型調用錯誤的方法將引發 IllegalStateException。在此之前他是這樣:

public interface Acknowledgment {void acknowledge();}
  • 如果要提交部分批次,使用 nack()。
  • 使用事務時,將 AckMode 設置為 MANUAL;
  • 調用 nack() 會將成功處理的記錄的偏移量發送到事務。
  • nack() 只能在調用偵聽器的消費者線程上調用。
  • 當調用 nack() 時,將提交所有掛起的偏移量,丟棄上次輪詢的剩余記錄,并在其分區上執行查找,以便在下一次輪詢時重新傳遞失敗的記錄和未處理的記錄( )。
  • 通過設置 sleep 參數,消費者線程可以在重新交付之前暫停。 這與在容器配置了 SeekToCurrentErrorHandler 時拋出異常的功能類似。

當通過組管理使用分區分配時,確保 sleep 參數(加上處理先前輪詢的記錄所花費的時間)小于使用者 max.poll.interval.ms屬性,這個非常重要

四、監聽器容器自動啟動

偵聽器容器實現 SmartLifecycle,并且 autoStartup 默認為 true。 容器在后期啟動 (Integer.MAX-VALUE - 100)。 實現 SmartLifecycle 來處理來自偵聽器的數據的其他組件應在早期階段啟動。 -100 為后續階段留出了空間,使組件能夠在容器之后自動啟動。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/40023.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/40023.shtml
英文地址,請注明出處:http://en.pswp.cn/news/40023.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【機器學習】sklearn數據集的使用,數據集的獲取和劃分

「作者主頁」&#xff1a;士別三日wyx 「作者簡介」&#xff1a;CSDN top100、阿里云博客專家、華為云享專家、網絡安全領域優質創作者 「推薦專欄」&#xff1a;對網絡安全感興趣的小伙伴可以關注專欄《網絡安全入門到精通》 sklearn數據集 二、安裝sklearn二、獲取數據集三、…

mac錄屏工具,錄屏沒有聲音的解決辦法

mac錄屏工具&#xff0c;錄屏沒有聲音的解決辦法 在使用macbook錄制屏幕時&#xff0c;發現自帶的錄屏工具QuickTime Player沒有聲音&#xff0c;于是嘗試了多款錄屏工具&#xff0c;對其做一些經驗總結&#xff08;省流&#xff1a;APP Store直接可以免費下載使用Omi錄屏專家…

第三課-界面介紹SD-Stable Diffusion 教程

前言 我們已經安裝好了SD&#xff0c;這篇文章不介紹難以理解的原理&#xff0c;說使用。以后再介紹原理。 我的想法是&#xff0c;先學會畫&#xff0c;然后明白原理&#xff0c;再去提高技術。 我失敗過&#xff0c;知道三天打魚兩天曬網的痛苦&#xff0c;和很多人一樣試了…

TiDB數據庫從入門到精通系列之六:使用 TiCDC 將 TiDB 的數據同步到 Apache Kafka

TiDB數據庫從入門到精通系列之六&#xff1a;使用 TiCDC 將 TiDB 的數據同步到 Apache Kafka 一、技術流程二、搭建環境三、創建Kafka changefeed四、寫入數據以產生變更日志五、配置 Flink 消費 Kafka 數據 一、技術流程 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群創建 c…

【網絡編程系列】網絡編程實戰

&#x1f49d;&#x1f49d;&#x1f49d;歡迎來到我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦:kuan 的首頁,持續學…

使用Vue.js框架的指令和事件綁定實現一個購物車的頁面布局

使用了v-model指令來實現全選/全不選的功能&#xff0c;當全選框被點擊時&#xff0c;isAllChecked的值會被改變。使用了v-if指令來判斷購物車中是否有商品&#xff0c;如果有商品則渲染商品列表&#xff0c;否則顯示購物車為空的提示。使用了v-for指令來遍歷datalist數組&…

jvm內存溢出排查(使用idea自帶的內存泄漏分析工具)

文章目錄 1.確保生成內存溢出文件2.使用idea自帶的內存泄漏分析工具3.具體實驗一下 1.確保生成內存溢出文件 想分析堆內存溢出&#xff0c;一定在運行jar包時就寫上參數-XX:HeapDumpOnOutOfMemoryError&#xff0c;可以看我之前關于如何運行jar包的文章。若你沒有寫。可以寫上…

Keepalived入門指南:實現故障轉移和負載均衡

文章目錄 一、簡介1. Keepalived概述2. 高可用性和負載均衡的重要性 二、故障轉移1. 什么是故障轉移2. Keepalived的故障轉移原理a) VRRP協議b) 虛擬路由器ID和優先級 3. 配置Keepalived實現故障轉移a) 主備服務器的設置b) 監控網絡接口c) 虛擬IP的配置d) 備份服務器接管流程 三…

Python學習筆記_基礎篇(九)_面向對象編程

本篇內容: 1、反射2、面向對象編程3、面向對象三大特性4、類成員5、類成員修飾符6、類的特殊成員7、單例模式 反射 python中的反射功能是由以下四個內置函數提供&#xff1a;hasattr、getattr、setattr、delattr&#xff0c;改四個函數分別用于對對象內部執行&#xff1a;檢…

el-form自定義校驗規則

Vue 的 el-form 組件可以使用自定義校驗規則進行表單驗證。自定義校驗規則可以通過傳遞一個函數來實現&#xff0c;該函數接受要校驗的字段的值作為參數&#xff0c;并返回一個布爾值或一個 Promise 對象。 下面是一個示例&#xff0c;演示如何在 el-form 中使用自定義校驗規則…

若依前端npm run dev啟動時報錯

本文主要解決問題:若依前端npm run dev啟動時報錯,解決辦法。 目錄 1、第1種解決方案(親測有效) 2、第2種解決方案(親測有效) Error: error:0308010C:digital envelope routines::unsupportedat new Hash (node:internal/crypto/hash:67:19)at Object.createHash (node…

解決 adb install 錯誤INSTALL_FAILED_UPDATE_INCOMPATIBLE

最近給游戲出包&#xff0c;平臺要求 v1 簽名吧&#xff0c;AS 打包后&#xff0c;adb 執行安裝到手機&#xff0c;我用的設備是google pixel6 , android 系統 13&#xff0c; 提示如下&#xff1a; adb install -r v5_android_202308161046.apk Performing Streamed Install a…

centos 安裝.net 6 sdk

按照以下步驟在 CentOS 上安裝 .NET 6 SDK&#xff1a; 更新系統&#xff1a; sudo yum update安裝依賴項&#xff1a; sudo yum install -y curl libunwind libicu下載并添加 Microsoft 的軟件包存儲庫密鑰&#xff1a; sudo rpm -Uvh https://packages.microsoft.com/config/…

單片機第一季:零基礎13——AD和DA轉換

1&#xff0c;AD轉換基本概念 51 單片機系統內部運算時用的全部是數字量&#xff0c;即0 和1&#xff0c;因此對單片機系統而言&#xff0c;無法直接操作模擬量&#xff0c;必須將模擬量轉換成數字量。所謂數字量&#xff0c;就是用一系列0 和1 組成的二進制代碼表示某個信號大…

Linux -- 進階 Autofs自動掛載服務 實驗詳解

服務端創建共享目錄&#xff0c; 客戶端實現自動掛載 第一步 &#xff1a; 客戶端&#xff0c;服務端 均關閉安全軟件 [rootserver ~]# setenforce 0 [rootserver ~]# systemctl stop firewalld [rootnode1 ~]# setenforce 0 [rootnode1 ~]# systemctl stop firewalld 第二…

在K8s上處理nginx

基本說明 創建一個名為ssl的TLS類型的Secret對象&#xff0c;用于存儲證書和密鑰信息。 kubectl create secret tls ssl --certserver.crt --keyserver.key配置Nginx的events塊&#xff0c;設置worker連接數為1024。 events {worker_connections 1024; }配置Nginx的http塊&a…

MyBaits(單獨使用,與整合無關)小白版

文章目錄 概述比較配置寫xml加載上面配置并執行加載配置的方法方式一 執行方法方式一方式二(MyBatis映射器) 寫配置文件的映射文件設置對象的別名&#xff08;簡寫&#xff09;獲取自動生成的主鍵 查詢結果和java的映射規則基本類型映射&#xff1a;簡單對象映射&#xff1a;嵌…

加鹽加密算法

MD5加密加鹽加密項目密碼升級 MD5加密 MD5一系列公式進行復雜數學運算&#xff1b;特點&#xff1a;&#xff08;用途校驗和、計算hash值方式、加密&#xff09; 1&#xff1a;定長&#xff1b;無論原始數據多長&#xff1b;算出的結果都是4或者8字節的版本。 2&#xff1a;沖…

Java多線程實戰

Java多線程實戰 java多線程&#xff08;超詳細&#xff09; java自定義線程池總結 Java創建線程方式 方法1&#xff0c;繼承Thread類 方法2&#xff0c;實現Runable接口 方法2-2&#xff0c;匿名內部類形式lambda表達式 方法3&#xff0c;實現Callable接口&#xff0c;允許…

【深入理解Linux內核鎖】三、原子操作

我的圈子: 高級工程師聚集地 我是董哥,高級嵌入式軟件開發工程師,從事嵌入式Linux驅動開發和系統開發,曾就職于世界500強企業! 創作理念:專注分享高質量嵌入式文章,讓大家讀有所得! 文章目錄 1、原子操作思想2、整型變量原子操作2.1 API接口2.2 API實現2.2.1 原子變量結…