spring集成RabbitMQ配置文件詳解(生產者和消費者)

1,首先引入配置文件org.springframework.amqp,如下:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.7.1.RELEASE</version></dependency>

2,準備工作:安裝好rabbitmq,并在項目中增加配置文件?? rabbit.properties 內容如下:

rmq.ip=192.188.113.114   
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin

3,rabbitmq屬性介紹:

概念解釋:

Brocker:消息隊列服務器實體。Exchange:消息交換機,指定消息按什么規則,路由到哪個隊列。Queue:消息隊列,每個消息都會被投入到一個或者多個隊列里。Binding:綁定,它的作用是把exchange和queue按照路由規則binding起來。Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。Virtual Host: 虛擬主機,一個broker里可以開設多個vhost,用作不用用戶的權限分離。每個virtual host本質上都是一個RabbitMQ Server(但是一個server中可以有多個virtual host),擁有它自己若干的個Exchange、Queue和bings rule等等。其實這是一個虛擬概念,類似于權限控制組。Virtual Host是權限控制的最小粒度。Producer:消息生產者,就是投遞消息的程序。Consumer:消息消費者,就是接受消息的程序。Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。接下來的實踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接。僅僅創建了TCP連接,producer和consumer與exchange還是不能通信的。我們還需要為每一個Connection創建Channel。Channel: 它是建立在上述TCP連接之上的虛擬連接。數據傳輸都是在Channel中進行的。AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。有人要問了,為什么要使用Channel呢,直接用TCP連接不就好了么?對于一個消息服務器來說,它的任務是處理海量的消息,當有很多線程需要從RabbitMQ中消費消息或者生產消息,那么必須建立很多個connection,也就是許多個TCP連接。然而對于操作系統而言,建立和關閉TCP連接是非常昂貴的開銷,而且TCP的連接數也有限制,頻繁的建立關閉TCP連接對于系統的性能有很大的影響,如果遇到高峰,性能瓶頸也隨之顯現。RabbitMQ采用類似NIO的做法,選擇TCP連接服用,不僅可以減少性能開銷,同時也便于管理。在TCP連接中建立Channel是沒有上述代價的,可以復用TCP連接。對于Producer或者Consumer來說,可以并發的使用多個Channel進行Publish或者Receive。有實驗表明,在Channel中,1秒可以Publish10K的數據包。對于普通的Consumer或者Producer來說,這已經足夠了。除非有非常大的流量時,一個connection可能會產生性能瓶頸,此時就需要開辟多個connection。

消息隊列的使用過程大概如下:
消息接收

客戶端連接到消息隊列服務器,打開一個channel。
客戶端聲明一個exchange,并設置相關屬性。
客戶端聲明一個queue,并設置相關屬性。
客戶端使用routing key,在exchange和queue之間建立好綁定關系。

消息發布

客戶端投遞消息到exchange。
exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

AMQP 里主要要說兩個組件:

Exchange 和 Queue
綠色的X就是Exchange ,紅色的是Queue,這兩者都在Server端,又稱作Broker
這部分是RabbitMQ實現的,而藍色的則是客戶端,通常有Producer和Consumer兩種類型。

?

4,配置spring-rabbitmq.xml,內容如下:

<!-- 公共部分 -->
<!-- 創建連接類 連接安裝好的 rabbitmq -->
<bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"><constructor-arg value="localhost" />    <!-- username,訪問RabbitMQ服務器的賬戶,默認是guest --><property name="username" value="${rmq.manager.user}" /><!-- username,訪問RabbitMQ服務器的密碼,默認是guest -->   <property name="password" value="${rmq.manager.password}" /><!-- host,RabbitMQ服務器地址,默認值"localhost" -->   <property name="host" value="${rmq.ip}" />   <!-- port,RabbitMQ服務端口,默認值為5672 --><property name="port" value="${rmq.port}" /><!-- channel-cache-size,channel的緩存數量,默認值為25 --><property name="channel-cache-size" value="50" /><!-- cache-mode,緩存連接模式,默認值為CHANNEL(單個connection連接,連接之后關閉,自動銷毀) --><property name="cache-mode" value="CHANNEL" />   
</bean>
<!--或者這樣配置,connection-factory元素實際就是注冊一個org.springframework.amqp.rabbit.connection.CachingConnectionFactory實例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory"/><!--定義消息隊列,durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不會失去所有的queue和消息,需要同時標志隊列(queue)和交換機(exchange)是持久化的,即rabbit:queue標簽和rabbit:direct-exchange中的durable=true,而消息(message)默認是持久化的可以看類org.springframework.amqp.core.MessageProperties中的屬性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 僅創建者可以使用的私有隊列,斷開后自動刪除;auto_delete: 當所有消費客戶端連接斷開后,是否自動刪除隊列 -->
<rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /><!--綁定隊列,rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,我們用direct模式,即rabbit:direct-exchange標簽,Direct交換器很簡單,如果是Direct類型,就會將消息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,則發送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。但routing key找不到的話,不會報錯,這條消息會直接丟失,所以此處要小心,auto-delete:自動刪除,如果為Yes,則該交換機所有隊列queue刪除后,自動刪除交換機,默認為false -->
<rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"><rabbit:bindings><rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding><rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding><rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding></rabbit:bindings>
</rabbit:fanout-exchange><!-- 生產者部分 -->
<!-- 發送消息的producer類,也就是生產者 -->
<bean id="msgProducer" class="com.asdf.sdf.ClassA"><!-- value中的值就是producer中的的routingKey,也就是隊列名稱,它與上面的rabbit:bindings標簽中的key必須相同 --><property name="queueName" value="{alert.queue.1}"/>
</bean><!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換為json存入消息隊列,由于fastjson的速度快于jackson,這里替換為fastjson的一個實現 -->
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
<!-- 或者配置jackson -->
<!--
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
--><rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /><!-- 消費者部分 -->
<!-- 自定義接口類 -->
<bean id="testHandler" class="com.rabbit.TestHandler"></bean><!-- 用于消息的監聽的代理類MessageListenerAdapter -->
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" ><!-- 類名 --><constructor-arg ref="testHandler" /><!-- 方法名 --><property name="defaultListenerMethod" value="handlerTest"></property><property name="messageConverter" ref="jsonMessageConverter"></property>
</bean><!-- 配置監聽acknowledeg="manual"設置手動應答,它能夠保證即使在一個worker處理消息的時候用CTRL+C來殺掉這個worker,或者一個consumer掛了(channel關閉了、connection關閉了或者TCP連接斷了),也不會丟失消息。因為RabbitMQ知道沒發送ack確認消息導致這個消息沒有被完全處理,將會對這條消息做re-queue處理。如果此時有另一個consumer連接,消息會被重新發送至另一個consumer會一直重發,直到消息處理成功,監聽容器acknowledge="auto" concurrency="30"設置發送次數,最多發送30次 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"><rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /><rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /><rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
</rabbit:listener-container>

?

5,生產者(發送端)代碼:

@Resource  
private RabbitTemplate rabbitTemplate;
private String queueName;  
public void sendMessage(CommonMessage msg){try {  logger.error("發送信息開始");System.out.println(rabbitTemplate.getConnectionFactory().getHost());  //發送信息  queueName交換機,就是上面的routingKey msg.getSource() 為 test_key 
             rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);//如果是普通字符串消息需要先序列化,再發送消息//rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));logger.error("發送信息結束");} catch (Exception e) {  e.printStackTrace();}}public void setQueueName(String queueName) {this.queueName = queueName;
}

?

6,消費端代碼:TestHandler 類

public class TestHandler  {@Overridepublic void handlerTest(CommonMessage commonMessage) {System.out.println("DetailQueueConsumer: " + new String(message.getBody()));}
}

其他exchangeType介紹:

fanOut:

<!-- Fanout 扇出,顧名思義,就是像風扇吹面粉一樣,吹得到處都是。如果使用fanout類型的exchange,那么routing key就不重要了。因為凡是綁定到這個exchange的queue,都會受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">  <rabbit:bindings>  <rabbit:binding queue="test_delay_queue"/>  </rabbit:bindings>  
</rabbit:fanout-exchange>

topic:如果說direct是將消息放到exchange綁定的一個queue里(一對一);fanout是將消息放到exchange綁定的所有queue里(一對所有);那么topic類型的exchange就可以實現(一對部分),應用場景就是打印不同級別的錯誤日志,我們的系統出錯后會根據不同的錯誤級別生成error_levelX.log日志,我們在后臺首先要把所有的error保存在一個總的queue(綁定了一個*.error的路由鍵)里,然后再按level分別存放在不同的queue。

<!-- 發送端不是按固定的routing key發送消息,而是按字符串“匹配”發送,接收端同樣如此 -->
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange"><rabbit:bindings><rabbit:binding queue="Q1" pattern="error.*.log" /><rabbit:binding queue="Q2" pattern="error.level1.log" /><rabbit:binding queue="Q3" pattern="error.level2.log" /></rabbit:bindings>
</rabbit:topic-exchange>

routing key綁定如下圖:

?

本文轉自:

https://blog.csdn.net/nandao158/article/details/81065892

https://www.cnblogs.com/LipeiNet/p/6079427.html

https://www.toutiao.com/a6598154241037042189/?tt_from=mobile_qq&utm_campaign=client_share&timestamp=1536277584&app=news_article&utm_source=mobile_qq&iid=43157585039&utm_medium=toutiao_android&group_id=6598154241037042189

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/277331.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/277331.shtml
英文地址,請注明出處:http://en.pswp.cn/news/277331.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一天的學習成果:hash輸出,dcache工作原理,include的home directory,fist optype的含義...

最先獲得突破的是解決了下午的崩潰問題。其實原因很簡單&#xff0c;我聲明了一個unsigned int型指針&#xff0c;但是沒有給它分配空間…… 解決了這個問題之后就很簡單了&#xff0c;調用定義在linux/dcache.c文件中的full_name_hash函數對文件名進行hash計算。這里發現了一個…

linux顯示fio為非法指令,FORTRAN運行錯誤消息列表中英對照.doc

FORTRAN運行錯誤消息列表中英對照Fortran的運行時錯誤消息列表本節列出了英特爾Fortran運行時庫(RTL)處理的錯誤。對于每一個錯誤&#xff0c;該表提供了錯誤號&#xff0c;嚴重性代碼&#xff0c;錯誤信息文本&#xff0c;條件符號名稱&#xff0c;而錯誤的詳細說明。在程序中…

各種證書

軟考高級信息系統項目管理師https://www.zhihu.com/question/29904891 轉載于:https://www.cnblogs.com/trumbull/p/11154514.html

linux面試題中的簡答題,[計算機]linux面試題簡答題部分.doc

[計算機]linux面試題簡答題部分linux面試題(簡答題部分)2 簡述進程的啟動、終止的方式以及如何查看進程&#xff1f;答&#xff1a;啟動進程的方式分為手動啟動和自動啟動兩種方式,其中手動啟動的方法用services 服務名 start;或者是./腳本名稱,自動啟動進程的方法有將進程服務…

const用法

const的用法很讓人葷菜&#xff0c;現在總結以下&#xff1a;1&#xff0c;必須初始化2&#xff0c;作為函數的參數是個好習慣&#xff0c;const在*號左邊所指常量值&#xff0c;在右邊所指的是常量指針3&#xff0c;const成員函數的目的是指明該函數可以在const對象上調用,也就…

Multiverse: Revolutionary Backend for Alembic // Multiverse: 下一代Alembic后端

J CUBE&#xff0c;日本最大的動畫公司Polygon Picture&#xff08;以下簡稱PPI&#xff09;公司成立的專職R&D公司隆重推出Multiverse&#xff0c;下一代Alembic存儲后端。 我們還開發了針對Autodesk Maya的工具&#xff0c;運用Multiverse在流程中。 "multiverse&qu…

c語言 程序延時 校準,c語言實現系統時間校正工具代碼分享

//*******************************************************************//Time Protocol是一種非常簡單的應用層協議。它返回一個未格式化的32位二進制數字,//這個數字描述了從1900年1月1日午夜到現在的秒數。服務器在端口37監聽協議請求&#xff0c;以//TCP/IP或者UDP/IP格式…

近半年能力沒進步原因分析與求助

2019獨角獸企業重金招聘Python工程師標準>>> 20180907 思維方式有缺陷&#xff0c;想到的解決方法經常不是最有效率的。導致工作時間內基本沒自由學習的時間。 業余時間不夠專注&#xff0c;學習方向經常變&#xff0c;沒能堅持搞透一個點就換書看&#xff0c;沒有總…

疑問:關于Microsoft Office InfoPath 2003 Toolkit for Visual Studio 2005 Beta 2

因開發急須這個東西&#xff0c;但我不是msdn的subscriber用戶不能單獨下載&#xff0c;但微軟這樣提示http://blogs.msdn.com/vsto2/archive/2005/05/05/415003.aspxIf you need the Toolkit, but you are not an MSDN Universal subscriber, if you go to http://msdn.micros…

windows下安裝Redis并部署成服務

文章來源&#xff1a;https://www.cnblogs.com/weiqinl/p/6490372.html windows下安裝Redis并部署成服務 Redis 是一個開源&#xff08;BSD許可&#xff09;的&#xff0c;內存中的數據結構存儲系統&#xff0c;它可以用作數據庫、緩存和消息中間件。 一&#xff1a;下載 下載地…

c語言編寫程序計算行列式值,新手作品:行列式計算C語言版

該樓層疑似違規已被系統折疊 隱藏此樓查看此樓對話 ControlHeightDecrease ShiftUp Arrow 向上調整選定的控件或對話一個對話單位對話 ControlHeightIncrease ShiftDown Arrow 向下調整選定的控件或對話一個對話單位對話 ControlMoveDown Dow…

.net core高性能通訊開源組件BeetleX

BeetleX beetleX是基于dotnet core實現的輕量級高性能的TCP通訊組件&#xff0c;使用方便、性能高效和安全可靠是組件設計的出發點&#xff01;開發人員可以在Beetlx組件的支持下快帶地構建高性能的TCP通訊服務程序&#xff0c;在安全通訊方面只需要簡單地設置一下SSL信息即可實…

按組排名

rank() over,dense_rank() over,row_number() over的區別 1.rank() over&#xff1a;查出指定條件后的進行排名。特點是&#xff0c;加入是對學生排名&#xff0c;使用這個函數&#xff0c;成績相同的兩名是并列&#xff0c;下一位同學空出所占的名次。 select name,subject,sc…

《Excel與VBA程序設計》第一章

點擊下載&#xff1a;http://files.cnblogs.com/maweifeng/Excel_VBA_001.rar轉載于:https://www.cnblogs.com/maweifeng/archive/2005/06/23/179729.html

linux java環境變量設置

JAVA環境變量設置&#xff1a; #vi /etc/profile#在文件最后添加以下內容&#xff1a; export JAVA_HOME/usr/java/jdk1.8.0_91 export PATH$JAVA_HOME/bin:$PATH export CLASSPATH.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar 執行如下命令使環境變量生效&#xff1a; s…

win7 磁盤分區

右鍵點擊“我的電腦”&#xff0c;選擇“管理”命令。在打開的“計算機管理”窗口中&#xff0c;依次展開“計算機管理”→“存儲”→“磁盤管理”項。之后&#xff0c;在右側窗格中即可看到當前硬盤的分區情況。 在“未指派”的磁盤空間上點擊右鍵&#xff0c;選擇“新建磁盤…

[FxCop.設計規則]13. 定義自定義屬性參數的訪問屬性

13. 定義自定義屬性參數的訪問屬性 翻譯概述&#xff1a; 一個比較無聊的規則&#xff0c;實在看不出在什么情況下&#xff0c;一個開發者會做出違反這條規則的設計。沒有別的內容&#xff0c;只是說應該為自定義特性的構造函數中的參數提供一個相關的屬性去讀取它們的值。…

C語言程序設計實驗最短路徑,7最短路徑C語言程序設計.pdf

最短路徑旅行家的困擾第4章 圖結構 解放軍理工大學旅行家的困擾新疆特克斯縣“八卦城”第4章 圖結構 解放軍理工大學旅行家的困擾特克斯縣怎么樣幫助困擾的旅行家找到去各個地點的最短路線呢&#xff1f;旅行家居住的旅館旅行家想去的地點第4章 圖結構 解放軍理工大學問題建模使…

centos7安裝Cloudera Manager

第一部分&#xff1a;準備工作一&#xff0c;修改hostname $vim /etc/sysconfig/network $source /etc/sysconfig/network例如&#xff1a; NETWORKINGyes HOSTNAMEspark01reboot重啟服務器 二&#xff0c;關閉selinux查看SELinux狀態1&#xff0c;/usr/sbin/sestatus -v #如果…

He Fei ,First ,Good Luck

Tonight, I will go to HeFei.something as follows:1) speciality 2) sincerely3) valueHope i can bring customer some ideas .But i will throw my 100% energy to face it.Good Luck.First HeFei轉載于:https://www.cnblogs.com/boriscao/archive/2005/08/31/227199.html…