Java 17 下 Spring Boot 與 Pulsar 隊列集成實戰:生產者與消費者實現指南

Pulsar隊列與Springboot集成有2種模式:官方pulsar-client 或社區Starter(如pulsar-spring-boot-starter)

  • 如果考慮最新、最快、最齊全的功能,使用官方pulsar-client
  • 如果考慮快速低成本接入,使用社區Starter(如pulsar-spring-boot-starter)

環境依賴:

  • SpringBoot 3.3.12

  • Java 17

  • 官方pulsar-client

    • 引入依賴
    • 配置Pulsar連接
    • 創建生產者
    • 創建消費者
  • 社區Starter

    • 引入依賴
    • 發送消息
    • 接收消息

在這里插入圖片描述
在這里插入圖片描述

官方pulsar-client

官方 pulsar-client 提供了最全面的 Pulsar 功能,適合對功能完整性有較高要求的項目。下面我們一步步實現生產者和消費者的功能。

引入依賴

首先,需要在項目中引入 pulsar-client 的依賴,這能幫助我們在 Spring Boot 項目里使用 Pulsar 客戶端功能。

<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.3.1</version>
</dependency>

配置Pulsar連接

引入依賴后,我們需要對 Pulsar 進行連接配置,指定 Pulsar 服務的地址。可以在配置文件里添加相關配置,同時創建一個配置類來初始化 Pulsar 客戶端。

spring:pulsar:service-url: pulsar://127.0.0.1:6650
@Configuration
public class PulsarConfig {@Value("${spring.pulsar.client.service-url}")private String serviceUrl;@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(serviceUrl).operationTimeout(30, java.util.concurrent.TimeUnit.SECONDS).connectionTimeout(10, java.util.concurrent.TimeUnit.SECONDS);// 可以添加認證等其他配置// clientBuilder.authentication(AuthenticationFactory.token("your-token"));return clientBuilder.build();}
}

創建生產者

完成連接配置后,就可以創建 Pulsar 生產者來發送消息了。下面的代碼實現了同步和異步發送消息的功能。

@Service
public class PulsarMessageProducer {private static final String TOPIC = "persistent://public/default/messages";@Autowiredprivate PulsarClient pulsarClient;public void sendMessage(String content) throws PulsarClientException {// 創建生產者Producer<Message> producer = pulsarClient.newProducer(Schema.JSON(Message.class)).topic(TOPIC).producerName("message-producer").create();// 創建消息對象Message message = new Message(UUID.randomUUID().toString(),content,LocalDateTime.now());// 發送消息(同步)MessageId messageId = producer.send(message);System.out.println("Message sent successfully. Message ID: " + messageId);// 關閉生產者producer.close();}public CompletableFuture<MessageId> sendMessageAsync(String content) throws PulsarClientException {// 創建生產者Producer<Message> producer = pulsarClient.newProducer(Schema.JSON(Message.class)).topic(TOPIC).producerName("async-message-producer").create();// 創建消息對象Message message = new Message(UUID.randomUUID().toString(),content,LocalDateTime.now());// 異步發送消息CompletableFuture<MessageId> future = producer.sendAsync(message);future.thenAccept(messageId -> {System.out.println("Async message sent successfully. Message ID: " + messageId);try {producer.close();} catch (PulsarClientException e) {e.printStackTrace();}}).exceptionally(throwable -> {System.err.println("Failed to send message: " + throwable.getMessage());try {producer.close();} catch (PulsarClientException e) {e.printStackTrace();}return null;});return future;}
}

創建消費者

創建完生產者后,還需要創建消費者來接收消息。下面的代碼展示了如何啟動一個消費者并異步接收消息。

@Service
public class PulsarMessageConsumer implements CommandLineRunner {private static final String TOPIC = "persistent://public/default/messages";private static final String SUBSCRIPTION = "message-subscription";@Autowiredprivate PulsarClient pulsarClient;@Overridepublic void run(String... args) throws Exception {// 啟動消費者startConsumer();}public void startConsumer() throws PulsarClientException {// 創建消費者Consumer<Message> consumer = pulsarClient.newConsumer(Schema.JSON(Message.class)).topic(TOPIC).subscriptionName(SUBSCRIPTION).subscriptionType(SubscriptionType.Shared).subscribe();// 異步消費消息new Thread(() -> {while (true) {try {// 等待接收消息,超時時間為10秒Message<Message> msg = consumer.receive(10, TimeUnit.SECONDS);if (msg != null) {try {// 處理消息Message message = msg.getValue();System.out.println("Received message: " + message);// 確認消息已消費consumer.acknowledge(msg);} catch (Exception e) {// 處理消息失敗,重新放回隊列consumer.negativeAcknowledge(msg);}}} catch (PulsarClientException e) {if (e.getCause() instanceof java.util.concurrent.TimeoutException) {// 超時異常,繼續等待System.out.println("No message received within timeout period, waiting again...");} else {e.printStackTrace();}}}}).start();}
}

社區Starter

社區提供的 pulsar-spring-boot-starter 簡化了 Pulsar 與 Spring Boot 的集成過程,適合需要快速接入的項目。下面我們來看看如何使用它。

引入依賴

首先,在配置文件中添加 Pulsar 服務的配置信息,這能幫助我們連接到 Pulsar 服務。

# Pulsar 服務
spring:pulsar:client:serviceUrl: pulsar://127.0.0.1:6650

發送消息

完成配置后,就可以使用 PulsarTemplate 來發送消息了。下面的代碼實現了同步和異步發送消息的功能。

@Service
public class MyProducer {private final PulsarTemplate<String> pulsarTemplate;public MyProducer(PulsarTemplate<String> pulsarTemplate) {this.pulsarTemplate = pulsarTemplate;}public void sendMessage(String message) {
// 由于 convertAndSend(String, String) 方法未定義,可能需要使用正確的方法
// 假設使用 send 方法來替代,具體根據 PulsarTemplate 的實際方法決定pulsarTemplate.send("my-topic", message);System.out.println("Sent: " + message);}public CompletableFuture<MessageId> sendMessageAsync(String message) {return pulsarTemplate.sendAsync("my-topic", message);}
}

接收消息

發送消息后,還需要創建消費者來接收消息。使用 @PulsarListener 注解可以方便地監聽消息。下面的代碼展示了如何接收消息。

@Service
public class MyConsumer {@PulsarListener(topics = "my-topic")public void receive(Message<String> message) {System.out.println("Received in Spring Boot: " + message.getValue());}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/86837.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/86837.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/86837.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《Go語言高級編程》RPC 入門

《Go語言高級編程》RPC 入門 一、什么是 RPC&#xff1f; RPC&#xff08;Remote Procedure Call&#xff0c;遠程過程調用&#xff09;是分布式系統中不同節點間的通信方式&#xff0c;允許程序像調用本地函數一樣調用遠程服務的方法。 Go 語言的標準庫 net/rpc 提供了基礎的…

第N5周:Pytorch文本分類入門

&#x1f368; 本文為&#x1f517;365天深度學習訓練營中的學習記錄博客 &#x1f356; 原作者&#xff1a;K同學啊 一、前期準備 1.加載數據 import torch import torch.nn as nn import torchvision from torchvision import transforms,datasets import os,PIL,p…

uniappx 安卓app項目本地打包運行,騰訊地圖報錯:‘鑒權失敗,請檢查你的key‘

根目錄下添加 AndroidManifest.xml 文件&#xff0c; <application><meta-data android:name"TencentMapSDK" android:value"騰訊地圖申請的key" /> </application> manifest.json 文件中添加&#xff1a; "app": {"…

【向上教育】結構化面試開口秘籍.pdf

向 上 教 育 XI A N G S H A N G E D U C A T I O N 結構化 面試 開口秘笈 目 錄 第一章 自我認知類 ........................................................................................................................... 2 第二章 工作關系處理類 .......…

Webpack 熱更新(HMR)原理詳解

&#x1f525; Webpack 熱更新&#xff08;HMR&#xff09;原理詳解 &#x1f4cc; 本文適用于 Vue、React 等使用 Webpack 的項目開發者&#xff0c;適配 Vue CLI / 自定義 Webpack 項目。 &#x1f3af; 一、什么是 HMR&#xff1f; Hot Module Replacement 是 Webpack 提供的…

MySQL索引完全指南

一、索引是什么&#xff1f;為什么這么重要&#xff1f; 索引就像字典的目錄 想象一下&#xff0c;你要在一本1000頁的字典里找"程序員"這個詞&#xff0c;你會怎么做&#xff1f; 沒有目錄&#xff1a;從第1頁開始一頁一頁翻&#xff0c;可能要翻500頁才能找到有…

學習使用dotnet-dump工具分析.net內存轉儲文件(2)

運行ShenNiusModularity項目&#xff0c;使用createdump工具dump完整的進程內存映射文件&#xff0c;然后運行dotnet-dump analyze命令加載dump文件。 ??可以先使用dumpheap命令顯示有關垃圾回收堆的信息和有關對象的收集統計信息。dumpheap支持多類參數&#xff08;如下所示…

Oracle BIEE 交互示例(一)同一分析內

Oracle BIEE 交互示例(一)同一分析內 1 示例背景2 實踐目標3 實操步驟3.1 創建數據集3.1.1 TEST_TABLE3.1.2 保存名字為【01 TEST_TABLE】3.2 創建分析3.2.1 創建列3.2.2 創建視圖3.2.2.1 數據透視表3.2.2.2 圖形3.2.2.3 表3.3 設置交互4 結果示例1 示例背景 版本:OBIEE 12…

使用API有效率地管理Dynadot域名,出售賬戶中的域名

關于Dynadot Dynadot是通過ICANN認證的域名注冊商&#xff0c;自2002年成立以來&#xff0c;服務于全球108個國家和地區的客戶&#xff0c;為數以萬計的客戶提供簡潔&#xff0c;優惠&#xff0c;安全的域名注冊以及管理服務。 Dynadot平臺操作教程索引&#xff08;包括域名郵…

Vite 打包原理詳解 + Webpack 對比

&#x1f680; Vite 打包原理詳解 Webpack 對比 &#x1f44b; 本文適合&#xff1a;Vite 使用者、Vue/React 工程師、希望搞清楚打包流程及與 Webpack 區別的開發者 &#x1f310; 技術背景&#xff1a;Vite 采用 ES Modules 原生瀏覽器能力驅動開發體驗&#xff0c;Webpack…

區塊鏈RWA(Real World Assets)系統開發全棧技術架構與落地實踐指南

一、技術架構設計&#xff1a;分層架構與模塊協同 1. 核心區塊鏈層 區塊鏈選型策略&#xff1a; 公鏈&#xff1a;以太坊主網&#xff08;安全性高&#xff0c;DeFi生態完備&#xff09; Polygon CDK&#xff08;Layer2定制化合規鏈&#xff0c;Gas費低至$0.003&#xff09;…

GBDT:梯度提升決策樹——集成學習中的預測利器

核心定位&#xff1a;一種通過串行集成弱學習器&#xff08;決策樹&#xff09;、以梯度下降方式逐步逼近目標函數的機器學習算法&#xff0c;在結構化數據預測任務中表現出色。 本文由「大千AI助手」原創發布&#xff0c;專注用真話講AI&#xff0c;回歸技術本質。拒絕神話或妖…

Redis持久化機制深度解析:RDB與AOF全面指南

摘要 本文深入剖析Redis的持久化機制&#xff0c;全面講解RDB和AOF兩種持久化方式的原理、配置與應用場景。通過詳細的操作步驟和原理分析&#xff0c;您將掌握如何配置Redis持久化策略&#xff0c;確保數據安全性與性能平衡。文章包含思維導圖概覽、命令實操演示、核心原理圖…

CentOS7升級openssh10.0p2和openssl3.5.0詳細操作步驟

背景 近期漏洞掃描時&#xff0c;發現有很多關于openssh的相關高危漏洞&#xff0c;因此需要升級openssh的版本 升級步驟 由于openssh和openssl的版本是需要相匹配的&#xff0c;這次計劃將openssh升級至10.0p2版本&#xff0c;將openssl升級至3.5.0版本&#xff0c;都是目前…

fishbot隨身系統安裝nvidia顯卡驅動

小魚的fishbot是已經配置好的ubuntu22.04,我聽說在預先配置系統時需要勾選安裝第三方圖形化軟件&#xff0c;不然直接安裝會有進不去圖形化界面的風險&#xff0c;若沒有勾選&#xff0c;建議使用其他安裝方法&#xff0c;比如禁用系統自帶的驅動那套安裝流程 1.打開設置->關…

學習昇騰開發的第十天--ffmpeg推拉流

1、FFmpeg推流 注意&#xff1a;在推流之前先運行rtsp-simple-server&#xff08;mediamtx&#xff09; ./mediamtx 1.1 UDP推流 ffmpeg -re -i input.mp4 -c copy -f rtsp rtsp://127.0.0.1:8554/stream 1.2 TCP推流 ffmpeg -re -i input.mp4 -c copy -rtsp_transport t…

成為一名月薪 2 萬的 web 安全工程師需要掌握哪些技能??

現在 web 安全工程師比較火&#xff0c;崗位比較稀缺&#xff0c;現在除了一些大公司對學歷要求嚴格&#xff0c;其余公司看中的大部分是能力。 有個親戚的兒子已經工作 2 年了……當初也是因為其他的行業要求比較高&#xff0c;所以才選擇的 web 安全方向。 資料免費分享給你…

Pytorch8實現CNN卷積神經網絡

CNN卷積神經網絡 本章提供一個對CNN卷積網絡的快速實現 全連接網絡 VS 卷積網絡 全連接神經網絡之所以不太適合圖像識別任務&#xff0c;主要有以下幾個方面的問題&#xff1a; 參數數量太多 考慮一個輸入10001000像素的圖片(一百萬像素&#xff0c;現在已經不能算大圖了)&…

平地起高樓: 環境搭建

技術選型 本小冊是采用純前端的技術棧模擬實現小程序架構的系列文章&#xff0c;所以主要以前端技術棧為主&#xff0c;但是為了模擬一個App應用的效果&#xff0c;以及小程序包下載管理流程的實現&#xff0c;我們還是需要搭建一個基礎的App應用。這里我們將選擇 Tauri2.0 來…

langgraph學習2 - MCP編程

3中通信方式&#xff1a; 目前sse用的很少 3.開發mcp框架 主流框架2個&#xff1a; MCP skd 官方 Fast Mcp V2 &#xff0c;&#xff08;V1捐給MCP 官方&#xff09; 大模型如何識別用哪個tools&#xff0c; 以及如何使用tools&#xff1a;