Spring for Apache Pulsar->Reactive Support->Message Production

好消息:Spring for Apache Pulsar這兩天剛剛升到2.0.0版本

1. ReactivePulsarTemplate

在Pulsar生產者端,Spring Boot自動配置提供了一個ReactivePulsarTemplate用于發布記錄。該模板實現了一個名為ReactivePulse Operations的接口,并提供了通過其合約發布記錄的方法。

該模板提供了send方法,可以接受單個消息并返回Mono<MessageId>。它還提供了send方法,可以接受多條消息(以ReactiveStreams Publisher類型的形式)并返回Flux<MessageId>。

對于不包含主題參數的API變體,將使用主題解析過程來確定目標主題。

1.1. Fluent API

該模板提供了一個流暢的構建器來處理更復雜的發送請求。

1.2. Message customization

您可以指定MessageSpecBuilderCustomizer來配置傳出消息。例如,以下代碼顯示了如何發送鍵控消息:

template.newMessage(msg).withMessageCustomizer((mc) -> mc.key("foo-msg-key")).send();

1.3. Sender customization

您可以指定一個ReactiveMessageSenderBuilderCustomizer來配置底層Pulsar發送器生成器,該生成器最終構建用于發送傳出消息的發送器。

請謹慎使用,因為這可以完全訪問發送方構建器,調用其某些方法(如create)可能會產生意想不到的副作用。

例如,以下代碼顯示了如何禁用批處理和啟用分塊:

template.newMessage(msg).withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false)).send();

另一個示例顯示了如何在將記錄發布到分區主題時使用自定義路由。在發送方構建器上指定自定義MessageRouter實現,例如:

template.newMessage(msg).withSenderCustomizer((sc) -> sc.messageRouter(messageRouter)).send();

請注意,使用MessageRouter時,spring.pulsar.producter.message-routing-mode的唯一有效設置是自定義。

2. Specifying Schema Information

如果您使用Java基元類型,框架會自動為您檢測模式,您不需要指定任何模式類型來發布數據。對于非基元類型,如果在ReactivePulsarTemplate上調用send操作時沒有明確指定Schema,則Spring For Apache Pulsar框架將嘗試構建Schema。JSON類型。

目前支持的復雜模式類型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和帶內聯編碼的KEY_VALUE。

2.1. Custom Schema Mapping

作為在ReactivePulse Template上為復雜類型調用發送操作時指定模式的替代方法,可以使用類型的映射配置模式解析器。這消除了在框架使用傳出消息類型咨詢解析器時指定模式的需要。

2.1.1. Configuration properties

模式映射可以使用spring.pulsar.defaults.type-mappings屬性進行配置。以下示例使用application.yml分別使用AVRO和JSON模式為User和Address復雜對象添加映射:

spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON

消息類型是消息類的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首選方法是通過上述屬性。但是,如果需要更多的控制,您可以提供一個模式解析器定制器來添加映射。

以下示例使用模式解析器定制器分別使用AVRO和JSON模式為User和Address復雜對象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation

指定用于特定消息類型的默認模式信息的另一種選擇是用@PulsarMessage注釋標記消息類。可以通過注釋上的schemaType屬性指定架構信息。

以下示例將系統配置為在生成或使用Foo類型的消息時使用JSON作為默認模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了這個配置,就不需要在發送操作上設置或指定模式。

2.2. Producing with AUTO_SCHEMA

如果沒有機會提前知道Pulsar主題的模式類型,您可以使用AUTO_PRODUCE模式將原始JSON或Avro有效載荷安全地發布為byte[]。

在這種情況下,生產者會驗證出站字節是否與目標主題的模式兼容。

只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()發送操作如下例所示:

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}

這僅支持Avro和JSON模式類型。

3. ReactivePulsarSenderFactory

ReactivePulsarTemplate依賴于ReactivePulse SenderFactory來實際創建底層發送方。

Spring Boot提供了這個發送器工廠,可以配置任何Spring.pulser.producer.*應用程序屬性。

如果直接使用發送方工廠API時未指定主題信息,則使用ReactivePulse Template使用的相同主題解析過程,但省略了“消息類型默認”步驟。

3.1. Producer Caching

每個底層Pulsar生產者都會消耗資源。為了提高性能并避免持續創建生產者,底層Apache Pulsar Reactive客戶端中的ReactiveMessageSenderCache緩存了它創建的生產者。它們以LRU方式緩存,并在配置的時間段內未被使用時被驅逐。

您可以通過指定任何spring.pulsinger.producer.cache.*應用程序屬性來配置緩存設置。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913851.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913851.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913851.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

AtCoder Beginner Contest 413

比賽鏈接如下&#xff1a;Denso Create Programming Contest 2025&#xff08;AtCoder Beginner Contest 413&#xff09; - AtCoder A - Content Too Large Problem Statement Takahashi has N items and one bag. The size of the i-th (1≤i≤N) item is Ai?, and the si…

Java學習---JVM(1)

JVM&#xff0c;即Java虛擬機&#xff0c;其是Java程序的運行環境&#xff0c;是Java技術的核心組成部分&#xff0c;本次就JVM的自動內存管理詳細展開&#xff1a;JVM的內存區域分為2大類&#xff0c;即線程私有的和線程共享的&#xff0c;前者分為3大塊&#xff0c;虛擬機棧、…

Qt去噪面板搭建

建立單選互斥性面板用于選擇噪聲屬性// 創建去噪面板 QWidget* noisePanel new QWidget(); QVBoxLayout* mainLayout new QVBoxLayout(noisePanel); mainLayout->setContentsMargins(10, 10, 10, 10); mainLayout->setSpacing(15);// 去噪方法選擇組QGroupBox* methodG…

無需公網IP的文件交互:FileCodeBox容器化部署技術解析

文章目錄 前言1.Docker部署2.簡單使用演示3. 安裝cpolar內網穿透4. 配置公網地址5. 配置固定公網地址 前言 在數字化辦公需求日益增長的今天&#xff0c;文件傳輸已成為職場協作的高頻剛需。傳統共享方式卻飽受詬病&#xff1a;"需要安裝哪些臃腫客戶端&#xff1f;免費版…

1. http 有哪些版本,你是用的哪個版本,怎么查看

http 有哪些版本&#xff0c;你是用的哪個版本&#xff0c;怎么查看 總結&#xff1a;http 版本有 0.9/1.0/1.1/2.0/3.0&#xff0c;我們常用的是 1.1 和 2.0&#xff0c;使用 window.chrome.loadTimes() 獲取 http 版本。 常見的 HTTP 版本 HTTP/0.9&#xff1a;最初的版本&am…

C# IIncrementalGenerator干點啥

生成器項目 得基于.Net Stander 2.0 重要&#xff1a;<IsRoslynComponent>true</IsRoslynComponent>、<IncludeBuildOutput>false</IncludeBuildOutput>、 <PackageReference Include"Microsoft.CodeAnalysis" Version"4.14.0&q…

在徐州網絡中服務器租用與托管的優勢

一、高性價比&#xff1a;徐州萬恒提供多種配置的服務器供租用&#xff0c;滿足不同企業和個人的業務需求&#xff0c;無論是初創企業追求低成本高效能&#xff0c;還是對性能有嚴苛要求的大型項目&#xff0c;都能找到合適的服務器型號&#xff0c;以極具競爭力的價格獲取強大…

學習軟件測試的第十四天(移動端)

一.常用的abd命令有哪些1.什么是 ADB&#xff1f;通俗解釋&#xff1a; ADB 就像一個橋梁&#xff0c;讓電腦能控制連接的手機&#xff0c;比如安裝APP、抓日志、重啟設備等。專業術語總結&#xff1a; ADB&#xff08;Android Debug Bridge&#xff09;是 Android SDK 提供的命…

04-ES6

let和const命令ES6中新增了let命令&#xff0c;用來聲明變量&#xff0c;用法類似與varlet和var的不同&#xff1a;1、不存在變量提升 console.log(a); //Cannot access a before initializationlet a 100;2、同一個作用域不能重復定義同一個名稱var c 20;let c 30;c…

基于GeographicLib實現測站地平坐標系(東北天)轉地心固定坐標系XYZ

一、概述主要內容&#xff1a;本文基于GeographicLib開源庫&#xff0c;實現了一個地理空間坐標轉換功能&#xff0c;主要用于根據觀測站的位置和目標的相對方位信息&#xff0c;計算目標在地球坐標系中的絕對位置。輸入&#xff1a;觀測站的經緯度坐標(緯度、經度、海拔高度)和…

若依框架去掉Redis

這篇文章全是按照我的實戰操作來的&#xff0c;本文一是記錄一下這個過程&#xff0c;二是幫助更多的人少走彎路。 接下來我們看實戰&#xff1a;第一步毋庸置疑&#xff0c;就是找到配置文件application.yml里面大redis配置部分&#xff0c;直接注釋掉 注意這里的data:這是否注…

【會員專享數據】2013-2024年我國省市縣三級逐日SO?數值數據(Shp/Excel格式)

之前我們分享過2013-2024年全國范圍逐日SO?柵格數據&#xff08;可查看之前的文章獲悉詳情&#xff09;!該數據來源于韋晶博士、李占清教授團隊發布在國家青藏高原科學數據中心網站上的中國高分辨率高質量近地表空氣污染物數據集。很多小伙伴拿到數據后反饋柵格數據不太方便使…

TCP SYN、UDP、ICMP之DOS攻擊

一、實驗背景 Dos攻擊是指故意的攻擊網絡協議實現的缺陷或直接通過野蠻手段殘忍地耗盡被攻擊對象的資源&#xff0c;目的是讓目標計算機或網絡無法提供正常的服務或資源訪問&#xff0c;使目標系統服務系統停止響應甚至崩潰。 二、實驗設備 1.一臺靶機Windows主機 2.增加一個網…

Ntfs!LfsUpdateLfcbFromRestart函數分析之根據Ntfs!_LFS_RESTART_AREA初始化Ntfs!_LFCB

第一部分&#xff1a;LfsUpdateLfcbFromRestart( ThisLfcb,FileSize,DiskRestartArea,FirstRestar1: kd> p Ntfs!LfsRestartLogFile0x317: f71fc8dd e820e5ffff call Ntfs!LfsUpdateLfcbFromRestart (f71fae02) 1: kd> t Ntfs!LfsUpdateLfcbFromRestart: f71fae0…

Qt開發:QtConcurrent介紹和使用

文章目錄一、QtConcurrent 簡介二、常用功能分類2.1 異步運行一個函數&#xff08;無返回值&#xff09;2.2 異步運行一個帶參數的函數&#xff08;有返回值&#xff09;2.3 綁定類成員函數2.4 容器并行處理&#xff08;map&#xff09;三、線程池控制四、取消任務五、典型應用…

企業數據開發治理平臺選型:13款系統優劣對比

本文將深入對比13款主流的數據指標管理平臺&#xff1a;1.網易數帆&#xff1b; 2.云徙科技&#xff1b; 3.數瀾科技&#xff1b; 4.用友數據中臺&#xff1b; 5.龍石數據中臺&#xff1b; 6.SelectDB&#xff1b; 7.得帆云 DeHoop 數據中臺&#xff1b; 8.Talend&#xff1b; …

Java JDK 下載指南

Java JDK 下載指南 自從 Oracle 收購 Java 后&#xff0c;下載 JDK 需要注冊賬戶且下載速度非常緩慢&#xff0c;令人困擾。 解決方案&#xff1a; 華為云提供了便捷的 JDK 下載鏡像&#xff0c;訪問速度快且無需注冊&#xff1a; https://repo.huaweicloud.com/java/jdk/ 高…

QT數據交互全解析:JSON處理與HTTP通信

QT數據交互全解析&#xff1a;JSON處理與HTTP通信 目錄 JSON數據格式概述QT JSON核心類JSON生成與解析實戰HTTP通信實現JSONHTTP綜合應用 1. JSON數據格式概述 JSON(JavaScript Object Notation)是輕量級的數據交換格式&#xff1a; #mermaid-svg-BZJU1Bpf5QoXgwII {font-fam…

Function Call大模型的理解(大白話版本)

由來---場景設計你雇了一位 超級聰明的百科全書管家&#xff08;就是大模型&#xff0c;比如GPT&#xff09;。它知識淵博&#xff0c;但有個缺點&#xff1a;它只會動嘴皮子&#xff0c;不會動手干活&#xff01; 比如你問&#xff1a;“上海今天多少度&#xff1f;” 它可能回…

【PTA數據結構 | C語言版】求兩個正整數的最大公約數

本專欄持續輸出數據結構題目集&#xff0c;歡迎訂閱。 文章目錄題目代碼題目 請編寫程序&#xff0c;求兩個正整數的最大公約數。 輸入格式&#xff1a; 輸入在一行中給出一對正整數 0<x,y≤10^6&#xff0c;數字間以空格分隔。 輸出格式&#xff1a; 在一行中輸出 x 和 …