好消息:Spring for Apache Pulsar這兩天剛剛升到2.0.0版本
1. ReactivePulsarTemplate
在Pulsar生產者端,Spring Boot自動配置提供了一個ReactivePulsarTemplate用于發布記錄。該模板實現了一個名為ReactivePulse Operations的接口,并提供了通過其合約發布記錄的方法。
該模板提供了send方法,可以接受單個消息并返回Mono<MessageId>。它還提供了send方法,可以接受多條消息(以ReactiveStreams Publisher類型的形式)并返回Flux<MessageId>。
對于不包含主題參數的API變體,將使用主題解析過程來確定目標主題。
1.1. Fluent API
該模板提供了一個流暢的構建器來處理更復雜的發送請求。
1.2. Message customization
您可以指定MessageSpecBuilderCustomizer來配置傳出消息。例如,以下代碼顯示了如何發送鍵控消息:
template.newMessage(msg).withMessageCustomizer((mc) -> mc.key("foo-msg-key")).send();
1.3. Sender customization
您可以指定一個ReactiveMessageSenderBuilderCustomizer來配置底層Pulsar發送器生成器,該生成器最終構建用于發送傳出消息的發送器。
請謹慎使用,因為這可以完全訪問發送方構建器,調用其某些方法(如create)可能會產生意想不到的副作用。
例如,以下代碼顯示了如何禁用批處理和啟用分塊:
template.newMessage(msg).withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false)).send();
另一個示例顯示了如何在將記錄發布到分區主題時使用自定義路由。在發送方構建器上指定自定義MessageRouter實現,例如:
template.newMessage(msg).withSenderCustomizer((sc) -> sc.messageRouter(messageRouter)).send();
請注意,使用MessageRouter時,spring.pulsar.producter.message-routing-mode的唯一有效設置是自定義。
2. Specifying Schema Information
如果您使用Java基元類型,框架會自動為您檢測模式,您不需要指定任何模式類型來發布數據。對于非基元類型,如果在ReactivePulsarTemplate上調用send操作時沒有明確指定Schema,則Spring For Apache Pulsar框架將嘗試構建Schema。JSON類型。
目前支持的復雜模式類型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和帶內聯編碼的KEY_VALUE。
2.1. Custom Schema Mapping
作為在ReactivePulse Template上為復雜類型調用發送操作時指定模式的替代方法,可以使用類型的映射配置模式解析器。這消除了在框架使用傳出消息類型咨詢解析器時指定模式的需要。
2.1.1. Configuration properties
模式映射可以使用spring.pulsar.defaults.type-mappings屬性進行配置。以下示例使用application.yml分別使用AVRO和JSON模式為User和Address復雜對象添加映射:
spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON
消息類型是消息類的完全限定名。
2.1.2. Schema resolver customizer
添加映射的首選方法是通過上述屬性。但是,如果需要更多的控制,您可以提供一個模式解析器定制器來添加映射。
以下示例使用模式解析器定制器分別使用AVRO和JSON模式為User和Address復雜對象添加映射:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation
指定用于特定消息類型的默認模式信息的另一種選擇是用@PulsarMessage注釋標記消息類。可以通過注釋上的schemaType屬性指定架構信息。
以下示例將系統配置為在生成或使用Foo類型的消息時使用JSON作為默認模式:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了這個配置,就不需要在發送操作上設置或指定模式。
2.2. Producing with AUTO_SCHEMA
如果沒有機會提前知道Pulsar主題的模式類型,您可以使用AUTO_PRODUCE模式將原始JSON或Avro有效載荷安全地發布為byte[]。
在這種情況下,生產者會驗證出站字節是否與目標主題的模式兼容。
只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()發送操作如下例所示:
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
這僅支持Avro和JSON模式類型。
3. ReactivePulsarSenderFactory
ReactivePulsarTemplate依賴于ReactivePulse SenderFactory來實際創建底層發送方。
Spring Boot提供了這個發送器工廠,可以配置任何Spring.pulser.producer.*應用程序屬性。
如果直接使用發送方工廠API時未指定主題信息,則使用ReactivePulse Template使用的相同主題解析過程,但省略了“消息類型默認”步驟。
3.1. Producer Caching
每個底層Pulsar生產者都會消耗資源。為了提高性能并避免持續創建生產者,底層Apache Pulsar Reactive客戶端中的ReactiveMessageSenderCache緩存了它創建的生產者。它們以LRU方式緩存,并在配置的時間段內未被使用時被驅逐。
您可以通過指定任何spring.pulsinger.producer.cache.*應用程序屬性來配置緩存設置。