Using Spring for Apache Pulsar:Message Production

1. Pulsar Template

在Pulsar生產者端,Spring Boot自動配置提供了一個用于發布記錄的PulsarTemplate。該模板實現了一個名為PulsarOperations的接口,并提供了通過其合約發布記錄的方法。

這些send API方法有兩類:send和sendAsync。send方法通過Pulsar生成器上的同步發送功能阻止調用。它們返回消息在代理上持久化后發布的消息的MessageId。sendAsync方法調用是非阻塞的異步調用。它們返回一個CompletableFuture,您可以在消息發布后使用它異步接收消息ID。

對于不包含主題參數的API變體,將使用主題解析過程來確定目標主題。

1.1. Simple API

該模板為簡單的發送請求提供了一些方法(前綴為“send”)。對于更復雜的發送請求,流暢的API可以讓您配置更多選項。

1.2. Fluent API

該模板提供了一個流暢的構建器來處理更復雜的發送請求。

1.3. Message customization

您可以指定一個TypedMessageBuilderCustomizer來配置傳出消息。例如,以下代碼顯示了如何發送鍵控消息:

template.newMessage(msg).withMessageCustomizer((mb) -> mb.key("foo-msg-key")).send();

1.4. Producer customization

您可以指定一個ProducerBuilderCustomizer來配置底層Pulsar生產者構建器,該生成器最終構建用于發送傳出消息的生產者。

請謹慎使用,因為這可以完全訪問生產者構建器,調用其某些方法(如create)可能會產生意想不到的副作用。

例如,以下代碼顯示了如何禁用批處理和啟用分塊:

template.newMessage(msg).withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false)).send();

另一個示例顯示了如何在將記錄發布到分區主題時使用自定義路由。在Producer構建器上指定自定義MessageRouter實現,例如:

template.newMessage(msg).withProducerCustomizer((pb) -> pb.messageRouter(messageRouter)).send();

請注意,使用MessageRouter時,spring.pulsar.producter.message-routing-mode的唯一有效設置是自定義。

另一個示例顯示了如何添加一個ProducerInterceptor,該攔截器將攔截和修改生產者在發布到代理之前收到的消息:

template.newMessage(msg).withProducerCustomizer((pb) -> pb.intercept(interceptor)).send();

定制程序將僅適用于用于發送操作的生產者。如果要將自定義程序應用于所有生產商,則必須按照全球生產商自定義中的描述將其提供給生產商工廠。

2. Specifying Schema Information

如果您使用Java基元類型,框架會自動為您檢測模式,您不需要指定任何模式類型來發布數據。對于非基元類型,如果在PulsarTemplate上調用send操作時沒有明確指定Schema,則Spring For Apache Pulsar框架將嘗試構建Schema。JSON類型。

目前支持的復雜模式類型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和帶內聯編碼的KEY_VALUE。

2.1. Custom Schema Mapping

作為在PulsarTemplate上調用復雜類型的發送操作時指定模式的替代方法,模式解析器可以配置類型的映射。這消除了在框架使用傳出消息類型咨詢解析器時指定模式的需要。

2.1.1. Configuration properties

模式映射可以使用spring.pulsar.defaults.type-mappings屬性進行配置。以下示例使用application.yml分別使用AVRO和JSON模式為User和Address復雜對象添加映射:

spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON

消息類型是消息類的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首選方法是通過上述屬性。但是,如果需要更多的控制,您可以提供一個模式解析器定制器來添加映射。

以下示例使用模式解析器定制器分別使用AVRO和JSON模式為User和Address復雜對象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation

指定用于特定消息類型的默認模式信息的另一種選擇是用@PulsarMessage注釋標記消息類。可以通過注釋上的schemaType屬性指定架構信息。

以下示例將系統配置為在生成或使用Foo類型的消息時使用JSON作為默認模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了這個配置,就不需要在發送操作上設置或指定模式。

2.2. Producing with AUTO_SCHEMA

如果沒有機會提前知道Pulsar主題的模式類型,您可以使用AUTO_PRODUCE模式將原始JSON或Avro有效載荷安全地發布為byte[]。

在這種情況下,生產者會驗證出站字節是否與目標主題的模式兼容。

只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()發送操作如下例所示:

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}

這僅支持Avro和JSON模式類型。

3. Pulsar Producer Factory

PulsarTemplate依賴于PulsarProducerFactory來實際創建底層生產者。Spring Boot自動配置還提供了這個生產者工廠,您可以通過指定任何Spring.pulser.producer.*應用程序屬性來進一步配置它。

如果在直接使用生產者工廠API時沒有指定主題信息,則使用PulsarTemplate使用的相同主題解析過程,唯一的例外是省略了“消息類型默認”步驟。

3.1. Global producer customization

該框架提供了ProducerBuilderCustomizer合約,該合約允許您配置用于構建每個生產者的底層構建器。要自定義所有生產者,您可以將自定義者列表傳遞給PulsarProducerFactory構造函數。使用多個自定義程序時,它們將按照在列表中顯示的順序應用。

如果您使用Spring Boot自動配置,您可以將自定義程序指定為bean,它們將根據其@Order注釋自動傳遞給PulsarProducerFactory。

如果您只想將自定義程序應用于單個生產商,您可以使用Fluent API并在發送時指定自定義程序。

4. Pulsar Producer Caching

每個底層Pulsar生產者都會消耗資源。為了提高性能并避免不斷創建生產者,生產者工廠會緩存它創建的生產者。它們以LRU方式緩存,并在配置的時間段內未被使用時被驅逐。緩存鍵由足夠的信息組成,以確保在后續的創建請求中,調用者返回相同的生產者。

此外,您可以通過指定任何spring.pulsinger.producer.cache.*應用程序屬性來配置緩存設置。

4.1. Caution on Lambda customizers

任何用戶提供的生產者定制器也包含在緩存密鑰中。由于緩存鍵依賴于equals/hashCode的有效實現,因此在使用Lambda自定義程序時必須謹慎。

規則:實現為Lambdas的兩個自定義程序將在equals/hashCode上匹配,當且僅當它們使用相同的Lambda實例并且不需要在其閉包外定義任何變量時。

為了澄清上述規則,我們將看幾個例子。在下面的示例中,定制器被定義為內聯Lambda,這意味著對sendUser的每次調用都使用相同的Lambda實例。此外,它不需要閉包外的變量。因此,它將作為緩存鍵匹配。

void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName("user")).send();
}

在下一種情況下,定制器被定義為內聯Lambda,這意味著對sendUser的每次調用都使用相同的Lambda實例。但是,它需要一個閉包外的變量。因此,它將不匹配為緩存鍵。

void sendUser() {var user = randomUser();var name = randomName();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName(name)).send();
}

在最后一個例子中,定制器被定義為內聯Lambda,這意味著對sendUser的每次調用都使用相同的Lambda實例。雖然它確實使用了一個變量名,但它并非源自其閉包之外,因此將作為緩存鍵進行匹配。這說明變量可以在Lambda閉包中使用,甚至可以調用靜態方法。

void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> {var name = SomeHelper.someStaticMethod();b.producerName(name);}).send();
}

規則:如果你的Lambda定制器不是只定義一次(在后續調用中使用相同的實例),或者它需要在閉包之外定義變量,那么你必須提供一個具有有效equals/hashCode實現的定制器實現。

如果不遵守這些規則,那么生產者緩存將始終丟失,您的應用程序性能將受到負面影響。

5. Intercept Messages on the Producer

添加ProducerInterceptor可以讓您在生產者接收到的消息發布到代理之前對其進行攔截和修改。為此,您可以將攔截器列表傳遞給PulsarTemplate構造函數。使用多個攔截器時,應用它們的順序是它們在列表中出現的順序。

如果使用Spring Boot自動配置,則可以將攔截器指定為Beans。它們會自動傳遞給PulsarTemplate。攔截器的排序是通過使用@Order注釋實現的,如下所示:

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {...
}@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {...
}

如果您沒有使用啟動器,則需要自己配置和注冊上述組件。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/90388.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/90388.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/90388.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CSS揭秘:10.平行四邊形

前置知識&#xff1a;基本的css變形一、平行四邊形 要實現一個平行四邊形&#xff0c;可以使用CSS的skew變形屬性來傾斜元素。 transform: skewX(-45deg);圖-1顯示容器和內容都出現了傾斜&#xff0c;該如何解決這個問題&#xff1f; 二、嵌套方案 我們通過將內容嵌套 div 并使…

深度學習 必然用到的 線性代數知識

把標量到張量、點積到范數全串起來&#xff0c;幫你從 0 → 1 搭建 AI 數學底座 &#x1f680; 1 標量&#xff1a;深度學習的最小單元 標量 就是一維空間里的“點”&#xff0c;只有大小沒有方向。例如溫度 52 F、學習率 0.001。 記號&#xff1a;普通小寫 x&#xff1b;域&am…

OpenGL ES 紋理以及紋理的映射

文章目錄開啟紋理創建紋理綁定紋理生成紋理紋理坐標圖像配置線性插值重復效果限制拉伸完整代碼在 Android OpenGL ES 中使用紋理&#xff08;Texture&#xff09;可以顯著提升圖形渲染的質量和效率。以下是使用紋理的主要好處&#xff1a; 增強視覺真實感 紋理可以將復雜的圖像…

從金字塔到個性化路徑:AI 正在重新定義學習方式

幾十年來&#xff0c;我們的教育系統始終遵循著一條熟悉的路線&#xff1a; 從小學、初中、高中&#xff0c;再到大學和研究生。這條標準化的路徑&#xff08;K-12 到研究所&#xff09;結構清晰&#xff0c;卻也緩慢。但在當今這個信息爆炸、知識快速更新、個性化需求高漲的時…

產品經理崗位職責拆解

以下是產品經理崗位職責的詳細分解表&#xff0c;涵蓋工作內容、核心動作及輸出成果&#xff1a;崗位職責具體工作內容輸出成果1. 日常版本迭代管理需求分析及PRD產出協調資源推動產品上線- 收集業務/用戶需求&#xff0c;分析可行性及優先級- 撰寫PRD文檔&#xff0c;明確功能…

后端微服務基礎架構Spring Cloud

版本關系 版本發布說明-阿里云Spring Cloud Alibaba官網 選擇 創建項目 創建父項目 什么都不動&#xff0c;創建即可 1) 刪掉沒用的文件 保留 2) pom中加入 打包方式 <packaging>pom</packaging> 3) 刪掉src 4) pom.xml中刪除沒用的 5)更改pom.xml中 spring…

數據分析框架和方法

一、核心分析框架 (The Big Picture Frameworks)??描述性分析 (What Happened?)????目的&#xff1a;?? 了解過去發生了什么&#xff0c;描述現狀&#xff0c;監控業務健康。??核心工作&#xff1a;?? 匯總、聚合、計算基礎指標 (KPI)&#xff0c;生成報表和儀表盤…

電路研究9.3.10——合宙Air780EP中的AT開發指南:阿里云應用指南

這個好像也用不到&#xff0c;不過可以先貼出來。簡單看了一下也沒深入分析&#xff0c;直接扒過來了&#xff0c;感覺涉及到了上位機的學習了。我這下位機的可能用不到&#xff0c;就是貼過來好了。 應用概述 使用 AT 方式連接阿里云分為一機一密和一型一密&#xff0c;其中一…

[Backlog] 核心協調器 | 終端用戶界面(TUI)實現 | 多分支任務沖突解決 | 測試驗證體系

第8章 核心協調器 歡迎回到Backlog.md&#xff01; 在上一章文件系統操作中&#xff0c;我們深入了解了數據物理存儲層面的讀寫機制。本章將聚焦系統的神經中樞——核心協調器。 核心協調器的本質&#xff08;中央決策引擎&#xff09; 如果將Backlog.md視為項目管理團隊&a…

車載以太網-TC8測試-UT(Upper Tester)

目錄 一、技術原理:指令體系與協議適配1. **指令格式與傳輸機制**2. **協議棧交互邏輯**3. **規范遵循與版本演進**二、測試應用:TC8測試場景與案例1. **TCP協議棧深度驗證**2. **ARP協議健壯性測試**3. **SOME/IP服務動態管理**三、實現挑戰與解決方案1. **實時性要求**2. *…

扣子Coze純前端部署多Agents

純前端網頁搭建&#xff0c;無需任何后端代碼&#xff0c;方便快捷&#xff01; 就像公司前臺的多功能控制臺&#xff0c;員工可以通過按鈕快速呼叫不同的AI助手。具備多設備適配、智能對話等基礎能力。 支持添加多個智能體 配置方式 添加智能體信息&#xff0c;data-bot為智…

STM32中I2C協議詳解

前言 在嵌入式系統中&#xff0c;設備間的短距離通信協議中&#xff0c;I2C&#xff08;Inter-Integrated Circuit&#xff0c;集成電路互連&#xff09;以其信號線少、布線簡單、支持多從機等特點&#xff0c;被廣泛應用于傳感器、EEPROM、OLED屏等中低速外設的通信場景。與SP…

解鎖Spring Boot多項目共享Redis:優雅Key命名結構指南

引言Redis 基礎與 Spring Boot 集成Redis 簡介Redis&#xff0c;即 Remote Dictionary Server&#xff0c;是一個開源的基于內存的數據結構存儲系統&#xff0c;可用作數據庫、緩存和消息中間件 。它具備諸多顯著特性&#xff0c;使其在現代軟件開發中占據重要地位。Redis 的讀…

《重構項目》基于Apollo架構設計的項目重構方案(多種地圖、多階段、多任務、狀態機管理)

1. 項目結構設計project/ ├── config/ # 配置文件&#xff08;定義 Scenario、Stage、Task 的映射&#xff09; ├── src/ │ ├── base/ # 抽象基類定義 │ │ ├── scenario_base.h/.cpp │ │ ├── stage_base.h/.cpp…

動手學深度學習13.6. 目標檢測數據集-筆記練習(PyTorch)

以下內容為結合李沐老師的課程和教材補充的學習筆記&#xff0c;以及對課后練習的一些思考&#xff0c;自留回顧&#xff0c;也供同學之人交流參考。 本節課程地址&#xff1a;數據集_嗶哩嗶哩_bilibili 本節教材地址&#xff1a;13.6. 目標檢測數據集 — 動手學深度學習 2.0…

Unity3D游戲內存優化指南

前言 Unity3D 游戲的內存控制是保證游戲流暢運行&#xff08;尤其在移動端和主機平臺&#xff09;和避免崩潰的關鍵挑戰。以下是核心策略和常見問題的解決方案&#xff1a; 對惹&#xff0c;這里有一個游戲開發交流小組&#xff0c;希望大家可以點擊進來一起交流一下開發經驗…

git學習:首次創建倉庫

文章目錄前言&#xff1a;1、首次創建倉庫并上傳數據1.1 創建倉庫&#xff0c;1.2 命令上傳1.3 首次代碼上傳至倉庫的步驟&#xff1a;2、分支操作2.1 分支的刪除2.2 切換分支2.3 查看分支2.4 同步其他分支的修改3、查看電腦的配置文件4、遠程倉庫命令 git remote5、其他后語前…

C++并行計算:OpenMP與MPI全解析

在高性能計算領域&#xff0c;充分利用硬件資源的并行計算技術已成為剛需。從單節點多核到跨節點集群&#xff0c;開發者需要掌握不同的并行編程模型。本文將系統講解兩種主流并行技術&#xff1a;OpenMP&#xff08;共享內存多核并行&#xff09;與MPI&#xff08;分布式內存集…

TCP 動態選路協議全面研究:OSPF、BGP 與 IS-IS 的比較與應用分析

一、引言&#xff1a;動態選路協議概述 在現代計算機網絡中&#xff0c;路由選擇是數據傳輸的核心功能&#xff0c;它決定了數據包從源到目的地的路徑選擇。隨著網絡規模的不斷擴大和復雜性的增加&#xff0c;靜態路由已經無法滿足網絡動態變化的需求&#xff0c;動態路由協議…

OpenCV 圖像哈希類cv::img_hash::AverageHash

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 cv::img_hash::AverageHash是OpenCV中用于圖像哈希&#xff08;Image Hashing&#xff09;的一個類&#xff0c;屬于opencv_img_hash模塊。它實現了…