Pulsar隊列與Springboot集成有2種模式:官方pulsar-client 或社區Starter(如pulsar-spring-boot-starter)
- 如果考慮最新、最快、最齊全的功能,使用官方pulsar-client
- 如果考慮快速低成本接入,使用社區Starter(如pulsar-spring-boot-starter)
環境依賴:
-
SpringBoot 3.3.12
-
Java 17
-
官方pulsar-client
- 引入依賴
- 配置Pulsar連接
- 創建生產者
- 創建消費者
-
社區Starter
- 引入依賴
- 發送消息
- 接收消息
官方pulsar-client
官方 pulsar-client
提供了最全面的 Pulsar 功能,適合對功能完整性有較高要求的項目。下面我們一步步實現生產者和消費者的功能。
引入依賴
首先,需要在項目中引入 pulsar-client
的依賴,這能幫助我們在 Spring Boot 項目里使用 Pulsar 客戶端功能。
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.3.1</version>
</dependency>
配置Pulsar連接
引入依賴后,我們需要對 Pulsar 進行連接配置,指定 Pulsar 服務的地址。可以在配置文件里添加相關配置,同時創建一個配置類來初始化 Pulsar 客戶端。
spring:pulsar:service-url: pulsar://127.0.0.1:6650
@Configuration
public class PulsarConfig {@Value("${spring.pulsar.client.service-url}")private String serviceUrl;@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(serviceUrl).operationTimeout(30, java.util.concurrent.TimeUnit.SECONDS).connectionTimeout(10, java.util.concurrent.TimeUnit.SECONDS);// 可以添加認證等其他配置// clientBuilder.authentication(AuthenticationFactory.token("your-token"));return clientBuilder.build();}
}
創建生產者
完成連接配置后,就可以創建 Pulsar 生產者來發送消息了。下面的代碼實現了同步和異步發送消息的功能。
@Service
public class PulsarMessageProducer {private static final String TOPIC = "persistent://public/default/messages";@Autowiredprivate PulsarClient pulsarClient;public void sendMessage(String content) throws PulsarClientException {// 創建生產者Producer<Message> producer = pulsarClient.newProducer(Schema.JSON(Message.class)).topic(TOPIC).producerName("message-producer").create();// 創建消息對象Message message = new Message(UUID.randomUUID().toString(),content,LocalDateTime.now());// 發送消息(同步)MessageId messageId = producer.send(message);System.out.println("Message sent successfully. Message ID: " + messageId);// 關閉生產者producer.close();}public CompletableFuture<MessageId> sendMessageAsync(String content) throws PulsarClientException {// 創建生產者Producer<Message> producer = pulsarClient.newProducer(Schema.JSON(Message.class)).topic(TOPIC).producerName("async-message-producer").create();// 創建消息對象Message message = new Message(UUID.randomUUID().toString(),content,LocalDateTime.now());// 異步發送消息CompletableFuture<MessageId> future = producer.sendAsync(message);future.thenAccept(messageId -> {System.out.println("Async message sent successfully. Message ID: " + messageId);try {producer.close();} catch (PulsarClientException e) {e.printStackTrace();}}).exceptionally(throwable -> {System.err.println("Failed to send message: " + throwable.getMessage());try {producer.close();} catch (PulsarClientException e) {e.printStackTrace();}return null;});return future;}
}
創建消費者
創建完生產者后,還需要創建消費者來接收消息。下面的代碼展示了如何啟動一個消費者并異步接收消息。
@Service
public class PulsarMessageConsumer implements CommandLineRunner {private static final String TOPIC = "persistent://public/default/messages";private static final String SUBSCRIPTION = "message-subscription";@Autowiredprivate PulsarClient pulsarClient;@Overridepublic void run(String... args) throws Exception {// 啟動消費者startConsumer();}public void startConsumer() throws PulsarClientException {// 創建消費者Consumer<Message> consumer = pulsarClient.newConsumer(Schema.JSON(Message.class)).topic(TOPIC).subscriptionName(SUBSCRIPTION).subscriptionType(SubscriptionType.Shared).subscribe();// 異步消費消息new Thread(() -> {while (true) {try {// 等待接收消息,超時時間為10秒Message<Message> msg = consumer.receive(10, TimeUnit.SECONDS);if (msg != null) {try {// 處理消息Message message = msg.getValue();System.out.println("Received message: " + message);// 確認消息已消費consumer.acknowledge(msg);} catch (Exception e) {// 處理消息失敗,重新放回隊列consumer.negativeAcknowledge(msg);}}} catch (PulsarClientException e) {if (e.getCause() instanceof java.util.concurrent.TimeoutException) {// 超時異常,繼續等待System.out.println("No message received within timeout period, waiting again...");} else {e.printStackTrace();}}}}).start();}
}
社區Starter
社區提供的 pulsar-spring-boot-starter
簡化了 Pulsar 與 Spring Boot 的集成過程,適合需要快速接入的項目。下面我們來看看如何使用它。
引入依賴
首先,在配置文件中添加 Pulsar 服務的配置信息,這能幫助我們連接到 Pulsar 服務。
# Pulsar 服務
spring:pulsar:client:serviceUrl: pulsar://127.0.0.1:6650
發送消息
完成配置后,就可以使用 PulsarTemplate
來發送消息了。下面的代碼實現了同步和異步發送消息的功能。
@Service
public class MyProducer {private final PulsarTemplate<String> pulsarTemplate;public MyProducer(PulsarTemplate<String> pulsarTemplate) {this.pulsarTemplate = pulsarTemplate;}public void sendMessage(String message) {
// 由于 convertAndSend(String, String) 方法未定義,可能需要使用正確的方法
// 假設使用 send 方法來替代,具體根據 PulsarTemplate 的實際方法決定pulsarTemplate.send("my-topic", message);System.out.println("Sent: " + message);}public CompletableFuture<MessageId> sendMessageAsync(String message) {return pulsarTemplate.sendAsync("my-topic", message);}
}
接收消息
發送消息后,還需要創建消費者來接收消息。使用 @PulsarListener
注解可以方便地監聽消息。下面的代碼展示了如何接收消息。
@Service
public class MyConsumer {@PulsarListener(topics = "my-topic")public void receive(Message<String> message) {System.out.println("Received in Spring Boot: " + message.getValue());}
}