1. Pulsar Template
在Pulsar生產者端,Spring Boot自動配置提供了一個用于發布記錄的PulsarTemplate。該模板實現了一個名為PulsarOperations的接口,并提供了通過其合約發布記錄的方法。
這些send API方法有兩類:send和sendAsync。send方法通過Pulsar生成器上的同步發送功能阻止調用。它們返回消息在代理上持久化后發布的消息的MessageId。sendAsync方法調用是非阻塞的異步調用。它們返回一個CompletableFuture,您可以在消息發布后使用它異步接收消息ID。
對于不包含主題參數的API變體,將使用主題解析過程來確定目標主題。
1.1. Simple API
該模板為簡單的發送請求提供了一些方法(前綴為“send”)。對于更復雜的發送請求,流暢的API可以讓您配置更多選項。
1.2. Fluent API
該模板提供了一個流暢的構建器來處理更復雜的發送請求。
1.3. Message customization
您可以指定一個TypedMessageBuilderCustomizer來配置傳出消息。例如,以下代碼顯示了如何發送鍵控消息:
template.newMessage(msg).withMessageCustomizer((mb) -> mb.key("foo-msg-key")).send();
1.4. Producer customization
您可以指定一個ProducerBuilderCustomizer來配置底層Pulsar生產者構建器,該生成器最終構建用于發送傳出消息的生產者。
請謹慎使用,因為這可以完全訪問生產者構建器,調用其某些方法(如create)可能會產生意想不到的副作用。
例如,以下代碼顯示了如何禁用批處理和啟用分塊:
template.newMessage(msg).withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false)).send();
另一個示例顯示了如何在將記錄發布到分區主題時使用自定義路由。在Producer構建器上指定自定義MessageRouter實現,例如:
template.newMessage(msg).withProducerCustomizer((pb) -> pb.messageRouter(messageRouter)).send();
請注意,使用MessageRouter時,spring.pulsar.producter.message-routing-mode的唯一有效設置是自定義。
另一個示例顯示了如何添加一個ProducerInterceptor,該攔截器將攔截和修改生產者在發布到代理之前收到的消息:
template.newMessage(msg).withProducerCustomizer((pb) -> pb.intercept(interceptor)).send();
定制程序將僅適用于用于發送操作的生產者。如果要將自定義程序應用于所有生產商,則必須按照全球生產商自定義中的描述將其提供給生產商工廠。
2. Specifying Schema Information
如果您使用Java基元類型,框架會自動為您檢測模式,您不需要指定任何模式類型來發布數據。對于非基元類型,如果在PulsarTemplate上調用send操作時沒有明確指定Schema,則Spring For Apache Pulsar框架將嘗試構建Schema。JSON類型。
目前支持的復雜模式類型有JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和帶內聯編碼的KEY_VALUE。
2.1. Custom Schema Mapping
作為在PulsarTemplate上調用復雜類型的發送操作時指定模式的替代方法,模式解析器可以配置類型的映射。這消除了在框架使用傳出消息類型咨詢解析器時指定模式的需要。
2.1.1. Configuration properties
模式映射可以使用spring.pulsar.defaults.type-mappings屬性進行配置。以下示例使用application.yml分別使用AVRO和JSON模式為User和Address復雜對象添加映射:
spring:pulsar:defaults:type-mappings:- message-type: com.acme.Userschema-info:schema-type: AVRO- message-type: com.acme.Addressschema-info:schema-type: JSON
消息類型是消息類的完全限定名。
2.1.2. Schema resolver customizer
添加映射的首選方法是通過上述屬性。但是,如果需要更多的控制,您可以提供一個模式解析器定制器來添加映射。
以下示例使用模式解析器定制器分別使用AVRO和JSON模式為User和Address復雜對象添加映射:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {return (schemaResolver) -> {schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));}
}
2.1.3. Type mapping annotation
指定用于特定消息類型的默認模式信息的另一種選擇是用@PulsarMessage注釋標記消息類。可以通過注釋上的schemaType屬性指定架構信息。
以下示例將系統配置為在生成或使用Foo類型的消息時使用JSON作為默認模式:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了這個配置,就不需要在發送操作上設置或指定模式。
2.2. Producing with AUTO_SCHEMA
如果沒有機會提前知道Pulsar主題的模式類型,您可以使用AUTO_PRODUCE模式將原始JSON或Avro有效載荷安全地發布為byte[]。
在這種情況下,生產者會驗證出站字節是否與目標主題的模式兼容。
只需指定schema的模式。模板上的AUTO_PRODUCE_BYTES()發送操作如下例所示:
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
這僅支持Avro和JSON模式類型。
3. Pulsar Producer Factory
PulsarTemplate依賴于PulsarProducerFactory來實際創建底層生產者。Spring Boot自動配置還提供了這個生產者工廠,您可以通過指定任何Spring.pulser.producer.*應用程序屬性來進一步配置它。
如果在直接使用生產者工廠API時沒有指定主題信息,則使用PulsarTemplate使用的相同主題解析過程,唯一的例外是省略了“消息類型默認”步驟。
3.1. Global producer customization
該框架提供了ProducerBuilderCustomizer合約,該合約允許您配置用于構建每個生產者的底層構建器。要自定義所有生產者,您可以將自定義者列表傳遞給PulsarProducerFactory構造函數。使用多個自定義程序時,它們將按照在列表中顯示的順序應用。
如果您使用Spring Boot自動配置,您可以將自定義程序指定為bean,它們將根據其@Order注釋自動傳遞給PulsarProducerFactory。
如果您只想將自定義程序應用于單個生產商,您可以使用Fluent API并在發送時指定自定義程序。
4. Pulsar Producer Caching
每個底層Pulsar生產者都會消耗資源。為了提高性能并避免不斷創建生產者,生產者工廠會緩存它創建的生產者。它們以LRU方式緩存,并在配置的時間段內未被使用時被驅逐。緩存鍵由足夠的信息組成,以確保在后續的創建請求中,調用者返回相同的生產者。
此外,您可以通過指定任何spring.pulsinger.producer.cache.*應用程序屬性來配置緩存設置。
4.1. Caution on Lambda customizers
任何用戶提供的生產者定制器也包含在緩存密鑰中。由于緩存鍵依賴于equals/hashCode的有效實現,因此在使用Lambda自定義程序時必須謹慎。
規則:實現為Lambdas的兩個自定義程序將在equals/hashCode上匹配,當且僅當它們使用相同的Lambda實例并且不需要在其閉包外定義任何變量時。
為了澄清上述規則,我們將看幾個例子。在下面的示例中,定制器被定義為內聯Lambda,這意味著對sendUser的每次調用都使用相同的Lambda實例。此外,它不需要閉包外的變量。因此,它將作為緩存鍵匹配。
void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName("user")).send();
}
在下一種情況下,定制器被定義為內聯Lambda,這意味著對sendUser的每次調用都使用相同的Lambda實例。但是,它需要一個閉包外的變量。因此,它將不匹配為緩存鍵。
void sendUser() {var user = randomUser();var name = randomName();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> b.producerName(name)).send();
}
在最后一個例子中,定制器被定義為內聯Lambda,這意味著對sendUser的每次調用都使用相同的Lambda實例。雖然它確實使用了一個變量名,但它并非源自其閉包之外,因此將作為緩存鍵進行匹配。這說明變量可以在Lambda閉包中使用,甚至可以調用靜態方法。
void sendUser() {var user = randomUser();template.newMessage(user).withTopic("user-topic").withProducerCustomizer((b) -> {var name = SomeHelper.someStaticMethod();b.producerName(name);}).send();
}
規則:如果你的Lambda定制器不是只定義一次(在后續調用中使用相同的實例),或者它需要在閉包之外定義變量,那么你必須提供一個具有有效equals/hashCode實現的定制器實現。
如果不遵守這些規則,那么生產者緩存將始終丟失,您的應用程序性能將受到負面影響。
5. Intercept Messages on the Producer
添加ProducerInterceptor可以讓您在生產者接收到的消息發布到代理之前對其進行攔截和修改。為此,您可以將攔截器列表傳遞給PulsarTemplate構造函數。使用多個攔截器時,應用它們的順序是它們在列表中出現的順序。
如果使用Spring Boot自動配置,則可以將攔截器指定為Beans。它們會自動傳遞給PulsarTemplate。攔截器的排序是通過使用@Order注釋實現的,如下所示:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {...
}@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {...
}
如果您沒有使用啟動器,則需要自己配置和注冊上述組件。