本節介紹Spring for Apache Pulsar如何支持事務。
Overview
Spring for Apache Pulsar事務支持是基于Spring Framework提供的事務支持構建的。在高層,事務資源向事務管理器注冊,事務管理器反過來處理注冊資源的事務狀態(提交、回滾等)。
Apache Pulsar的Spring提供了以下功能:
PulsaTransactionManager-用于正常的Spring事務支持(@transactional,transactionTemplate等)
交易脈沖星模板
交易@pulsaListener
與其他事務管理器的事務同步
事務支持尚未添加到響應式組件中
默認情況下,事務支持已禁用。要在使用Spring Boot時啟用支持,只需設置Spring.pulsar.transaction.enabled屬性。下面每個組件部分都概述了進一步的配置選項。
Transactional Publishing with PulsarTemplate
事務性PulsarTemplate上的所有發送操作都會查找活動事務,并在事務中登記每個發送操作(如果找到)。
Non-transactional use
默認情況下,事務性PulsarTemplate也可用于非事務性操作。當未找到現有事務時,它將以非事務方式繼續發送操作。但是,如果模板配置為需要事務,則任何在事務范圍之外使用模板的嘗試都會導致異常。
事務可以由TransactionTemplate、@Transactional方法、調用executeInTransaction或事務偵聽器容器啟動。
Local Transactions
我們使用術語“本地”事務來表示不受Spring事務管理工具(即PulsarTransactionManager)管理或與之關聯的Pulsar本地事務。相反,“同步”事務是由PulsarTransactionManager管理或與之關聯的事務。
您可以使用PulsarTemplate在本地事務中執行一系列操作。以下示例顯示了如何執行此操作:
var results = pulsarTemplate.executeInTransaction((template) -> {var rv = new HashMap<String, MessageId>();rv.put("msg1", template.send(topic, "msg1"));rv.put("msg2", template.send(topic, "msg2"));return rv;
});
回調中的參數是調用executeInTransaction方法的模板實例。模板上的所有操作都登記在當前事務中。如果回調正常退出,則事務被提交。如果拋出異常,則事務將回滾。
若有一個同步的事務正在處理中,它將被忽略,并使用一個新的“嵌套”事務。
Configuration
以下交易設置可直接在PulsarTemplate上使用(通過交易字段):
enabled-模板是否支持事務(默認為false)
required-模板是否需要交易(默認為false)
timeout-事務超時的持續時間(默認為空)
不使用Spring Boot時,您可以在提供的模板上調整這些設置。但是,使用Spring Boot時,模板是自動配置的,沒有影響屬性的機制。在這種情況下,您可以注冊一個可用于調整設置的PulsarTemplateCustomizer bean。以下示例顯示了如何在自動配置的模板上設置超時:
@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
Transactional Receiving with @PulsarListener
當啟用偵聽器事務時,在同步事務的范圍內調用@PulsarListener注釋的偵聽器方法。
DefaultPulsarMessageListenerContainer使用配置了PulsarTransactionManager的Spring TransactionTemplate在方法調用之前啟動事務。
每個接收到的消息的確認都登記在作用域事務中。
Consume-Process-Produce Scenario
一種常見的事務模式是,消費者從Pulsar主題讀取消息,轉換消息,最后生產者將生成的消息寫入另一個Pulsar主題。當啟用事務并且您的偵聽器方法使用事務性PulsarTemplate來生成轉換后的消息時,該框架支持此用例。
給定以下偵聽器方法:
@PulsarListener(topics = "my-input-topic")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.transactionalTemplate.send("my-output-topic", transformedMsg);
}
啟用偵聽器事務時會發生以下交互:
偵聽器容器啟動新事務并在事務范圍內調用偵聽器方法
偵聽器方法接收消息
偵聽器方法轉換消息
監聽器方法使用事務模板發送轉換后的消息,該模板在活動事務中注冊發送操作
偵聽器容器自動確認消息,并在活動事務中注冊確認操作
偵聽器容器(通過TransactionTemplate)提交事務
如果您沒有使用@PulsarListener,而是直接使用監聽器容器,則會提供與上述相同的事務支持。記住,@PulsarListener只是為了方便將Java方法注冊為偵聽器容器消息偵聽器。
Transactions with Record Listeners
上面的例子使用了一個記錄監聽器。使用記錄偵聽器時,每次偵聽器方法調用時都會創建一個新事務,相當于每條消息一個事務。
由于事務邊界是針對每條消息的,并且每條消息的確認都登記在每個事務中,因此批處理確認模式不能用于事務記錄偵聽器。
Transactions with Batch Listeners
使用批偵聽器時,每次偵聽器方法調用時都會創建一個新事務,相當于每批消息創建一個事務。
事務性批處理偵聽器當前不支持自定義錯誤處理程序。
Configuration
Listener container factory
以下事務設置可以直接在ConcurrentPulsarListenerContainerFactory在創建偵聽器容器時使用的PulsarContainerProperties上使用。這些設置會影響所有偵聽器容器,包括@PulsarListener使用的容器。
enabled-容器是否支持事務(默認為false)
required-容器是否需要事務(默認為false)
timeout-事務超時的持續時間(默認為空)
transactionDefinition-一個藍圖事務定義,其屬性將被復制到容器的事務模板中(默認為null)
transactionManager-用于啟動事務的事務管理器
不使用Spring Boot時,您可以在提供的容器出廠設置中調整這些設置。但是,使用Spring Boot時,容器工廠是自動配置的。在這種情況下,您可以注冊一個org.springframework.boot.pulser.autofigure。PulsarContainerFactory定制器<并發PulsarListenerContainerFactory<?>>bean訪問和自定義容器屬性。以下示例顯示了如何在容器工廠設置超時:
@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
@PulsarListener
默認情況下,每個偵聽器都尊重其相應偵聽器容器工廠的事務設置。但是,用戶可以在每個@PulsarListener上設置事務屬性,以覆蓋容器工廠設置,如下所示:
如果容器工廠啟用了事務,那么transaction=false將禁用單個偵聽器的事務。
如果容器工廠啟用了事務并且是必需的,那么嘗試設置transaction=false將導致拋出一個異常,說明事務是必需的。
如果容器工廠已禁用事務,則將忽略設置transaction=true的嘗試,并記錄警告。
Using PulsarTransactionManager
PulsarTransactionManager是Spring框架的PlatformTransactionManager的實現。您可以將PulsarTransactionManager與正常的Spring事務支持(@Transactional、TransactionTemplate等)一起使用。
如果事務處于活動狀態,則在事務范圍內執行的任何PulsarTemplate操作都會登記并參與正在進行的事務。經理提交或回滾事務,取決于成功或失敗。
您可能不需要直接使用PulsarTransactionManager,因為大多數事務用例都包含在PulsarTemplate和@PulsarListener中。
Pulsar Transactions with Other Transaction Managers
Producer-only transaction
如果你想將記錄發送到Pulsar并在單個事務中執行一些數據庫更新,你可以使用DataSourceTransactionManager進行正常的Spring事務管理。
以下示例假設有一個名為“DataSourceTransactionManager”的DataSourceTransactionManager bean注冊
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.pulsarTemplate.send("my-topic", msg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}
@Transactional注釋的攔截器啟動數據庫事務,PulsarTemplate將與DB事務管理器同步事務;每次發送都將參與該交易。當該方法退出時,數據庫事務將提交,然后是Pulsar事務。
如果您希望首先提交Pulsar事務,并且僅在Pulsar事務成功時提交DB事務,請使用嵌套的@Transactional方法,其中外部方法配置為使用DataSourceTransactionManager,內部方法配置為用PulsarTransactionManager。
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {var msg = calculateMessage();this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));this.sendToPulsar(msg);
}@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {this.pulsarTemplate.send("my-topic", msg);
}
Consumer + Producer transaction
如果你想使用Pulsar的記錄,將記錄發送到Pulsar,并在事務中執行一些數據庫更新,你可以將正常的Spring事務管理(使用DataSourceTransactionManager)與容器發起的事務相結合。
在以下示例中,偵聽器容器啟動Pulsar事務,@Transactional注釋啟動DB事務。DB事務首先提交;如果Pulsar事務未能提交,記錄將被重新傳遞,因此DB更新應該是冪等的。
@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {var transformedMsg = msg.toUpperCase(Locale.ROOT);this.pulsarTemplate.send("my-output-topic", transformedMsg);this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}