一、架構和技術介紹
1、簡介
ActiveMQ?是Apache出品,最流行的,能力強勁的開源消息總線。完全支持JMS1.1和J2EE 1.4規范的?JMS Provider實現
2、activemq的特性
1.?多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.?完全支持JMS1.1和J2EE 1.4規范?(持久化,XA消息,事務)
3.?對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
4.?通過了常見J2EE服務器(如?Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resourceadaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE1.4商業服務器上
5.?支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.?支持通過JDBC和journal提供高速的消息持久化
7.?從設計上保證了高性能的集群,客戶端-服務器,點對點
8.?支持Ajax
9.?支持與Axis的整合
10.?可以很容易得調用內嵌JMS provider,進行測試
3、下載和安裝ActiveMQ
1、下載
ActiveMQ的最新版本是5.10.0,但由于我們內網下載存在問題,所以目前通過內網只能下載到5.9.0,下載地址:http://activemq.apache.org/activemq-590-release.html。
2、安裝
?????????如果是在windows系統中運行,可以直接解壓apache-activemq-5.9.0-bin.zip,并運行bin目錄下的activemq.bat文件,此時使用的是默認的服務端口:61616和默認的console端口:8161。
?????????如果是在linux或unix下運行,在bin目錄下執行命令:./activemq setup
3、修改ActiveMQ的服務端口和console端口
???????? A、修改服務端口:打開conf/activemq.xml文件,修改以下紅色字體部分
????????<transportConnectors>
???????????<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>
???????</transportConnectors>
?
B、修改console的地址和端口:打開conf/jetty.xml文件,修改以下紅色字體部分
????<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">
???????<property name="port" value="8162"/>
?</bean>
4、通過客戶端代碼試用ActiveMQ
??????? 需要提前將activemq解壓包中的lib目錄下的相關包引入到工程中,再進行如下編碼:
1、發送端的代碼:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.DeliveryMode;
importjavax.jms.Destination;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclass?Sender {
????privatestaticfinalintSEND_NUMBER?= 5;
?
????publicstaticvoid?main(String[] args) {
????????// ConnectionFactory:連接工廠,JMS用它創建連接
????????ConnectionFactory connectionFactory;
????????// Connection:JMS客戶端到JMS Provider的連接
????????Connection connection =?null;
????????// Session:一個發送或接收消息的線程
????????Session session;
????????// Destination:消息的目的地;消息發送給誰.
????????Destination destination;
????????// MessageProducer:消息發送者
????????MessageProducer producer;
????????// TextMessage message;
????????//構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
????????connectionFactory =?new?ActiveMQConnectionFactory(
????????????????ActiveMQConnection.DEFAULT_USER,
????????????????ActiveMQConnection.DEFAULT_PASSWORD,
????????????????"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
????????try?{
????????????//構造從工廠得到連接對象
????????????connection =connectionFactory.createConnection();
????????????//啟動
????????????connection.start();
????????????//獲取操作連接
????????????session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
????????????//獲取session
????????????destination = session.createQueue("FirstQueue");
????????????//得到消息生成者【發送者】
????????????producer =session.createProducer(destination);
????????????//設置不持久化,此處學習,實際根據項目決定
???????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
????????????//構造消息,此處寫死,項目就是參數,或者方法獲取
????????????sendMessage(session, producer);
????????????session.commit();
????????}?catch?(Exception e) {
????????????e.printStackTrace();
????????}?finally?{
????????????try?{
????????????????if?(null?!= connection)
????????????????????connection.close();
????????????}?catch?(Throwable ignore) {
????????????}
????????}
????????}
???
????publicstaticvoid?sendMessage(Session session,MessageProducer producer)
????????????throws?Exception {
????????for?(int?i = 1; i <=SEND_NUMBER; i++) {
????????????TextMessage message = session
????????????????????.createTextMessage("ActiveMq發送的消息"?+ i);
????????????//發送消息到目的地方
????????????System.out.println("發送消息:"?+?"ActiveMq?發送的消息"?+ i);
????????????producer.send(message);
????????}
????}
}
?
2、接收端代碼:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
?
publicclass?Receive {
????publicstaticvoid?main(String[] args) {
????????// ConnectionFactory:連接工廠,JMS用它創建連接
????????ConnectionFactory connectionFactory;
????????// Connection:JMS客戶端到JMS Provider的連接
????????Connection connection =?null;
????????// Session:一個發送或接收消息的線程
????????Session session;
????????// Destination:消息的目的地;消息發送給誰.
????????Destination destination;
????????//消費者,消息接收者
????????MessageConsumer consumer;
????????connectionFactory =?new?ActiveMQConnectionFactory(
????????????????ActiveMQConnection.DEFAULT_USER,
????????????????ActiveMQConnection.DEFAULT_PASSWORD,
????????????????"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
????????try?{
????????????//構造從工廠得到連接對象
????????????connection =connectionFactory.createConnection();
????????????//啟動
????????????connection.start();
????????????//獲取操作連接
????????????session = connection.createSession(false,
????????????????????Session.AUTO_ACKNOWLEDGE);
????????????//獲取session
????????????destination = session.createQueue("FirstQueue");
????????????consumer =session.createConsumer(destination);
????????????while?(true) {
????????????????//設置接收者接收消息的時間,為了便于測試,這里誰定為100s
????????????????TextMessage message =(TextMessage) consumer.receive(100000);
????????????????if?(null?!= message) {
????????????????????System.out.println("收到消息"?+ message.getText());
????????????????}?else?{
????????????????????break;
????????????????}
????????????}
????????}?catch?(Exception e) {
????????????e.printStackTrace();
????????}?finally?{
????????????try?{
????????????????if?(null?!= connection)
????????????????????connection.close();
????????????}?catch?(Throwable ignore) {
????????????}
????????}
????}
}
3、通過監控查看消息堆棧的記錄:
??????登陸http://localhost:8162/admin/queues.jsp,默認的用戶名和密碼:admin/admin
二、ActiveMQ的多種部署方式
?????????單點的ActiveMQ作為企業應用無法滿足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多種部署方式,但通過分析多種部署方式之后我認為需要將兩種部署方式相結合才能滿足我們公司分布式和高可用的需求,所以后面就重點將解如何將兩種部署方式相結合。
1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式
?????????主要是通過共享存儲目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪個應用搶到了控制權,它就成為master。
?????????多個共享存儲目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就只能作為slave。
2)shared database Master-Slave方式
?????????與shared filesystem方式類似,只是共享的存儲介質由文件系統改成了數據庫而已。
3)Replicated LevelDB Store方式
?????????這種主備方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node開啟并接受客戶端連接。
其他node轉入slave模式,連接master并同步他們的存儲狀態。slave不接受客戶端連接。所有的存儲操作都將被復制到連接至Master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網絡中并連接master進入slave mode。所有需要同步的disk的消息操作都將等待存儲狀態被復制到其他法定節點的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master將會存儲并更新然后等待?(2-1)=1個slave存儲和更新完成,才匯報success。至于為什么是2-1,熟悉Zookeeper的應該知道,有一個node要作為觀擦者存在。
單一個新的master被選中,你需要至少保障一個法定node在線以能夠找到擁有最新狀態的node。這個node將會成為新的master。因此,推薦運行至少3個replica nodes,以防止一個node失敗了,服務中斷。
2、Broker-Cluster部署方式
?????????前面的Master-Slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分布式的問題。Broker-Cluster的部署方式就可以解決負載均衡的問題。
???????? Broker-Cluster部署方式中,各個broker通過網絡互相連接,并共享queue。當broker-A上面指定的queue-A中接收到一個message處于pending狀態,而此時沒有consumer連接broker-A時。如果cluster中的broker-B上面由一個consumer在消費queue-A的消息,那么broker-B會先通過內部網絡獲取到broker-A上面的message,并通知自己的consumer來消費。
1)static Broker-Cluster部署
?????????在activemq.xml文件中靜態指定Broker需要建立橋連接的其他Broker:
1、??首先在Broker-A節點中添加networkConnector節點:
<networkConnectors>?
??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
2、??修改Broker-A節點中的服務提供端口為61616:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3、??在Broker-B節點中添加networkConnector節點:
<networkConnectors>?
? ????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4、??修改Broker-A節點中的服務提供端口為61617:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5、分別啟動Broker-A和Broker-B。
2)Dynamic Broker-Cluster部署
?????????在activemq.xml文件中不直接指定Broker需要建立橋連接的其他Broker,由activemq在啟動后動態查找:
1、??首先在Broker-A節點中添加networkConnector節點:
<networkConnectors>?
??????????????? <networkConnectoruri="multicast://default"
???????????dynamicOnly="true"
???????????networkTTL="3"
???????????prefetchSize="1"
???????????decreaseNetworkConsumerPriority="true" />
</networkConnectors>
2、修改Broker-A節點中的服務提供端口為61616:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
3、在Broker-B節點中添加networkConnector節點:
<networkConnectors>?
??????????????? <networkConnectoruri="multicast://default"
???????????dynamicOnly="true"
???????????networkTTL="3"
???????????prefetchSize="1"
???????????decreaseNetworkConsumerPriority="true" />
</networkConnectors>
4、修改Broker-B節點中的服務提供端口為61617:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
5、啟動Broker-A和Broker-B
2、Master-Slave與Broker-Cluster相結合的部署方式
?????????可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負載均衡,Broker-Cluster解決了負載均衡,但當其中一個Broker突然宕掉的話,那么存在于該Broker上處于Pending狀態的message將會丟失,無法達到高可用的目的。
?????????由于目前ActiveMQ官網上并沒有一個明確的將兩種部署方式相結合的部署方案,所以我嘗試者把兩者結合起來部署:
?????????
1、部署的配置修改
?????????這里以Broker-A + Broker-B建立cluster,Broker-C作為Broker-B的slave為例:
1)首先在Broker-A節點中添加networkConnector節點:
<networkConnectors>?
??????????????? <networkConnector?? uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A節點中的服務提供端口為61616:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B節點中添加networkConnector節點:
<networkConnectors>?
??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B節點中的服務提供端口為61617:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B節點中的持久化方式:
??????<persistenceAdapter>
???????????<kahaDB directory="/localhost/kahadb"/>
????? ??</persistenceAdapter>
6)在Broker-C節點中添加networkConnector節點:
<networkConnectors>?
??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C節點中的服務提供端口為61618:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B節點中的持久化方式:
??????<persistenceAdapter>
???????????<kahaDB directory="/localhost/kahadb"/>
???????</persistenceAdapter>
9)分別啟動broker-A、broker-B、broker-C,因為是broker-B先啟動,所以“/localhost/kahadb”目錄被lock住,broker-C將一直處于掛起狀態,當人為停掉broker-B之后,broker-C將獲取目錄“/localhost/kahadb”的控制權,重新與broker-A組成cluster提供服務。