1.MQ
?MQ全稱 Message Queue(消息隊列),是在消息的傳輸過程中? 保存消息的容器。它是應用程序和應用程序之間的通信方法
1.1?為什么使用MQ
在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。
1.2MQ的好處
1.應用解耦? ?系統間通過消息通信,不用關心其他系統的處理。
2.異步提速??相比于傳統的串行、并行方式,提高了系統吞吐量。
3.削峰填谷? ?可以通過消息隊列長度控制請求量;可以緩解短時間內的高并發請求。
簡單來說: 就是在訪問量劇增的情況下,但是應用仍然不能停,比如“雙十一”下單的人多,但是淘寶這個應用仍然要運行,所以就可以使用消息中間件采用隊列的形式減少突然訪問的壓力
使用MQ后,可以提高系統穩定性
1.3劣勢
-
系統可用性降低 系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。如何保證MQ的高可用?
-
系統復雜度提高 MQ 的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
-
一致性問題 A 系統處理完業務,通過 MQ 給B、C、D三個系統發消息數據,如果 B 系統、C 系統處理成功,D 系統處理失敗。如何保證消息數據處理的一致性?
1.4常見的MQ組件
RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等
2.RabbitMQ
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue 高級消息隊列協議 )的開源實現,由于erlang 語言的高并發特性,性能較好,本質是個隊列,FIFO 先入先出,里面存放的內容是message
RabbitMQ是一個消息中間件:它接受并轉發消息。你可以把它當做一個快遞站點,當你要發送一個包裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那里,按照這種邏輯RabbitMQ是一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ與快遞站的主要區別在于,它不處理快件而是接收,存儲和轉發消息數據。
2.1RabbitMQ的原理
核心組件
-
生產者(Producer):負責發送消息到交換器的客戶端應用程序。
-
消費者(Consumer):從隊列中獲取并處理消息的客戶端應用程序。
-
交換器(Exchange):接收生產者發送的消息,并根據路由規則將消息轉發到相應的隊列。
-
隊列(Queue):存儲消息,直到消費者取走消息。
-
綁定(Binding):定義交換器和隊列之間的關聯關系。
工作流程
-
消息發送:生產者通過信道(Channel)將消息發送到交換器。
-
消息路由:交換器根據路由鍵(Routing Key)和綁定鍵(Binding Key)將消息路由到相應的隊列。
-
消息存儲:隊列存儲消息,等待消費者取走。
-
消息消費:消費者通過信道從隊列中獲取消息并處理。
交換器類型
-
Direct:根據完全匹配的路由鍵將消息發送到相應的隊列。
-
Fanout:將消息廣播到所有綁定的隊列,不考慮路由鍵。
-
Topic:根據模式匹配的路由鍵將消息發送到相應的隊列。
2.2簡單模式simple
生產者向隊列投遞消息,消費者從其中取出消息
1.依賴
<!-- java連接rabbitmq的依賴--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>
2.生產消息
package com.ghx.hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 11:35* @description:* @version:*/
public class Test01 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠ConnectionFactory factory=new ConnectionFactory();//rabbitmq服務器地址 默認本地localhostfactory.setHost("xxxx");//端口號 默認5672factory.setPort(5672);//用戶名 密碼 默認guestfactory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//創建連接對象Connection connection=factory.newConnection();//獲取channel對象Channel channel = connection.createChannel();//創建隊列 存在則不創建,不存在則創建//String queue, 隊列名// boolean durable, 是否持久化// boolean exclusive, 是否獨占隊列 false// boolean autoDelete,是否自動刪除 false// Map<String, Object> arguments 隊列的參數配置--消息的格式 消息存放的時間等channel.queueDeclare("hello",true,false,false,null);String msg="hello rabbitmq2";//String exchange,交換機的名稱 "":默認交換機// String routingKey, 路由key "hello":隊列名// BasicProperties props, 消息的屬性--設置過期時間 設置id等 null// byte[] body 消息的內容channel.basicPublish("","hello",null,msg.getBytes());System.out.println("消息發送成功");channel.close();connection.close();}
}
3.消費消息
package com.ghx.hello;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 14:22* @description:* @version:*/
public class Test01 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠ConnectionFactory factory=new ConnectionFactory();//rabbitmq服務器地址 默認本地localhostfactory.setHost("xxxx");//端口號 默認5672factory.setPort(5672);//用戶名 密碼 默認guestfactory.setUsername("guest");factory.setPassword("guest");//虛擬機名稱 默認/factory.setVirtualHost("/");//創建連接對象Connection connection = factory.newConnection();//獲取channel對象Channel channel = connection.createChannel();DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1收到消息"+new String(body));}};//接受消息channel.basicConsume("hello",true,consumer);//不要關閉連接和channel 監聽消息}
}
2.3工作者模式work queues
多個消費者消費同一個隊列中的消息,多個消費者之間屬于競爭關系,一個消息只能被一個消費者消費,適合對于任務過重或任務較多的情況,使用工作隊列可以提高任務的處理速度
1.生產者
package com.ghx.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;/*** @author :guo* @date :Created in 2025/3/20 14:51* @description:* @version:*/
public class Test03 {private static final String QUEUE_NAME="queue01";public static void main(String[] args) {ConnectionFactory factory=new ConnectionFactory();factory.setHost("xxxx");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);for (int i = 0; i < 10; i++){String msg="你好 世界"+i;channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes("utf-8"));}channel.close();connection.close();}catch (Exception e){}}
}
2.? 2個消費者
package com.ghx.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 15:00* @description:* @version:*/
public class Test03 {private static final String QUEUE_NAME="queue01";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("xxxX");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1收到消息"+new String(body));}};//接收消息channel.basicConsume(QUEUE_NAME,true,consumer);}}
package com.ghx.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 15:00* @description:* @version:*/
public class Consumer02 {private static final String QUEUE_NAME="queue01";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("xxxx");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2收到消息"+new String(body));}};//接收消息channel.basicConsume(QUEUE_NAME,true,consumer);}}
2.3發布訂閱模式 publish/subscribe
x? : 交換機
????????一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
-
Fanout:廣播,將消息交給所有綁定到交換機的隊列
-
Direct:定向,把消息交給符合指定routing key 的隊列
-
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
每個消費者都有自己獨立的隊列
2.3.1生產者
package com.ghx.work;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 11:35* @description:* @version:*/
public class Test01 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠ConnectionFactory factory=new ConnectionFactory();//rabbitmq服務器地址 默認本地localhostfactory.setHost("xxxx");//端口號 默認5672factory.setPort(5672);//用戶名 密碼 默認guestfactory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//創建連接對象Connection connection=factory.newConnection();//獲取channel對象Channel channel = connection.createChannel();//創建交換機
// String exchange,交換機的名稱
// BuiltinExchangeType type, 交換機的類型
// boolean durable: 是否持久化channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT,true);//創建隊列channel.queueDeclare("fanout_queue1",true,false,false,null);channel.queueDeclare("fanout_queue2",true,false,false,null);//綁定隊列和交換機
// String queue,隊列名
// String exchange,交換機名
// String routingKey: 路由key 因為廣播模式沒有路由key ""channel.queueBind("fanout_queue1","fanout_exchange","");channel.queueBind("fanout_queue2","fanout_exchange","");//發送消息String msg="hello fanout交換機";channel.basicPublish("fanout_exchange","",null,msg.getBytes());channel.close();connection.close();}
}
2.4路由模式routing
-
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個 RoutingKey(路由key)
-
消息的發送方在向 Exchange 發送消息時,也必須指定消息的 RoutingKey
-
Exchange 不再把消息交給每一個綁定的隊列,而是根據消息的 Routing Key 進行判斷,只有隊列的Routingkey 與消息的 Routing key 完全一致,才會接收到消息
package com.ghx.router;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 11:35* @description:* @version:*/
public class Test01 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠ConnectionFactory factory=new ConnectionFactory();//rabbitmq服務器地址 默認本地localhostfactory.setHost("xxxx");//端口號 默認5672factory.setPort(5672);//用戶名 密碼 默認guestfactory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//創建連接對象Connection connection=factory.newConnection();//獲取channel對象Channel channel = connection.createChannel();//創建交換機
// String exchange,交換機的名稱
// BuiltinExchangeType type, 交換機的類型
// boolean durable: 是否持久化channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);//創建隊列channel.queueDeclare("direct_queue1",true,false,false,null);channel.queueDeclare("direct_queue2",true,false,false,null);//綁定隊列和交換機
// String queue,隊列名
// String exchange,交換機名
// String routingKey: 路由key 因為廣播模式沒有路由key ""channel.queueBind("direct_queue1","direct_exchange","error");channel.queueBind("direct_queue2","direct_exchange","error");channel.queueBind("direct_queue2","direct_exchange","info");channel.queueBind("direct_queue2","direct_exchange","warning");//發送消息String msg="hello direct交換機";channel.basicPublish("direct_exchange","info",null,msg.getBytes());channel.close();connection.close();}
}
2.5主題模式topics
-
Topic 類型與 Direct 相比,都是可以根據 RoutingKey 把消息路由到不同的隊列。只不過 Topic 類型Exchange 可以讓隊列在綁定 Routing key 的時候使用==通配符==!
-
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
-
通配符規則:# 匹配一個或多個詞,* 匹配不多不少恰好1個詞,例如:item.# 能夠匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
下面的只會發送給2
package com.ghx.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author :guo* @date :Created in 2025/3/20 11:35* @description:* @version:*/
public class Test01 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠ConnectionFactory factory=new ConnectionFactory();//rabbitmq服務器地址 默認本地localhostfactory.setHost("121.196.229.251");//端口號 默認5672factory.setPort(5672);//用戶名 密碼 默認guestfactory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//創建連接對象Connection connection=factory.newConnection();//獲取channel對象Channel channel = connection.createChannel();//創建交換機
// String exchange,交換機的名稱
// BuiltinExchangeType type, 交換機的類型
// boolean durable: 是否持久化channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);//創建隊列channel.queueDeclare("topic_queue1",true,false,false,null);channel.queueDeclare("topic_queue2",true,false,false,null);//綁定隊列和交換機
// String queue,隊列名
// String exchange,交換機名
// String routingKey: 路由key 因為廣播模式沒有路由key ""channel.queueBind("topic_queue1","topic_exchange","*.orange.*");channel.queueBind("topic_queue2","topic_exchange","*.*.rabbit");channel.queueBind("topic_queue2","topic_exchange","lazy.#");//發送消息String msg="hello topic交換機";channel.basicPublish("topic_exchange","lazy.orange",null,msg.getBytes());channel.close();connection.close();}
}