文章目錄
- 一.RabbitMQ簡介
- 二.相關知識
- 1.AMQP
- 2.JMS是什么 ?
- 三.RabbitMQ的工作原理
- 四.Hello World
- 1.創建Maven工程
- 2.生產者
- 3.消費者
- 五.總結
一.RabbitMQ簡介
MQ全稱為Message Queue,即消息隊列, RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/
開發中消息隊列通常有如下應用場景:
1、任務異步處理。
將不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。
2、應用程序解耦合
MQ相當于一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
市場上還有哪些消息隊列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
為什么使用RabbitMQ呢?
1、簡單,功能強大。
2、基于AMQP協議。
3、社區活躍,文檔完善。
4、高并發性能好,這主要得益于Erlang語言。
5、Spring Boot默認已集成RabbitMQ
二.相關知識
1.AMQP
總結:AMQP是一套公開的消息隊列協議,最早在2003年被提出,它旨在從協議層定義消息通信數據的標準格式,為的就是解決MQ市場上協議不統一的問題。RabbitMQ就是遵循AMQP標準協議開發的MQ服務。
百度鏈接:AMQP
官方:官網
2.JMS是什么 ?
總結:JMS是java提供的一套消息服務API標準,其目的是為所有的java應用程序提供統一的消息通信的標準,類似java的jdbc,只要遵循jms標準的應用程序之間都可以進行消息通信。
JMS和AMQP有什么 不同?
jms是java語言專屬的消息服務標準,它是在api層定義標準,并且只能用于java應用;而AMQP是在協議層定義的標準,是跨語言的。
三.RabbitMQ的工作原理
下圖是RabbitMQ的基本結構:
組成部分說明如下:
Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。
Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。
Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。
Queue:消息隊列,存儲消息的隊列,消息到達隊列并轉發給指定的消費方。
Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息。
消息發布和接收流程:
-----發送消息-----
1、生產者和Broker建立TCP連接。
2、生產者和Broker建立通道。
3、生產者通過通道消息發送給Broker,由Exchange將消息進行轉發。
4、Exchange將消息轉發到指定的Queue(隊列)
----接收消息-----
1、消費者和Broker建立TCP連接
2、消費者和Broker建立通道
3、消費者監聽指定的Queue(隊列)
4、當有消息到達Queue時Broker默認將消息推送給消費者。
5、消費者接收到消息
四.Hello World
1.創建Maven工程
創建生產者工程和消費者工程,分別加入RabbitMQ java client的依賴。
test-rabbitmq-producer:生產者工程
test-rabbitmq-consumer:消費者工程
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本與spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
2.生產者
在生產者工程下的test中創建測試類如下:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer01 {//隊列private static final String QUEUE = "helloworld";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE,true,false,false,null);//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*///消息內容String message = "hello world 黑馬程序員";channel.basicPublish("",QUEUE,null,message.getBytes());System.out.println("send to mq "+message);} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
3.消費者
在消費者工程下的test中創建測試類如下:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 入門程序消費者* @author Administrator* @version 1.0* @create 2018-06-17 9:25**/
public class Consumer01 {//隊列private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();//監聽隊列//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE,true,false,false,null);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);}
}
五.總結
1、發送端操作流程
1)創建連接connection
2)創建通道channel
3)聲明隊列channel.queueDeclare
4)發送消息channel.basicPublish
2、接收端
1)創建連接
2)創建通道
3)聲明隊列
4)監聽隊列
5)接收消息
6)ack回復