Using Spring for Apache Pulsar:Transactions

本節介紹Spring for Apache Pulsar如何支持事務。

Overview

Spring for Apache Pulsar事務支持是基于Spring Framework提供的事務支持構建的。在高層,事務資源向事務管理器注冊,事務管理器反過來處理注冊資源的事務狀態(提交、回滾等)。

Apache Pulsar的Spring提供了以下功能:

PulsaTransactionManager-用于正常的Spring事務支持(@transactional,transactionTemplate等)

交易脈沖星模板

交易@pulsaListener

與其他事務管理器的事務同步

事務支持尚未添加到響應式組件中

默認情況下,事務支持已禁用。要在使用Spring Boot時啟用支持,只需設置Spring.pulsar.transaction.enabled屬性。下面每個組件部分都概述了進一步的配置選項。

Transactional Publishing with PulsarTemplate

事務性PulsarTemplate上的所有發送操作都會查找活動事務,并在事務中登記每個發送操作(如果找到)。

Non-transactional use

默認情況下,事務性PulsarTemplate也可用于非事務性操作。當未找到現有事務時,它將以非事務方式繼續發送操作。但是,如果模板配置為需要事務,則任何在事務范圍之外使用模板的嘗試都會導致異常。

事務可以由TransactionTemplate、@Transactional方法、調用executeInTransaction或事務偵聽器容器啟動。

Local Transactions

我們使用術語“本地”事務來表示不受Spring事務管理工具(即PulsarTransactionManager)管理或與之關聯的Pulsar本地事務。相反,“同步”事務是由PulsarTransactionManager管理或與之關聯的事務。

您可以使用PulsarTemplate在本地事務中執行一系列操作。以下示例顯示了如何執行此操作:

var results = pulsarTemplate.executeInTransaction((template) -> {var rv = new HashMap<String, MessageId>();rv.put("msg1", template.send(topic, "msg1"));rv.put("msg2", template.send(topic, "msg2"));return rv;
});

回調中的參數是調用executeInTransaction方法的模板實例。模板上的所有操作都登記在當前事務中。如果回調正常退出,則事務被提交。如果拋出異常,則事務將回滾。

若有一個同步的事務正在處理中,它將被忽略,并使用一個新的“嵌套”事務。

Configuration

以下交易設置可直接在PulsarTemplate上使用(通過交易字段):

enabled-模板是否支持事務(默認為false)

required-模板是否需要交易(默認為false)

timeout-事務超時的持續時間(默認為空)

不使用Spring Boot時,您可以在提供的模板上調整這些設置。但是,使用Spring Boot時,模板是自動配置的,沒有影響屬性的機制。在這種情況下,您可以注冊一個可用于調整設置的PulsarTemplateCustomizer bean。以下示例顯示了如何在自動配置的模板上設置超時:

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

Transactional Receiving with @PulsarListener

當啟用偵聽器事務時,在同步事務的范圍內調用@PulsarListener注釋的偵聽器方法。

DefaultPulsarMessageListenerContainer使用配置了PulsarTransactionManager的Spring TransactionTemplate在方法調用之前啟動事務。

每個接收到的消息的確認都登記在作用域事務中。

Consume-Process-Produce Scenario

一種常見的事務模式是,消費者從Pulsar主題讀取消息,轉換消息,最后生產者將生成的消息寫入另一個Pulsar主題。當啟用事務并且您的偵聽器方法使用事務性PulsarTemplate來生成轉換后的消息時,該框架支持此用例。

給定以下偵聽器方法:

@PulsarListener(topics = "my-input-topic")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.transactionalTemplate.send("my-output-topic", transformedMsg);
}

啟用偵聽器事務時會發生以下交互:

偵聽器容器啟動新事務并在事務范圍內調用偵聽器方法
偵聽器方法接收消息
偵聽器方法轉換消息
監聽器方法使用事務模板發送轉換后的消息,該模板在活動事務中注冊發送操作
偵聽器容器自動確認消息,并在活動事務中注冊確認操作
偵聽器容器(通過TransactionTemplate)提交事務

如果您沒有使用@PulsarListener,而是直接使用監聽器容器,則會提供與上述相同的事務支持。記住,@PulsarListener只是為了方便將Java方法注冊為偵聽器容器消息偵聽器。

Transactions with Record Listeners

上面的例子使用了一個記錄監聽器。使用記錄偵聽器時,每次偵聽器方法調用時都會創建一個新事務,相當于每條消息一個事務。

由于事務邊界是針對每條消息的,并且每條消息的確認都登記在每個事務中,因此批處理確認模式不能用于事務記錄偵聽器。

Transactions with Batch Listeners

使用批偵聽器時,每次偵聽器方法調用時都會創建一個新事務,相當于每批消息創建一個事務。

事務性批處理偵聽器當前不支持自定義錯誤處理程序。

Configuration

Listener container factory

以下事務設置可以直接在ConcurrentPulsarListenerContainerFactory在創建偵聽器容器時使用的PulsarContainerProperties上使用。這些設置會影響所有偵聽器容器,包括@PulsarListener使用的容器。

enabled-容器是否支持事務(默認為false)

required-容器是否需要事務(默認為false)

timeout-事務超時的持續時間(默認為空)

transactionDefinition-一個藍圖事務定義,其屬性將被復制到容器的事務模板中(默認為null)

transactionManager-用于啟動事務的事務管理器

不使用Spring Boot時,您可以在提供的容器出廠設置中調整這些設置。但是,使用Spring Boot時,容器工廠是自動配置的。在這種情況下,您可以注冊一個org.springframework.boot.pulser.autofigure。PulsarContainerFactory定制器<并發PulsarListenerContainerFactory<?>>bean訪問和自定義容器屬性。以下示例顯示了如何在容器工廠設置超時:

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
@PulsarListener

默認情況下,每個偵聽器都尊重其相應偵聽器容器工廠的事務設置。但是,用戶可以在每個@PulsarListener上設置事務屬性,以覆蓋容器工廠設置,如下所示:

如果容器工廠啟用了事務,那么transaction=false將禁用單個偵聽器的事務。

如果容器工廠啟用了事務并且是必需的,那么嘗試設置transaction=false將導致拋出一個異常,說明事務是必需的。

如果容器工廠已禁用事務,則將忽略設置transaction=true的嘗試,并記錄警告。

Using PulsarTransactionManager

PulsarTransactionManager是Spring框架的PlatformTransactionManager的實現。您可以將PulsarTransactionManager與正常的Spring事務支持(@Transactional、TransactionTemplate等)一起使用。

如果事務處于活動狀態,則在事務范圍內執行的任何PulsarTemplate操作都會登記并參與正在進行的事務。經理提交或回滾事務,取決于成功或失敗。

您可能不需要直接使用PulsarTransactionManager,因為大多數事務用例都包含在PulsarTemplate和@PulsarListener中。

Pulsar Transactions with Other Transaction Managers

Producer-only transaction

如果你想將記錄發送到Pulsar并在單個事務中執行一些數據庫更新,你可以使用DataSourceTransactionManager進行正常的Spring事務管理。

以下示例假設有一個名為“DataSourceTransactionManager”的DataSourceTransactionManager bean注冊

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.pulsarTemplate.send("my-topic", msg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

@Transactional注釋的攔截器啟動數據庫事務,PulsarTemplate將與DB事務管理器同步事務;每次發送都將參與該交易。當該方法退出時,數據庫事務將提交,然后是Pulsar事務。

如果您希望首先提交Pulsar事務,并且僅在Pulsar事務成功時提交DB事務,請使用嵌套的@Transactional方法,其中外部方法配置為使用DataSourceTransactionManager,內部方法配置為用PulsarTransactionManager。

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));this.sendToPulsar(msg);
}@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {this.pulsarTemplate.send("my-topic", msg);
}

Consumer + Producer transaction

如果你想使用Pulsar的記錄,將記錄發送到Pulsar,并在事務中執行一些數據庫更新,你可以將正常的Spring事務管理(使用DataSourceTransactionManager)與容器發起的事務相結合。

在以下示例中,偵聽器容器啟動Pulsar事務,@Transactional注釋啟動DB事務。DB事務首先提交;如果Pulsar事務未能提交,記錄將被重新傳遞,因此DB更新應該是冪等的。

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.pulsarTemplate.send("my-output-topic", transformedMsg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913912.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913912.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913912.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在Ubuntu上從零開始編譯并運行Home Assistant源碼并集成HACS與小米開源的Ha Xiaomi Home

目錄1. 前言&&疊甲2. 使用的環境3. 相關鏈接4. 前期步驟4.1 安裝路徑提前說明4.2 Ubuntu 相關依賴安裝4.3 Python源碼編譯安裝4.3.1 編譯安裝4.3.2 換源4.3.3 環境變量5. 構建Home Assistant源碼5.1 clone源碼5.2 創建虛擬Python環境5.3 安裝項目依賴5.4 安裝項目5.5 運…

【實習篇】之Http頭部字段之Disposition介紹

Http頭部字段之DispositionDisposition頭部字段介紹RFC規范介紹RFC 6266與RFC 2047實習的時候公司將一個某個關于下載的Bug交給了我來修&#xff0c;看了代碼和日志后發現是Disposition字段的規范兼容性惹的鍋&#xff0c;因為有些協議使用的是老協議&#xff0c;我們的項目沒有…

VM文件管理與Vi/vim操作

[rootlocalhost /]# sudo mkdir /opt [rootlocalhost /]# sudo mkdir /opt/tmp [rootlocalhost /]# sudo touch /opt/tmp/a.txt [rootlocalhost /]# ls /opt/tmp/ a.txt [rootlocalhost /]# 3.步驟1&#xff1a;創建文件并插入日期時間vi /tmp/newfile在vi編輯器中輸入以下命令…

【Android】安卓四大組件之內容提供者(ContentProvider):從基礎到進階

你手機里的通訊錄&#xff0c;存儲了所有聯系人的信息。如果你想把這些聯系人信息分享給其他App&#xff0c;就可以通過ContentProvider來實現。。 一、什么是 ContentProvider ?ContentProvider? 是 Android 四大組件之一&#xff0c;負責實現?跨應用程序的數據共享與訪問…

Vue-19-前端框架Vue之應用基礎組件通信(二)

文章目錄 1 v-model(父子相傳)1.1 App.vue1.2 Father.vue1.2.1 v-model用在html標簽上1.2.2 v-model用在html標簽上(本質寫法)1.2.3 v-model用在組件標簽上1.2.4 v-model用在組件標簽上(本質寫法)1.3 MyInput(自定義的組件)1.4 修改modelValue1.4.1 Father.vue1.4.2 MyInput.vu…

寶塔下載pgsql適配spring ai

1.寶塔安裝pgvector 1.先去github下載pgvectorpgvector/pgvector: Open-source vector similarity search for Postgres 2.把壓縮包上傳到系統文件的/temp下解壓&#xff0c;重命名文件名為pgvector&#xff0c;之后命令操作 cd /tmp cd pgvector export PG_CONFIG/www/serv…

RK3568項目(八)--linux驅動開發之基礎外設(上)

目錄 一、引言 二、準備工作 ------>2.1、驅動加載/卸載命令 三、字符設備驅動開發 ------>3.1、驅動模塊的加載和卸載 ------>3.2、外部模塊編譯模板 Makefile ------>3.3、cdev 四、LED驅動 ------>4.1、原理圖 ------>4.2、驅動 五、設備樹 -…

BUUCTF在線評測-練習場-WebCTF習題[GXYCTF2019]BabySQli1-flag獲取、解析

解題思路打開靶場&#xff0c;題目提示是sql注入輸入數據&#xff0c;判斷下閉合11123報錯&#xff1a;Error: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 1 at line 1報錯提示…

“AI 曼哈頓計劃”:科技競賽還是人類挑戰?

美國國會下屬的經濟與安全審查委員會已將“推動建立并資助一項堪比曼哈頓計劃的通用人工智能研發項目”列為其對國會的核心建議之一&#xff0c;明確顯示出對AI競賽戰略意義的高度重視。與此同時&#xff0c;美國能源部在近幾個月中多次公開將人工智能的突破比作“下一場曼哈頓…

音頻信號的預加重:提升語音清晰度

一、預加重介紹預加重是一種信號處理技術&#xff0c;主要用于增強音頻信號中的高頻成分。由于人類語音的頻譜特性&#xff0c;尤其是在輔音和音調的表達上&#xff0c;高頻成分對于語音的清晰度至關重要。然而&#xff0c;在錄音和傳輸過程中&#xff0c;這些高頻成分往往會受…

WebSocket實戰:實現實時聊天應用 - 雙向通信技術詳解

目錄一、WebSocket&#xff1a;實時通信的"高速公路"1.1 HTTP的短板&#xff1a;永遠的"單相思"1.2 WebSocket的優勢&#xff1a;真正的"雙向對話"二、30分鐘搭建聊天服務器2.1 環境準備2.2 WebSocket配置類2.3 核心消息處理器三、前端實現&…

宏集案例 | 基于CODESYS的自動化控制系統,開放架構 × 高度集成 × 遠程運維

??案例概況客戶&#xff1a;MACS Sterilisationsanlagen GmbH&#xff08;Ermafa Environmental Technologies GmbH 旗下&#xff09; 應用場景&#xff1a;醫療與感染性廢棄物的無害化處理控制系統應用產品&#xff1a;宏集Berghof高性能控制器設備&#xff08;一&#xff0…

學習JNI 二

創建一個名為Learn1項目&#xff08;Android Studio&#xff09;。一、項目結構二、配置 build.gradlebuild.gradle.kts(:app)plugins {alias(libs.plugins.android.application)alias(libs.plugins.jetbrains.kotlin.android) }android {namespace "com.demo.learn1&quo…

基于Spring Boot+Vue的DIY手工社預約管理系統(Echarts圖形化、騰訊地圖API)

2.10 視頻課程管理功能實現2.11手工互動&#xff08;視頻彈幕&#xff09;2.8預約設置管理功能實現&#x1f388;系統亮點&#xff1a;Echarts圖形化、騰訊地圖API&#xff1b;文檔包含功能結構圖、系統架構圖、用例圖、實體屬性圖、E-R圖。一.系統開發工具與環境搭建1.系統設計…

leetcode 每日一題 1353. 最多可以參加的會議數目

更多技術訪問 我的個人網站 &#xff08;免費服務器&#xff0c;沒有80/443端口&#xff09; 1353. 最多可以參加的會議數目 給你一個數組 events&#xff0c;其中 events[i] [startDayi, endDayi] &#xff0c;表示會議 i 開始于 startDayi &#xff0c;結束于 endDayi 。 …

AI+智慧園區 | 事件處置自動化——大模型重構園區治理邏輯

在智慧園區的建設浪潮中&#xff0c;事件管理一直是園區高效運營的關鍵環節。考拉悠然所推出的大模型 智慧園區解決方案&#xff0c;在事件智能閉環管理方面獨樹一幟&#xff0c;為園區的日常運營編織了一張嚴密、高效、智能的管理網絡&#xff0c;實現了從事件感知到處置的全…

FFmpeg Windows安裝

FFmpeg 用于音頻文件轉換 Builds - CODEX FFMPEG gyan.dev ffmpeg-release-full.7z 下載完成之后 zip解壓 大概就是 ffmpeg/ └── bin/ └── ffmpeg.exe 配置環境變量 ffmpeg -version 有可能idea還是找不到命令 就把命令路徑寫在程序里 例如

【2025/07/10】GitHub 今日熱門項目

GitHub 今日熱門項目 &#x1f680; 每日精選優質開源項目 | 發現優質開源項目&#xff0c;跟上技術發展趨勢 &#x1f4cb; 報告概覽 &#x1f4ca; 統計項&#x1f4c8; 數值&#x1f4dd; 說明&#x1f4c5; 報告日期2025-07-10 (周四)GitHub Trending 每日快照&#x1f55…

JVM 基礎 - JVM 內存結構

前言 本文主要對JVM 內存結構進行講解&#xff0c;注意不要和Java內存模型混淆了。 運行時數據區 內存是非常重要的系統資源&#xff0c;是硬盤和 CPU 的中間倉庫及橋梁&#xff0c;承載著操作系統和應用程序的實時運行。JVM 內存布局規定了 Java 在運行過程中內存申請、分配…

【案例】二手車交易價格預測-472

二手車交易價格預測 數據來源數據特征探索構建模型參考數據來源 天池 https://tianchi.aliyun.com/competition/entrance/231784/information 數據特征探索 目標特征工程做好之后,能同時進行 lightgbm catboost 神經網絡等模型,所以盡量都轉換為數值類特征。 如果僅僅是使用…