ActiveMQ源碼解析 建立連接

作為一個消息中間件,有客戶端和服務端兩部分代碼,這次的源碼解析系列主要從客戶端的代碼入手,分成建立連接、消息發送、消息消費三個部分。趁著我昨天弄明白了源碼編譯的興奮勁頭還沒過去,今天研究一下建立連接的部分。

如果讀起來吃力,代碼部分可以略過,我把主要的功能點給加粗。

通常來說,客戶端使用MQ的API建立時,可以分成兩個步驟:

  1. 對于連接的配置,比如服務器IP地址,用戶名和密碼等等
  2. 建立連接并啟動
    客戶端示例代碼:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);  
ActiveMQConnection connection = connectionFactory.createConnection();  
connection.start();

可以看到主要的方法是ActiveMQConnectionFactory的構造函數,和createConnection(),以及connection中的start()方法。

ActiveMQConnectionFactory中的createConnection
構造函數比較簡單,直接把傳入的用戶名密碼和url放在變量里

public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {setUserName(userName);setPassword(password);setBrokerURL(brokerURL.toString());
}

createConnection方法指向了createActiveMQConnection方法,該方法中主要做的事情有三個:

  1. 建立Transport和通過Transport建立Connection
  2. 配置Connection,建立好的Transport對象會被放到Connection對象中
  3. 啟動Transport
//建立Transport和通過Transport建立Connection
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);            
//配置
connection.setUserName(userName);            
connection.setPassword(password);            
configureConnection(connection);
//啟動Transport
transport.start();

configureConnection(connection);這個方法的作用是對實例化出的ActiveMQConnetion對象中的參數的一系列配置,代碼有點長就不上了。
對于我們來說其實主要想看的是連接是如何建立起來的,也就是

Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);      

createTransport();方法中包含了對客戶端傳入的url的初步校驗,主要是驗證URL的合法性,而后調用工廠類TransportFactory.connection(url)來進行連接的建立。

我們客戶端在建立連接的時候,有可能有TCP、UDP等等協議,AMQ實現了簡單工廠類FactoryFinder,在TransportFactory.connection(url)方法中,先是通過FactoryFinder根據用戶輸入的url(比如tcp://192.168.0.1)來找到使用的協議工廠TcpTransportFactory,然后使用TcpTransportFactory中的類來進行連接的建立。這個過程從代碼上來看有點曲折:

  1. TransportFactory的connect()調用findTransportFactory方法
  2. findTransportFactory調用FactoryFinder類的newInstance方法
  3. newInstance調用ObjectFactory類的create方法
  4. ObejctFactory是一個接口類,實現類是StandaloneObjectFactory,其中的create方法調用自身的loadClass方法
  5. loadClass方法中最終找到正確的類,返回至TransportFactory中
  6. 如果是tcp連接,最終得到的就是一個實例化的TcpTransportFactory類
public abstact class TransportFactory {
……private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");public static Transport connect(URI location) throws Exception {TransportFactory tf = findTransportFactory(location);return tf.doConnect(location);}public static TransportFactory findTransportFactory(URI location) throws IOException {//拆分urlString scheme = location.getScheme();if (scheme == null) {throw new IOException("Transport not scheme specified: [" + location + "]");}TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);if (tf == null) {// 調用FactoryFinder找到正確的TransportFactorytry {tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);TRANSPORT_FACTORYS.put(scheme, tf);} catch (Throwable e) {throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);}}return tf;}
……
}
public class FactoryFinder {
……//通過ObjectFactory來找到正確的TransportFactorypublic Object newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {return objectFactory.create(path+key);}
……
}

ObjectFactory的設計也是很有趣的。AMQ在代碼中的說法是之所以這么實現是因為這樣如果用戶想自己寫一個ObjectFactory,也可以支持。

/*** The strategy that the FactoryFinder uses to find load and instantiate Objects* can be changed out by calling the* {@link org.apache.activemq.util.FactoryFinder#setObjectFactory(org.apache.activemq.util.FactoryFinder.ObjectFactory)}* method with a custom implementation of ObjectFactory.** The default ObjectFactory is typically changed out when running in a specialized container* environment where service discovery needs to be done via the container system.  For example,* in an OSGi scenario.*/public interface ObjectFactory {/*** @param path the full service path* @return*/public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;}

Anyway,我們現在通過這么曲折的過程得到了一個實例化的TcpTransportFactory對象,下一步應該是調用doConnect(url)方法進行連接的建立了。因為TcpTransportFactory繼承了TransportFactory類,doConnect方法仍然是在TransportFactory中的:

public Transport doConnect(URI location) throws Exception {try {//把url里關于Transport的配置提取出來,WireFormat基本都可以看成是url的配置。//如果使用Openwire(默認協議),那么WireFormat就是openwire的相關配置。//見http://activemq.apache.org/configuring-wire-formats.htmlMap<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));if( !options.containsKey("wireFormat.host") ) {options.put("wireFormat.host", location.getHost());}WireFormat wf = createWireFormat(options);//建立socket連接Transport transport = createTransport(location, wf);//裝飾者模式,在連接上包裝上MutexTransportFilter、WireFormatNegotiator、InactivityMonitor、ResponseCorrelator四個功能Transport rc = configure(transport, wf, options);//remove autoIntrospectionSupport.extractProperties(options, "auto.");if (!options.isEmpty()) {throw new IllegalArgumentException("Invalid connect parameters: " + options);}return rc;} catch (URISyntaxException e) {throw IOExceptionSupport.create(e);}}

這個方法中主要有三個重要功能:

  1. 配置wireformat
  2. 建立TcpTransport連接
  3. 在連接上包裝四大輔助功能
    其中配置WireFormat可以不看,建立TcpTransport其實是在調用createTransport(location, wf);時引用了TcpTransport的構造函數,代碼如下:
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,URI localLocation) throws UnknownHostException, IOException {this.wireFormat = wireFormat;this.socketFactory = socketFactory;try {this.socket = socketFactory.createSocket();} catch (SocketException e) {this.socket = null;}this.remoteLocation = remoteLocation;this.localLocation = localLocation;this.initBuffer = null;setDaemon(false);}

這里直接調用了socketFactory.createSocket();,使用的是默認的方法,所以無法指定本地網卡建立連接。我看了下其實可以用socketFactory.createSocket(host, port, localHost, localPort)來改寫,改寫后就可以指定本地IP和端口了。

此外,查了下網絡上的資料,四大輔助后續再看:

MutexTransportFilter類實現了對每個請求的同步鎖,同一時間只允許發送一個請求,如果有第二個請求需要等待第一個請求發送完畢才可繼續發送。

WireFormatNegotiator類實現了在客戶端連接broker的時候先發送數據解析相關的協議信息,例如解析版本號,是否使用緩存等信息。

InactivityMonitor類實現了連接成功后啟動心跳檢查機制,客戶端每10秒發送一次心跳信息,服務端每30秒讀一次心跳信息,如果沒有讀到則會斷開連接,心跳檢測是相互的,客戶端也會每30秒讀取服務端發送來的心跳信息,如果沒有讀到也一樣會斷開連接。

ResponseCorrelator類實現了異步請求但需要獲取響應信息否則就會阻塞等待功能。

ActiveMQConnection的Start()
在使用AMQ的過程中,很多用戶問我為什么Connection需要start(),不能在createConnection的時候直接start了嗎?而且不調用start方法其實不影響發送,但是會影響接收。抱著這樣的疑惑,我們來看一下源碼。

 /*** Starts (or restarts) a connection's delivery of incoming messages. A call* to <CODE>start</CODE> on a connection that has already been started is* ignored.** @throws JMSException if the JMS provider fails to start message delivery*                 due to some internal error.* @see javax.jms.Connection#stop()*/@Overridepublic void start() throws JMSException {checkClosedOrFailed();ensureConnectionInfoSent();if (started.compareAndSet(false, true)) {for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {ActiveMQSession session = i.next();session.start();}}}

源碼里直接對start方法加了注釋,說明start就是啟動connection可以接收消息的功能。其實源碼里可以很明顯看出來start里包含了幾個步驟:

  1. 檢查連接是否關閉或失效
  2. 確認客戶端的ConnectionInfo是否被送到服務器
  3. 啟動這個Connection中的每一個Session

我好奇的是第二步,看看源碼

 /*** Send the ConnectionInfo to the Broker** @throws JMSException*/protected void ensureConnectionInfoSent() throws JMSException {synchronized(this.ensureConnectionInfoSentMutex) {// Can we skip sending the ConnectionInfo packet??if (isConnectionInfoSentToBroker || closed.get()) {return;}//TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?if (info.getClientId() == null || info.getClientId().trim().length() == 0) {info.setClientId(clientIdGenerator.generateId());}syncSendPacket(info.copy(), getConnectResponseTimeout());this.isConnectionInfoSentToBroker = true;// Add a temp destination advisory consumer so that// We know what the valid temporary destinations are on the// broker without having to do an RPC to the broker.ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());if (watchTopicAdvisories) {advisoryConsumer = new AdvisoryConsumer(this, consumerId);}}}

從源碼里還能看到討論和待辦……我覺得我已經深入核心了……這個方法里做了兩件事,

發送ConnectionInfo的數據包到服務端,如果info里沒有用戶自己設定的clientID,還會自動幫忙生成一個。發送的時候調用的是syncSendPacket方法,很明顯是個同步發送,需要服務端同步返回response才算發送成功,我理解這里應該是一個試探連接的步驟。
建立一個通往臨時目的地的消費者。所以其實每一個ActiveMQConnection的連接中都自動包含了一個消費者。我臨時寫了個客戶端試了下,的確是存在的。
在這里插入圖片描述
奇葩的是我就算不調用connection.start()方法,直接發送消息,這個臨時消費者也是存在的,所以肯定是在消息發送的時候的哪個地方調用了connection的start方法。

至于為什么不調用start()方法就無法消費,我看了下session的start方法:

/*** Start this Session.** @throws JMSException*/protected void start() throws JMSException {started.set(true);for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {ActiveMQMessageConsumer c = iter.next();c.start();}executor.start();}

原來是在session的start方法里啟動了這個session里的consumer,想想session的建立過程的確是不需要調用session.start方法的,但是我們一般是先調用start方法,而后建立consumer,這個邏輯順序還是有點錯亂……
等下一次研究接收端的源碼時再深入吧。

本次發現的源碼優化點

  1. socket建立時,使用不同的createSocket方法,指定本機IP和端口。
  2. 項目用到了advisory message,每當agent建立/斷開連接的時候,ActiveMQ.Advisory.Connection中會生成一條消息,這個消息中帶上了ConnectionInfo。項目就是使用這個來即時檢測agent的在線和離線狀態的。因此如果我們改一下ConnectionInfo,加上agent的一些重要信息,比如agent版本號,操作系統類型,真實IP地址等等,會在獲取agent信息的即時性上得到很大的提高。

我真的去試了一下……在ConnectionInfo里添加了一條test=test,然后重新編譯服務端和客戶端的依賴jar包,開啟MQ的logging plugins,并且用客戶端去監聽了一下ActiveMQ.Advisory.Connection,得到了這樣的結果。
在這里插入圖片描述

ConnectionInfo {commandId = 1, 
responseRequired = true, 
connectionId = ID:Air.local-51230-1502000963732-1:1, 
clientId = ID:Air.local-51230-1502000963732-0:1, 
clientIp = tcp://127.0.0.1:51231, 
userName = null, password = *****, 
test = test, 
brokerPath = null, 
brokerMasterConnector = false, 
manageable = true, 
clientMaster = true, 
faultTolerant = true, 
failoverReconnect = false}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/249538.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/249538.shtml
英文地址,請注明出處:http://en.pswp.cn/news/249538.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

原生Js_實現廣告彈窗

廣告樣式當頁面加載后5s刷新在右下角 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title>Gary圖片輪播</title><style type"text/css">#ad{width:300px;height: 300px;background-color:antiquewhite…

springcloud注冊中心eureka

1、前提 springcloud的注冊中心是以springboot為基礎搭建起來的。 開發工具&#xff1a;IDEA 項目管理工具&#xff1a;maven 2、搭建步驟 創建一個web項目&#xff08;建議使用IDEA工具構建項目&#xff09;修改pom文件 <dependency><groupId>org.springframework…

Nancy in .Net Core學習筆記 - 視圖引擎

前文中我們介紹了Nancy中的路由&#xff0c;這一篇我們來介紹一下Nancy中的視圖引擎。 Nancy中如何返回一個視圖(View) 在ASP.NET Mvc中&#xff0c;我們使用ViewResult類來返回一個視圖。Nancy中也提供了類似的功能, 在NancyModule類中&#xff0c;Nancy提供了一個ViewRendere…

設計模式之組合模式(Composite 模式)

引入composite模式 在計算機文件系統中&#xff0c;有文件夾的概念&#xff0c;文件夾里面既可以放入文件也可以放入文件夾&#xff0c;但是文件中卻不能放入任何東西。文件夾和文件構成了一種遞歸結構和容器結構。 雖然文件夾和文件是不同的對象&#xff0c;但是他們都可以被放…

Ansible批量在遠程主機執行命令

Ansible直接執行遠程命令&#xff0c;不用ssh登陸交互執行。    如下&#xff1a;    ansible all -i 192.168.199.180, -m shell -a "ifconfig" -u supermap    參數解釋&#xff1a;    -i 連接到遠程主機“192.168.199.180&#xff0c;”&#xf…

HOJ 2651

一道二分的題目&#xff0c;但要注意不能用double&#xff0c; 并且要注意一下二分的步驟 #include <cstdio> #include <cstring> #include <iostream> #include <algorithm> #define pi 3.1415926535898 #define eps 0.0001 using namespace std; inl…

HierarchicalBeanFactory接口

HierarchicalBeanFactory 提供父容器的訪問功能.至于父容器的設置,需要找ConfigurableBeanFactory的setParentBeanFactory(接口把設置跟獲取給拆開了!). HierarchicalBeanFactory源碼具體&#xff1a; 1、第一個方法返回本Bean工廠的父工廠。這個方法實現了工廠的分層。 2、第…

C++: C++函數聲明的時候后面加const

C: C函數聲明的時候后面加const 轉自&#xff1a;http://blog.csdn.net/zhangss415/article/details/7998123 非靜態成員函數后面加const&#xff08;加到非成員函數或靜態成員后面會產生編譯錯誤&#xff09;&#xff0c;表示成員函數隱含傳入的this指針為const指針&#xff0…

【計蒜客習題】消除字符串

問題描述 蒜頭君喜歡中心對稱的字符串&#xff0c;即回文字符串。現在蒜頭君手里有一個字符串 SS&#xff0c;蒜頭君每次都會進行這樣的操作&#xff1a;從 SS 中挑選一個回文的子序列&#xff0c;將其從字符串 SS 中去除&#xff0c;剩下的字符重組成新的字符串 SS。 蒜頭君想…

HierarchicalBeanFactory

BeanFactory分層 package org.springframework.beans.factory;//分層工廠 public interface HierarchicalBeanFactory extends BeanFactory {//返回工廠的父工廠BeanFactory getParentBeanFactory();//這個工廠中是否包含這個Beanboolean containsLocalBean(String name); }測…

Training a classifier

你已經學習了如何定義神經網絡&#xff0c;計算損失和執行網絡權重的更新。 現在你或許在思考。 What about data? 通常當你需要處理圖像&#xff0c;文本&#xff0c;音頻&#xff0c;視頻數據&#xff0c;你能夠使用標準的python包將數據加載進numpy數組。之后你能夠轉換這些…

19歲白帽子通過bug懸賞賺到一百萬美元--轉

出處&#xff1a;https://news.cnblogs.com/n/620858/ 19 歲的 Santiago Lopez 通過 bug 懸賞平臺 HackerOne 報告漏洞&#xff0c;成為第一位通過 bug 懸賞賺到一百萬美元的白帽子黑客。他的白帽子生涯始于 2015 年&#xff0c;至今共報告了超過 1600 個安全漏洞。他在 16 歲時…

代碼分層的設計

分層思想&#xff0c;是應用系統最常見的一種架構模式&#xff0c;我們會將系統橫向切割&#xff0c;根據業務職責劃分。MVC 三層架構就是非常典型架構模式&#xff0c;劃分的目的是規劃軟件系統的邏輯結構便于開發維護。MVC&#xff1a;英文即 Model-View-Controller&#xff…

【24小時內第四更】為什么我們要堅持寫博客?

前言 從2018年7月份&#xff0c;我開始了寫作博客之路。開始之前&#xff0c;我打算分享下之前的經歷。去年初公司來了個架構師&#xff0c;內部分享過docker原理&#xff0c;TDD單元測試驅動&#xff0c;并發并行異步編程等內容&#xff0c;讓我著實驚呆了&#xff0c;因為確實…

sqoop快速入門

轉自http://www.aboutyun.com/thread-22549-1-1.html 轉載于:https://www.cnblogs.com/drjava/p/10473297.html

ListableBeanFactory接口

ListableBeanFactory獲取bean時,Spring 鼓勵使用這個接口定義的api. 還有個Beanfactory方便使用.其他的4個接口都是不鼓勵使用的. 提供容器中bean迭代的功能,不再需要一個個bean地查找.比如可以一次獲取全部的bean(太暴力了),根據類型獲取bean.在看SpringMVC時,掃描包路徑下的…

HDU 4035 Maze

Maze http://acm.hdu.edu.cn/showproblem.php?pid4035 分析&#xff1a; 在樹上走來走去&#xff0c;然后在一個點可以k的概率回到1&#xff0c;可以e的概率走出去&#xff0c;可以1-k-e的概率走到其他的位置&#xff08;分為父節點和子節點討論&#xff09;。 轉移方程就是&a…

面向對象之三大特性:繼承,封裝,多態

python面向對象的三大特性&#xff1a;繼承&#xff0c;封裝&#xff0c;多態。 1. 封裝: 把很多數據封裝到?個對象中. 把固定功能的代碼封裝到?個代碼塊, 函數, 對象, 打包成模塊. 這都屬于封裝的思想. 具體的情況具體分析. 比如. 你寫了?個很?B的函數. 那這個也可以被稱為…

configurablebeanfactory

ConfigurableBeanFactory定義BeanFactory的配置.ConfigurableBeanFactory中定義了太多太多的api,比如類加載器,類型轉化,屬性編輯器,BeanPostProcessor,作用域,bean定義,處理bean依賴關系,合并其他ConfigurableBeanFactory,bean如何銷毀. ConfigurableBeanFactory同時繼承了Hi…

Xlua文件在熱更新中調用方法

Xlua文件在熱更新中調用方法 public class news : MonoBehaviour { LuaEnv luaEnv;//定義Lua初始變量 void Awake() { luaEnv new LuaEnv();//new開辟空間 luaEnv.AddLoader(myload);//調用方法地址、返回字節 luaEnv.DoString("requirefish");//更新文件 } void O…