RocketMQ - Spring Cloud Alibaba RocketMQ

Spring Cloud Stream是Spring Cloud體系內的一個框架,用于構建與共享消息傳遞系統連接的高度可伸縮的事件驅動微服務,其目的是簡化消息業務在Spring Cloud應用中的開發。

Spring Cloud Stream的架構圖如下所示,應用程序通過Spring Cloud Stream注入的輸入通道inputs和輸出通道outputs與消息中間件Middleware通信,消息通道通過特定的中間件綁定器Binder實現連接到外部代理。
Spring Cloud Stream的架構圖
Spring Cloud Stream的實現基于發布/訂閱機制,核心由四部分構成:Spring Framework中的Spring Messaging和Spring Integrataion,以及Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的統一消息編程模型,其核心對象如下:

  • Message: 消息對象,包含消息頭Header和消息體Payload。
  • MessageChannel:消息通道接口,用于接收消息,提供send方法將消息發送致消息通道。
  • MessageHandler:消息處理器接口,用于處理消息邏輯。

Spring Integration:Spring Framework中用于支持企業集成的一種擴展機制,作用是提供一個簡單的模型來構建企業集成解決方案,對Spring Messaging進行了擴展。

  • MessageDispatcher: 消息分發接口,用于分發消息和添加刪除消息處理器。
  • MessageRouter:消息路由接口,定義默認的輸出消息通道。
  • Filter:消息的過濾注解,用于配置消息過濾表達式。
  • Aggregator:消息的聚合注解,用于將多條消息聚合成一條。
  • Splitter:消息的分割,用于將一條消息拆分成多條。

Binders:目標綁定器,負責與外部消息中間件系統集成的組件。

  • doBindProducer:綁定消息中間件客戶端發送消息模塊。
  • doBindConsumer:綁定消息中間件客戶端接收消息模塊。

Bindings:外部消息中間件系統與應用程序提供的消息生產者和消費者之間的橋梁。

Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka和RabbitMQ,Spring Cloud Alibaba中加入了RocketMQ Binder,用于將RocketMQ集成到Spring Cloud Stream。

Spring Cloud Alibaba RocketMQ架構圖

Spring Cloud Alibaba RocketMQ的架構圖如下所示:
在這里插入圖片描述

  • MessageChannel(output):消息通道,用于發送消息,Spring Cloud Stream的標準接口。
  • MessageChannel(input):消息通道,用于訂閱消息,Spring Cloud Stream的標準接口。
  • Binder bindProducer:目標綁定器,將發送通道發過來的消息發送到RocketMQ消息服務器,由Spring Cloud Alibaba團隊按照Spring Cloud Stream的標準協議實現。
  • Binder bindConsumer:目標綁定器,將接收到RocketMQ消息服務器的消息推送給訂閱通道,由Spring Cloud Alibaba團隊按照Spring Cloud Stream的標準協議實現。

Spring Cloud Stream消息發送流程

Spring Cloud Stream消息發送流程如下圖所示,包括發送、訂閱、分發、委派、消息處理等,具體實現如下:
在這里插入圖片描述
在業務代碼中調用MessageChannel接口的Send()方法,例如source.output().send(message)。

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1L;default boolean send(Message<?> message) {return this.send(message, -1L);}boolean send(Message<?> var1, long var2);
}

AbstractMessageChannel是消息通道的基本實現類,提供發送消息和接收消息的公用方法。

@IntegrationManagedResource
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent, InterceptableChannel, MessageChannelMetrics, ConfigurableMetricsAware<AbstractMessageChannelMetrics>, IntegrationPattern {public boolean send(Message<?> messageArg, long timeout) {// 省略部分代碼sent = this.doSend(message, timeout);// 省略部分代碼return sent;}protected abstract boolean doSend(Message<?> var1, long var2);
}

消息發送到AbstractSubscribableChannel類實現的doSend()方法。

protected boolean doSend(Message<?> message, long timeout) {try {return this.getRequiredDispatcher().dispatch(message);} catch (MessageDispatchingException var6) {String description = var6.getMessage() + " for channel '" + this.getFullChannelName() + "'.";throw new MessageDeliveryException(message, description, var6);}}

通過消息分發類MessageDispatcher把消息分發給MessageHandler。

private MessageDispatcher getRequiredDispatcher() {MessageDispatcher dispatcher = this.getDispatcher();Assert.state(dispatcher != null, "'dispatcher' must not be null");return dispatcher;
}protected abstract MessageDispatcher getDispatcher();

從AbstractSubscribableChannel的實現類DirectChannel得到MessageDispatcher的實現類UnicastingDispatcher。

public class DirectChannel extends AbstractSubscribableChannel {protected UnicastingDispatcher getDispatcher() {return this.dispatcher;}
}

調用dispatch()方法把消息分發給各個MessageHandler。

public class UnicastingDispatcher extends AbstractDispatcher {public final boolean dispatch(Message<?> message) {if (this.executor != null) {Runnable task = this.createMessageHandlingTask(message);this.executor.execute(task);return true;} else {return this.doDispatch(message);}}private boolean doDispatch(Message<?> message) {if (this.tryOptimizedDispatch(message)) {return true;} else {boolean success = false;Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);if (!handlerIterator.hasNext()) {throw new MessageDispatchingException(message, "Dispatcher has no subscribers");} else {ArrayList exceptions = null;while(!success && handlerIterator.hasNext()) {MessageHandler handler = (MessageHandler)handlerIterator.next();try {handler.handleMessage(message);success = true;} catch (Exception var9) {RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> {return "Dispatcher failed to deliver Message";}, var9);if (exceptions == null) {exceptions = new ArrayList();}exceptions.add(runtimeException);boolean isLast = !handlerIterator.hasNext();if (!isLast && this.failover) {this.logExceptionBeforeFailOver(var9, handler, message);}this.handleExceptions(exceptions, message, isLast);}}return success;}}}
}

遍歷所有MessageHandler,調用handlerMessage()處理消息。

private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {Set<MessageHandler> handlers = this.getHandlers();return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();}

查看MessageHandler是從哪里來的,也就是handlers列表中的MessageHandler是如何添加的。

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel, SubscribableChannelManagement {public boolean subscribe(MessageHandler handler) {MessageDispatcher dispatcher = this.getRequiredDispatcher();boolean added = dispatcher.addHandler(handler);this.adjustCounterIfNecessary(dispatcher, added ? 1 : 0);return added;}
}

AbstractMessageChannelBinder在初始化Binding時,會創建并初始化SendingHandler,調用subscribe()添加到handlers列表。

public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {// 創建Producer的messageHandlerfinal MessageHandler producerMessageHandler;final ProducerDestination producerDestination;try {// 省略部分代碼producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties, outputChannel, errorChannel);// 省略部分代碼// 創建SendingHandler并調用subscribe((SubscribableChannel)outputChannel).subscribe(new AbstractMessageChannelBinder.SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()), this.headersToEmbed, this.useNativeEncoding(producerProperties)));// 省略部分代碼}}

Producer的MessageHandler是由消息中間件Binder來完成的,Spring Cloud Stream提供了創建MessageHandler的規范。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring 容器加載所有Bean。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/213751.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/213751.shtml
英文地址,請注明出處:http://en.pswp.cn/news/213751.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

論文閱讀《Domain Generalized Stereo Matching via Hierarchical Visual Transformation》

論文地址&#xff1a;https://openaccess.thecvf.com/content/CVPR2023/html/Chang_Domain_Generalized_Stereo_Matching_via_Hierarchical_Visual_Transformation_CVPR_2023_paper.html 概述 立體匹配模型是近年來的研究熱點。但是&#xff0c;現有的方法過分依賴特定數據集上…

五年制專轉本備考沖刺階段,老師給你六點建議助你上岸

1、熱衷的不是學習&#xff0c;而是思考 人與人之間最大的差別在于思維的差別&#xff0c;也可以說是思考的差別。專轉本也是如此&#xff0c;有人思考得簡單&#xff0c;有人思考得復雜&#xff1b;有人想得全面&#xff0c;有人想得膚淺。 只有善于思考&#xff0c;才會對問…

100:ReconFusion: 3D Reconstruction with Diffusion Priors

簡介 官網 少樣本重建必然導致nerf失敗&#xff0c;論文提出使用diffusion模型來解決這一問題。從上圖不難看出&#xff0c;論文一步步提升視角數量&#xff0c;逐步與Zip-NeRF對比。 實現流程 Diffusion Model for Novel View Synthesis 給定一組輸入圖像 x o b s { x i…

Jmeter beanshell編程實例

1、引言 BeanShell是一種小型的&#xff0c;免費的&#xff0c;可嵌入的符合Java語法規范的源代碼解釋器&#xff0c;具有對象腳本語言特性。 在Jmeter實踐中&#xff0c;由于BeanShell組件較高的自由度&#xff0c;通常被用來處理較為復雜&#xff0c;其它組件難以處理的問題…

c語言:文件操作(1)

前言&#xff1a;為什么要使用文件 使用文件可以讓程序在不同運行之間保存和讀取數據。這樣可以實現持久化存儲&#xff0c;即使程序關閉后數據也不會丟失。文件也可以用于數據交換&#xff0c;允許不同程序之間共享信息。在 C 語言中&#xff0c;文件還可以用于讀取配置信息&…

系統架構設計師教程(三)信息系統基礎知識

信息系統基礎知識 3.1 信息系統概述3.1.1 信息系統的定義3.1.2 信息系統的發展3.1.3 信息系統的分類3.1.4 信息系統的生命周期3.1.5 信息系統建設原則3.1.6 信息系統開發方法 3.2 業務處理系統 (TPS)3.2.1 業務處理系統的概念3.2.2 業務處理系統的功能3.2.3 業務處理系統的特點…

Python:核心知識點整理大全13-筆記

目錄 6.4.3 在字典中存儲字典 6.5 小結 第7章 用戶輸入和while循環 7.1 函數 input()的工作原理 7.1.1 編寫清晰的程序 7.1.2 使用 int()來獲取數值輸入 7.1.3 求模運算符 7.1.4 在 Python 2.7 中獲取輸入 7.2 while 循環簡介 7.2.1 使用 while 循環 往期快速傳送門…

基于jsonrpc4j實現JSON-RPC over HTTP(客戶端多種調用方式)

1.說明 前文基于jsonrpc4j實現JSON-RPC over HTTP(服務端集成Spring Boot)&#xff0c; 介紹了JSON-RPC over HTTP服務端的實現方法&#xff0c; 并且通過Postman工具調用服務端對外提供的方法&#xff0c; 下面介紹兩種基于Java代碼調用客戶端的方法&#xff1a; 非Spring框…

什么是https 加密協議?

什么是https 加密協議&#xff1f; 加密通信的作用加密原理數字證書SSL/TLS 協議部署和使用重要性 HTTPS&#xff08;Hyper Text Transfer Protocol Secure&#xff09;是一種網絡傳輸協議&#xff0c;它是基于HTTP協議的擴展&#xff0c;通過加密通信內容來保障數據傳輸的安全…

SPI 通信-stm32入門

本節我們將繼續學習下一個通信協議 SPI&#xff0c;SPI 通信和我們剛學完的 I2C 通信差不多。兩個協議的設計目的都一樣&#xff0c;都是實現主控芯片和各種外掛芯片之間的數據交流&#xff0c;有了數據交流的能力&#xff0c;我們主控芯片就可以掛載并操縱各式各樣的外部芯片&…

gpu版本的GNN的demo

1、當涉及到在GPU上運行圖神經網絡&#xff08;GNN&#xff09;時&#xff0c;通常使用深度學習框架&#xff0c;如PyTorch或TensorFlow。在這里&#xff0c;我將為您提供一個使用PyTorch Geometric庫實現GNN的簡單示例。 首先&#xff0c;確保您已經安裝了PyTorch和PyTorch G…

第 375 場 LeetCode 周賽題解

A 統計已測試設備 模擬&#xff1a;記錄當前已測試設備數量 class Solution { public:int countTestedDevices(vector<int> &batteryPercentages) {int res 0;int s 0;for (auto x: batteryPercentages) {if (x - s > 0) {res;s;}}return res;} };B 雙模冪運算 …

【無線網絡技術】——無線城域網(學習筆記)

&#x1f4d6; 前言&#xff1a;無線城域網&#xff08;WMAN&#xff09;是指在地域上覆蓋城市及其郊區范圍的分布節點之間傳輸信息的本地分配無線網絡。能實現語音、數據、圖像、多媒體、IP等多業務的接入服務。其覆蓋范圍的典型值為3~5km&#xff0c;點到點鏈路的覆蓋可以高達…

少兒編程考級:激發孩子邏輯思維能力的關鍵

在當今信息化時代&#xff0c;少兒編程已經成為孩子們不可或缺的一項技能。而少兒編程考級&#xff0c;則是檢驗孩子們在這一技能上所取得的成就的重要途徑。少兒編程考級不僅能夠激發孩子們的邏輯思維能力&#xff0c;還能夠提高他們的動手能力和創造力。6547網將詳細介紹少兒…

電源模塊測試系統測試LED電源項目的優勢

LED電源測試是電源在設計、生產過程中的關鍵環節&#xff0c;也是確保LED照明產品可靠性和穩定性的重要步驟。LED電源測試一般包括電壓、電流、效率、穩定性等。電源模塊測試系統測試LED電源&#xff0c;實現自動化測試&#xff0c;保證測試結果的可靠性。 LED電源測試項目及方…

實現加鹽加密方法以及MappedByteBuffer,RandomAccess

目錄 自己實現 Spring Security MappedByteBuffer RandomAccess 加鹽加密的實現 自己實現 傳統MD5可通過彩虹表暴力破解&#xff0c; 加鹽加密算法是一種常用的密碼保護方法&#xff0c;它將一個隨機字符串&#xff08;鹽&#xff09;添加到原始密碼中&#xff0c;然后再進…

力扣17. 電話號碼的字母組合(java 回溯法)

Problem: 17. 電話號碼的字母組合 文章目錄 題目描述思路解題方法復雜度Code 題目描述 思路 題目給定一串數字&#xff0c;要求我們找出所有可能的字母組合&#xff0c;即我們可以窮舉出所有可能的結果&#xff0c;而涉及到窮舉我們自然可以想到利用回溯來解決問題&#xff0c…

xv6 中的一些系統調用(下)

〇、前言 本文將會結合源代碼談論 sleep、wakeup 這兩個系統調用。 一、sleep()系統調用 以下是sleep()函數源碼&#xff1a; // Atomically release lock and sleep on chan. // Reacquires lock when awakened. void sleep(void *chan, struct spinlock *lk) {struct pro…

無線且列窄圖片如何轉excel?

寫此文原因&#xff1a;圖片要轉excel&#xff0c;這放以前&#xff0c;是不能實現的功能&#xff0c;但隨著人工智能的蓬勃發展&#xff0c;人們已克服了這一難題&#xff0c;但是&#xff0c;我們知道&#xff0c;要將圖片識別成excel&#xff0c;識別程序首先要先識別圖片中…

如何在小米路由器4A千兆版刷入OpenWRT并通過內網穿透工具實現公網遠程訪問

文章目錄 前言1. 安裝Python和需要的庫2. 使用 OpenWRTInvasion 破解路由器3. 備份當前分區并刷入新的Breed4. 安裝cpolar內網穿透4.1 注冊賬號4.2 下載cpolar客戶端4.3 登錄cpolar web ui管理界面4.4 創建公網地址 5. 固定公網地址訪問 前言 OpenWRT是一個高度模塊化、高度自…