5、Receiving Messages:Message Listener Containers

提供了兩個MessageListenerContainer實現:

KafkaMessageListenerContainer

ConcurrentMessageListener容器

KafkaMessageListenerContainer在單個線程上接收來自所有主題或分區的所有消息。ConcurrentMessageListenerContainer委托給一個或多個KafkaMessageListenerCcontainer實例,以提供多線程消費。

從2.2.7版本開始,您可以將RecordInterceptor添加到偵聽器容器中;它將在調用偵聽器之前被調用,允許檢查或修改記錄。如果攔截器返回null,則不調用偵聽器。從2.7版本開始,它具有在偵聽器退出后調用的其他方法(通常,或通過拋出異常)。此外,從2.7版本開始,現在有一個BatchInterceptor,為Batch Listeners提供類似的功能。此外,ConsumerwareRecordInterceptor(和BatchInterceptor)還提供對Consumer的訪問?, ?>.例如,這可用于訪問攔截器中的消費者指標。

您不應該執行任何影響消費者在這些攔截器中的位置和/或已提交偏移量的方法;容器需要管理這些信息。

如果攔截器更改了記錄(通過創建新記錄),則主題、分區和偏移量必須保持不變,以避免記錄丟失等意外副作用。

CompositeRecordInterceptor和CompositeBatchInterceptor可用于調用多個攔截器。

默認情況下,從2.8版本開始,當使用事務時,攔截器會在事務開始之前被調用。您可以將偵聽器容器的interceptBeforeTx屬性設置為false,以便在事務開始后調用偵聽器。從2.9版本開始,這將適用于任何事務管理器,而不僅僅是KafkaAwareTransactionManagers。例如,這允許攔截器參與由容器啟動的JDBC事務。

從2.3.8、2.4.6版本開始,當并發性大于1時,ConcurrentMessageListenerContainer現在支持靜態成員資格。group.instance.id的后綴是-n,n從1開始。這與增加的session.timeout.ms一起可用于減少重新平衡事件,例如在重新啟動應用程序實例時。

Using KafkaMessageListenerContainer

以下構造函數可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

它在ContainerProperties對象中接收ConsumerFactory以及有關主題和分區以及其他配置的信息。ContainerProperties具有以下構造函數:

public ContainerProperties(TopicPartitionOffset... topicPartitions)public ContainerProperties(String... topics)public ContainerProperties(Pattern topicPattern)

第一個構造函數接受TopicPartitionOffset參數數組,以顯式指示容器使用哪些分區(使用消費者assign()方法),并具有可選的初始偏移量。默認情況下,正值是絕對偏移量。默認情況下,負值相對于分區內當前最后一個偏移量。提供了一個接受額外布爾參數的TopicPartitionOffset構造函數。如果這是真的,則初始偏移(正或負)相對于該消費者的當前位置。容器啟動時應用偏移量。第二個是一個主題數組,Kafka根據group.id屬性分配分區?—?在整個組中分布分區。第三種使用正則表達式模式來選擇主題。

要將MessageListener分配給容器,可以在創建容器時使用ContainerProps.setMessageListener方法。以下示例顯示了如何執行此操作:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {...
});
DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

請注意,在創建DefaultKafkaConsumerFactory時,使用僅接受上述屬性的構造函數意味著從配置中獲取鍵和值反序列化器類。或者,反序列化器實例可以傳遞給DefaultKafkaConsumerFactory構造函數的鍵和/或值,在這種情況下,所有消費者共享相同的實例。另一種選擇是提供Supplier<Deserializer>(從2.3版本開始),用于為每個Consumer獲取單獨的Deserializer實例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有關可以設置的各種屬性的更多信息,請參閱ContainerProperties的Javadoc。

自2.1.1版本起,一個名為logContainerConfig的新屬性可用。當啟用true和INFO日志記錄時,每個偵聽器容器都會寫入一條日志消息,總結其配置屬性。

默認情況下,在DEBUG日志級別執行主題偏移提交的日志記錄。從2.1.2版本開始,ContainerProperties中名為commitLogLevel的屬性允許您指定這些消息的日志級別。例如,要將日志級別更改為INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO);。

從2.2版本開始,添加了一個名為missingTopicsFatal的新容器屬性(默認值:自2.3.4以來為false)。如果代理上不存在任何配置的主題,這將阻止容器啟動。如果容器配置為偵聽主題模式(正則表達式),則不適用。以前,容器線程在consumer.poll()方法中循環,在記錄許多消息時等待主題出現。除了日志,沒有跡象表明存在問題。

自2.8版本起,引入了新的容器屬性authExceptionRetryInterval。這會導致容器在從KafkaConsumer獲取任何AuthenticationException或AuthorizationException后重試獲取消息。例如,當配置的用戶被拒絕讀取某個主題或憑據不正確時,就會發生這種情況。定義authExceptionRetryInterval允許容器在授予適當權限時恢復。

默認情況下,不配置間隔-身份驗證和授權錯誤被認為是致命的,這會導致容器停止。

從2.8版本開始,在創建消費者工廠時,如果您將反序列化器作為對象提供(在構造函數中或通過setter),工廠將調用configure()方法,使用配置屬性對其進行配置。

Using ConcurrentMessageListenerContainer

單個構造函數類似于KafkaListenerContainer構造函數。以下列表顯示了構造函數的簽名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

它還具有并發屬性。例如,container.setCourrency(3)創建了三個KafkaMessageListenerContainer實例。

如果容器屬性是為主題(或主題模式)配置的,Kafka將使用其組管理功能在消費者之間分配分區。

在聽多個主題時,默認的分區分布可能不是您所期望的。例如,如果你有三個主題,每個主題有五個分區,并且你想使用并發=15,你只會看到五個活動的消費者,每個消費者從每個主題分配一個分區,其他10個消費者處于空閑狀態。這是因為默認的Kafka ConsumerPartitionAssignor是RangeAssignor(請參閱其Javadoc)。對于這種情況,您可能需要考慮使用RoundRobinAssignor,它將分區分配給所有消費者。然后,為每個消費者分配一個主題或分區。要更改ConsumerPartitionAssignor,您可以在提供給DefaultKafkaConsumerFactory的屬性中設置partition.assignment.strategy消費者屬性(ConsumerConfig.partition_assignment_STRATY_CONFIG)。

使用Spring Boot時,您可以按如下方式分配和設置策略:

spring.kafka.consumer.properties.分區.分配.策略=\

org.apache.kafka.clients.consumer。RoundRobin分配器

當容器屬性配置了TopicPartitionOffset時,ConcurrentMessageListenerContainer會將TopicPartitionOffset實例分發到代理KafkaMessageListenerCcontainer實例中。

假設提供了六個TopicPartitionOffset實例,并發性為3;每個容器有兩個分區。對于五個TopicPartitionOffset實例,兩個容器獲得兩個分區,第三個容器獲得一個分區。如果并發性大于TopicPartitions的數量,則會降低并發性,使每個容器都有一個分區。

client.id屬性(如果設置)附加了-n,其中n是與并發性對應的消費者實例。這是在啟用JMX時為MBean提供唯一名稱所必需的。

從1.3版本開始,MessageListenerContainer提供了對底層KafkaConsumer指標的訪問。在ConcurrentMessageListenerContainer的情況下,metrics()方法返回所有目標KafkaMessageListenerCcontainer實例的度量。這些指標被分組到Map<MetricName?根據為底層KafkaConsumer提供的客戶端id擴展Metric>。

從2.3版本開始,ContainerProperties提供了一個idleBetweenPolls選項,讓偵聽器容器中的主循環在KafkaConsumer.poll()調用之間休眠。從提供的選項和max.poll.interval.ms消費者配置與當前記錄批處理時間之間的差值中選擇實際睡眠間隔作為最小值。

Committing Offsets

提供了幾種用于提交抵消的選項。如果enable.auto.commit消費者屬性為true,Kafka會根據其配置自動提交偏移量。如果為false,則容器支持多個AckMode設置(如下表所述)。默認的確認模式為批處理。從2.3版本開始,除非在配置中明確設置,否則框架將enable.auto.commit設置為false。以前,如果未設置屬性,則使用Kafka默認值(true)。

Consumerpoll()方法返回一個或多個ConsumerRecords。每條記錄都會調用MessageListener。以下列表描述了容器對每個AckMode(未使用事務時)采取的操作:

RECORD:在監聽器處理記錄后返回時提交偏移量。

批處理:當poll()返回的所有記錄都已處理完畢時,提交偏移量。

TIME:在poll()返回的所有記錄都已處理完畢時提交偏移量,只要超過了自上次提交以來的ackTime。

COUNT:在poll()返回的所有記錄都已處理完畢時提交偏移量,只要自上次提交以來已收到ackCount記錄即可。

COUNT_TIME:類似于TIME和COUNT,但如果任一條件為真,則執行提交。

手冊:消息監聽器負責確認()確認。之后,應用與BATCH相同的語義。

MANUAL_IMMEDIATE:當偵聽器調用Acknowledgment.reacknowe()方法時,立即提交偏移量。

使用事務時,偏移量被發送到事務,語義等效于RECORD或BATCH,具體取決于偵聽器類型(記錄或批處理)。

MANUAL和MANUAL_IMMEDIATE要求偵聽器是AcknowledgingMessageListener或BatchAcknowledingMessageListener。請參閱消息偵聽器。

根據syncCommits容器屬性,使用消費者上的commitSync()或commitSync(()方法。syncCommits默認為true;另請參見setSyncCommitTimeout。查看setCommitCallback以獲取異步提交的結果;默認回調是LoggingCommitCallback,它記錄錯誤(以及調試級別的成功)。

因為監聽器容器有自己的偏移提交機制,所以它更喜歡Kafka ConsumerConfig。ENABLE_AUTO_COMIT_CONFIG為false。從2.3版本開始,它無條件地將其設置為false,除非在消費者工廠中特別設置或容器的消費者屬性重寫。

確認書有以下方法:

public interface Acknowledgment {void acknowledge();}

此方法使偵聽器可以控制何時提交偏移量。

從2.3版本開始,Acknowledgment接口有兩個額外的方法nack(長睡眠)和nack(int index,長睡眠)。第一個用于記錄偵聽器,第二個用于批處理偵聽器。為您的偵聽器類型調用錯誤的方法將引發IllegalStateException。

如果你想使用nack()提交部分批處理,在使用事務時,將AckMode設置為MANUAL;調用nack()將成功處理的記錄的偏移量發送給事務。

nack()只能在調用監聽器的消費者線程上調用。

使用無序提交時不允許使用nack()。

使用記錄偵聽器,當調用nack()時,任何掛起的偏移都會被提交,最后一次輪詢的剩余記錄會被丟棄,并在它們的分區上執行查找,以便在下一次輪詢()時重新傳遞失敗的記錄和未處理的記錄。通過設置sleep參數,可以在重新交付之前暫停消費者。這與在容器配置了DefaultErrorHandler時拋出異常的功能類似。

nack()在指定的睡眠持續時間內暫停整個偵聽器,包括所有分配的分區。

使用批偵聽器時,可以指定發生故障的批中的索引。當調用nack()時,在對失敗和丟棄的記錄的分區執行索引和查找之前,將為記錄提交偏移量,以便在下一次poll()時重新傳遞它們。

有關更多信息,請參閱容器錯誤處理程序。

消費者在睡眠期間暫停,以便我們繼續輪詢經紀人以保持消費者的活力。實際睡眠時間及其分辨率取決于容器的pollTimeout,默認值為5秒。最小睡眠時間等于pollTimeout,所有睡眠時間都是它的倍數。對于較小的睡眠時間,或者為了提高其準確性,可以考慮減少容器的pollTimetime。

從3.0.10版本開始,批處理監聽器可以使用Acknowledgment參數上的confirm(index)來提交批處理部分的偏移量。調用此方法時,將提交索引處記錄的偏移量(以及所有以前的記錄)。在執行部分批處理提交后調用confirmate()將提交批處理剩余部分的偏移量。以下限制適用:

確認模式。需要立即手動

必須在偵聽器線程上調用該方法

偵聽器必須使用List而不是原始ConsumerRecords

索引必須在列表元素的范圍內

索引必須大于前一次調用中使用的索引

這些限制被強制執行,該方法將根據違規情況拋出IllegalArgumentException或IllegalStateException。

Listener Container Auto Startup

偵聽器容器實現SmartLifecycle,默認情況下autoStartup為true。容器在后期階段(Integer.MAX-VALUE-100)啟動。實現SmartLifecycle以處理來自偵聽器的數據的其他組件應在早期階段啟動。-100為后續階段留出了空間,使組件能夠在容器后自動啟動。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/87882.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/87882.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/87882.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

JDBC 注冊驅動的常用方法詳解

JDBC 注冊驅動的常用方法詳解 在 JDBC 中&#xff0c;注冊驅動是建立數據庫連接的第一步。以下是幾種常用的驅動注冊方式&#xff1a; 1. 顯式類加載&#xff08;傳統方式&#xff09; // 通過 Class.forName() 加載驅動類 Class.forName("com.mysql.cj.jdbc.Driver&qu…

插入數據優化

目錄 一.插入數據優化 1.insert語句優化 ①批量插入 ②手動提交事務 ③主鍵順序插入 2.大批量插入數據&#xff08;100萬條&#xff09; 舉例 第一步&#xff1a;連接數據庫時&#xff0c;加上--local-infile屬性 第二步&#xff1a;查看全局參數local_infile的值&…

區塊鏈在域名系統安全中的應用進展綜述

一、區塊鏈與DNS結合的核心原理1.1 傳統DNS的安全缺陷中心化架構&#xff1a;傳統DNS依賴中心化服務器&#xff08;如ICANN管理的根服務器&#xff09;&#xff0c;存在單點故障風險&#xff0c;易受DDoS攻擊或配置錯誤影響。協議脆弱性&#xff1a;DNS協議設計之初缺乏加密和認…

GO Web 框架 Gin 完全解析與實踐

目錄 1. 為什么選擇 Gin?解鎖 Go Web 開發的超能力 Gin 的核心優勢 什么時候用 Gin? 第一個 Hello World 2. 路由的藝術:從簡單 GET 到復雜匹配 基礎路由 高級路由技巧 性能優化小貼士 3. 中間件的魔法:讓請求處理更聰明 內置中間件 自定義中間件 中間件的最佳實…

RabbitMQ使用topic Exchange實現微服務分組訂閱

案例場景&#xff1a;用戶下單后需要多個微服務&#xff08;如營銷、會員&#xff09;分別訂閱并處理訂單事件&#xff0c;且每個微服務可能有多個集群實例&#xff0c;需要保證同一個微服務的集群中&#xff0c;只有一個實例消費到消息。不同于Kafka和rocketMQ有分組消費的功能…

kotlin 通道trysend方法

trySend 方法是 Kotlin 協程中 Channel 類的一個重要功能。它用于向通道發送元素&#xff0c;但與 send 方法不同的是&#xff0c;trySend 是非阻塞的。這意味著它不會在通道滿時掛起當前協程&#xff0c;而是會立即返回。 trySend 方法的效果 非阻塞行為&#xff1a; 當你調用…

winform CheckedListBox單擊選中解決方案

在WinForms的CheckedListBox控件中&#xff0c;默認需要雙擊才能切換選中狀態&#xff08;復選框勾選&#xff09;。要實現單擊即選中&#xff0c;需要通過代碼處理鼠標點擊事件并手動切換選中狀態。以下是實現步驟&#xff1a; 1.CheckOnClick屬性置為true即可。 2.通過事件處…

Docker文件操作、數據卷、掛載

一&#xff1a;容器文件操作 在Docker環境中&#xff0c;管理容器內部的文件是一個常見的需求。 無論是為了配置應用、備份數據還是調試問題&#xff0c;了解如何高效地進行文件操作都是非常重要的。 docker cp命令提供了一種簡單的方法來在宿主主機和容器之間復制文件或目錄…

接口漏洞怎么抓?Fiddler 中文版 + Postman + Wireshark 實戰指南

接口安全是現代應用開發中的高危環節&#xff1a;一旦API存在未授權訪問、參數篡改、權限繞過等漏洞&#xff0c;可能直接導致用戶信息泄露、資金損失甚至整個平臺癱瘓。對于開發和安全人員來說&#xff0c;光依賴后端日志排查遠遠不夠&#xff0c;需要對接口進行主動安全性驗證…

iOS 出海 App 安全加固指南:無源碼環境下的 IPA 加固與防破解方法

隨著越來越多國內開發團隊將iOS App推向海外市場&#xff0c;如何在交付和分發環節保護應用安全成為出海過程中的重要議題。尤其是App進入多個海外應用商店或通過第三方渠道發行時&#xff0c;容易被當地黑產或競爭對手進行逆向分析&#xff0c;從而暴露內部API、核心業務流程等…

React Hooks 內部實現原理與函數組件更新機制

React Hooks 內部實現原理與函數組件更新機制 Hooks 的內部實現原理 React Hooks 的實現依賴于以下幾個關鍵機制&#xff1a; 1. 鏈表結構存儲 Hook 狀態 React 使用單向鏈表來管理 Hooks 的狀態。每個 Hook 節點包含&#xff1a; type Hook {memoizedState: any, // 存儲…

分布式會話的演進和最佳實踐,含springBoot 實現(Java版本)

一、分布式會話的背景 在微服務架構或集群部署環境下&#xff0c;請求可能落在不同的服務器節點&#xff0c;無法再依賴本地內存來維護用戶 Session。因此&#xff0c;需要一種跨節點共享 Session 的機制&#xff0c;這就是 分布式會話管理的核心目標。二、分布式會話的演進歷程…

ch03 部分題目思路

G. 收集 由于稀有度相同的物品需要一起處理&#xff0c;我們先把他們聚集到一起。 類似這樣&#xff1a; vector<int> g[maxn]; ... {cin >> x >> c;g[c].push_back(x); }那么我們需要一個貪心的思路&#xff1a; 肯定是按 ccc 從小往大收集的&#xff1b;對…

Django多表查詢(ORM)

1、建立表結構 三個表&#xff1a;book、Author、publisher。 書籍和作者是多對多的關系&#xff0c;一本書可以有多個作者&#xff0c;一個作者可以有多本書。 出版社和書籍是一對多的關系&#xff0c;一個出版社可以出版多本書&#xff08;多方&#xff0c;多方定義外鍵&…

C# 集合表達式和展開運算符 (..) 詳解

集合表達式 (Collection Expressions)基本語法支持的集合類型展開運算符 (..)基本用法實際應用示例創建新集合合并集合與現有API結合性能考慮高級用法多維集合自定義集合注意事項與傳統方式的比較總結集合表達式 (Collection Expressions) C# 12 引入了集合表達式&#xff0c;…

數學視頻動畫引擎Python庫 -- Manim Voiceover 安裝 Installation

文中內容僅限技術學習與代碼實踐參考&#xff0c;市場存在不確定性&#xff0c;技術分析需謹慎驗證&#xff0c;不構成任何投資建議。 Manim Voiceover 是一個為 Manim 打造的專注于語音旁白的插件&#xff1a; 直接在 Python 中添加語音旁白&#xff1a; 無需使用視頻編輯器&…

Git安裝避坑指南:新手村通關秘籍

Git安裝避坑指南&#xff1a;新手村通關秘籍 剛學編程那會兒&#xff0c;Git安裝差點讓我砸鍵盤。滿心歡喜打開官網下載&#xff0c;結果卡在配置上&#xff0c;命令行死活不認識git命令。看著教程里別人行云流水的操作&#xff0c;自己對著報錯信息干瞪眼——這感覺&#xff…

如何修改Siteground max_execution_time值?

這個值在Siteground 上是修改不了的。 以下是來自Siteground 官網的解釋&#xff1a; 由于服務器上全局定義的 PHP 限制&#xff0c;某些 PHP 設置無法更改。最常見的無法更改的 PHP 設置包括&#xff1a; memory_limit max_execution_time max_input_time post_max_size up…

【libm】 11 fmin函數 (fmin.rs)

一、源碼 這段代碼實現了一個符合 IEEE 754-2008 標準的 minNum 函數&#xff08;在 Rust 中命名為 fmin&#xff09;&#xff0c;該功能在 IEEE 754-2019 標準中已被 minimumNumber 取代。 /* SPDX-License-Identifier: MIT OR Apache-2.0 */ //! IEEE 754-2008 minNum. Thi…

React 英語單詞消消樂一款專為英語學習設計的互動式記憶游戲

&#x1f4d6; 項目簡介 英語單詞消消樂 是一款專為英語學習設計的互動式記憶游戲。通過經典的消消樂玩法&#xff0c;讓用戶在輕松愉快的游戲中掌握英語單詞&#xff0c;提高詞匯量和記憶效果。 &#x1f3af; 項目目標 讓英語學習變得有趣且高效通過游戲化方式增強單詞記憶…