Spring Cloud Stream是Spring Cloud體系內的一個框架,用于構建與共享消息傳遞系統連接的高度可伸縮的事件驅動微服務,其目的是簡化消息業務在Spring Cloud應用中的開發。
Spring Cloud Stream的架構圖如下所示,應用程序通過Spring Cloud Stream注入的輸入通道inputs和輸出通道outputs與消息中間件Middleware通信,消息通道通過特定的中間件綁定器Binder實現連接到外部代理。
Spring Cloud Stream的實現基于發布/訂閱機制,核心由四部分構成:Spring Framework中的Spring Messaging和Spring Integrataion,以及Spring Cloud Stream中的Binders和Bindings。
Spring Messaging:Spring Framework中的統一消息編程模型,其核心對象如下:
- Message: 消息對象,包含消息頭Header和消息體Payload。
- MessageChannel:消息通道接口,用于接收消息,提供send方法將消息發送致消息通道。
- MessageHandler:消息處理器接口,用于處理消息邏輯。
Spring Integration:Spring Framework中用于支持企業集成的一種擴展機制,作用是提供一個簡單的模型來構建企業集成解決方案,對Spring Messaging進行了擴展。
- MessageDispatcher: 消息分發接口,用于分發消息和添加刪除消息處理器。
- MessageRouter:消息路由接口,定義默認的輸出消息通道。
- Filter:消息的過濾注解,用于配置消息過濾表達式。
- Aggregator:消息的聚合注解,用于將多條消息聚合成一條。
- Splitter:消息的分割,用于將一條消息拆分成多條。
Binders:目標綁定器,負責與外部消息中間件系統集成的組件。
- doBindProducer:綁定消息中間件客戶端發送消息模塊。
- doBindConsumer:綁定消息中間件客戶端接收消息模塊。
Bindings:外部消息中間件系統與應用程序提供的消息生產者和消費者之間的橋梁。
Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka和RabbitMQ,Spring Cloud Alibaba中加入了RocketMQ Binder,用于將RocketMQ集成到Spring Cloud Stream。
Spring Cloud Alibaba RocketMQ架構圖
Spring Cloud Alibaba RocketMQ的架構圖如下所示:
- MessageChannel(output):消息通道,用于發送消息,Spring Cloud Stream的標準接口。
- MessageChannel(input):消息通道,用于訂閱消息,Spring Cloud Stream的標準接口。
- Binder bindProducer:目標綁定器,將發送通道發過來的消息發送到RocketMQ消息服務器,由Spring Cloud Alibaba團隊按照Spring Cloud Stream的標準協議實現。
- Binder bindConsumer:目標綁定器,將接收到RocketMQ消息服務器的消息推送給訂閱通道,由Spring Cloud Alibaba團隊按照Spring Cloud Stream的標準協議實現。
Spring Cloud Stream消息發送流程
Spring Cloud Stream消息發送流程如下圖所示,包括發送、訂閱、分發、委派、消息處理等,具體實現如下:
在業務代碼中調用MessageChannel接口的Send()方法,例如source.output().send(message)。
public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1L;default boolean send(Message<?> message) {return this.send(message, -1L);}boolean send(Message<?> var1, long var2);
}
AbstractMessageChannel是消息通道的基本實現類,提供發送消息和接收消息的公用方法。
@IntegrationManagedResource
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent, InterceptableChannel, MessageChannelMetrics, ConfigurableMetricsAware<AbstractMessageChannelMetrics>, IntegrationPattern {public boolean send(Message<?> messageArg, long timeout) {// 省略部分代碼sent = this.doSend(message, timeout);// 省略部分代碼return sent;}protected abstract boolean doSend(Message<?> var1, long var2);
}
消息發送到AbstractSubscribableChannel類實現的doSend()方法。
protected boolean doSend(Message<?> message, long timeout) {try {return this.getRequiredDispatcher().dispatch(message);} catch (MessageDispatchingException var6) {String description = var6.getMessage() + " for channel '" + this.getFullChannelName() + "'.";throw new MessageDeliveryException(message, description, var6);}}
通過消息分發類MessageDispatcher把消息分發給MessageHandler。
private MessageDispatcher getRequiredDispatcher() {MessageDispatcher dispatcher = this.getDispatcher();Assert.state(dispatcher != null, "'dispatcher' must not be null");return dispatcher;
}protected abstract MessageDispatcher getDispatcher();
從AbstractSubscribableChannel的實現類DirectChannel得到MessageDispatcher的實現類UnicastingDispatcher。
public class DirectChannel extends AbstractSubscribableChannel {protected UnicastingDispatcher getDispatcher() {return this.dispatcher;}
}
調用dispatch()方法把消息分發給各個MessageHandler。
public class UnicastingDispatcher extends AbstractDispatcher {public final boolean dispatch(Message<?> message) {if (this.executor != null) {Runnable task = this.createMessageHandlingTask(message);this.executor.execute(task);return true;} else {return this.doDispatch(message);}}private boolean doDispatch(Message<?> message) {if (this.tryOptimizedDispatch(message)) {return true;} else {boolean success = false;Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);if (!handlerIterator.hasNext()) {throw new MessageDispatchingException(message, "Dispatcher has no subscribers");} else {ArrayList exceptions = null;while(!success && handlerIterator.hasNext()) {MessageHandler handler = (MessageHandler)handlerIterator.next();try {handler.handleMessage(message);success = true;} catch (Exception var9) {RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> {return "Dispatcher failed to deliver Message";}, var9);if (exceptions == null) {exceptions = new ArrayList();}exceptions.add(runtimeException);boolean isLast = !handlerIterator.hasNext();if (!isLast && this.failover) {this.logExceptionBeforeFailOver(var9, handler, message);}this.handleExceptions(exceptions, message, isLast);}}return success;}}}
}
遍歷所有MessageHandler,調用handlerMessage()處理消息。
private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {Set<MessageHandler> handlers = this.getHandlers();return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();}
查看MessageHandler是從哪里來的,也就是handlers列表中的MessageHandler是如何添加的。
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel, SubscribableChannelManagement {public boolean subscribe(MessageHandler handler) {MessageDispatcher dispatcher = this.getRequiredDispatcher();boolean added = dispatcher.addHandler(handler);this.adjustCounterIfNecessary(dispatcher, added ? 1 : 0);return added;}
}
AbstractMessageChannelBinder在初始化Binding時,會創建并初始化SendingHandler,調用subscribe()添加到handlers列表。
public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {// 創建Producer的messageHandlerfinal MessageHandler producerMessageHandler;final ProducerDestination producerDestination;try {// 省略部分代碼producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties, outputChannel, errorChannel);// 省略部分代碼// 創建SendingHandler并調用subscribe((SubscribableChannel)outputChannel).subscribe(new AbstractMessageChannelBinder.SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()), this.headersToEmbed, this.useNativeEncoding(producerProperties)));// 省略部分代碼}}
Producer的MessageHandler是由消息中間件Binder來完成的,Spring Cloud Stream提供了創建MessageHandler的規范。
AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring 容器加載所有Bean。