結束了一個超級消耗周末,滿安排之健身+梅溪湖游泳+做飯喝酒+羽毛球賽
完全力竭了,久久不能恢復過來,暫停健身安排了 端午后再繼續
今日完成記錄
Time | Plan | 完成情況 |
---|---|---|
7:30 - 8:10 | 有氧爬坡 | √ |
9:00 - 11:00 | RabbitMQ學習 | √ |
12:00 - 14:30 | 繼續RabbitMQ學習 | √ |
14:30 - 17:30 | 小論文2 | √ |
RabbitMQ
整體架構以及核心概念
rabbitmq由消息生產者publisher、消息消費者consumer以及消息代理server broker構成,生產者生產消息,消息代理根據規則將消息放在消息隊列中,消費者從消息隊列消費消息。
什么是rabbitmq server broker?
rabbitmq server broker是一種消息代理(message broker),它由exchange交換機、queue消息隊列和binding路由規則組成,規定了消息如何傳遞。
為什么需要broker?(broker的好處)
- 解耦生產者和消費者,實現異步通信!
- 提供靈活的路由策略(一對一【直連】,一對多【主題、扇出】)
- 支持消息持久化、重試等高級功能
什么是virtual host?
(簡稱vhost)
vhost是RabbitMQ的邏輯隔離機制,類似于文件系統中的目錄或者數據庫中的schema。它允許在同一個rabbitmq server上創建多個獨立環境,每個vhost有自己的exchange、queue、binding和權限,彼此互不干擾。
vhost和集群的關系?
- vhost是邏輯隔離,集群式物理部署
- 集群中所有節點都會同步vhost定義,但隊列數據默認只存儲在創建它的節點上(除非配置鏡像隊列)
vhost能跨服務器共享嗎?
- 不能。vhost是單服務器內的邏輯概念,不同mq server之間vhost完全獨立。
快速實戰(Java)
這里直接使用spring提供的基于RabbitMQ的消息收發模板:SpringAMQP
maven依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
環境配置
基本配置如下,部署rabbitmq的主機ip、端口、業務對應虛擬主機、用戶名、密碼
spring:rabbitmq:host: 192.168.4.144port: 5672virtual-host: /hmallusername: hmallpassword: 123321
發送信息
@Autowired
RabbitMQTemplate rt;String queueName = "hello.queue";
String msg = "hello mq";
// 使用
rt.convertAndSend(queueName, msg);
消費消息
@RabbitListener(queues = "hello.queue")
public void ListenSimpleQueue(String msg){System.out.println(msg);
}
上面兩部分中的消息,并非一定是String類型,任意類型都可以,SpringAMQP會自動進行類型轉換。
WorkQueues模型
任務模型,就是讓多個消費者綁定到同一個隊列,共同消費隊列中的消息。
(1)接下來代碼實現兩個消費者綁定在同一個隊列上以每秒50條消息的速度消費消息,并且由一個生產者以每秒鐘50條消息的速度生產消息。
生產者
@SpringBootTest
public class MQTest {@Autowiredprivate RabbitTemplate rt;@Testvoid testSendMQHello() throws InterruptedException {for(int i = 0; i < 50; i ++ ) {Thread.sleep(20);rt.convertAndSend("hello.queue", "hello, dame mq + " + i + " 號消息");}}
}
消費者
@Slf4j
@Component
public class MyMQListener {@RabbitListener(queues = "hello.queue")public void listenSimpleQueue1(String msg){System.out.println("消費者1收到了simple.queue的消息 :【" + msg +"】");try {Thread.sleep(20);} catch (InterruptedException e) {}}@RabbitListener(queues = "hello.queue")public void listenSimpleQueue2(String msg){System.out.println("消費者2收到了simple.queue的消息:【" + msg +"】");try {Thread.sleep(20);} catch (InterruptedException e) {}}
}
結果如下:兩個消費者依次消費生產者產生的消息
(2)實際上部署在不同設備上的代碼執行速度會因為設備性能而表現有所差異,因此接下來修改消費者代碼,將一個消費者消費速度改為5個消息一秒鐘,觀察一下結果
@RabbitListener(queues = "hello.queue")public void listenSimpleQueue2(String msg){System.out.println("消費者2收到了simple.queue的消息:【" + msg +"】");try {Thread.sleep(200);} catch (InterruptedException e) {}}
結果如下,我們發現消費者2消費速度下降了以后,它們消費的編號依舊沒有區別,也就是隊列分配給不同性能的消費者相同的消息量,不考慮消費者的實際消費能力,這樣會導致性能差的機器持續阻塞很多消息,而性能好的機器空閑。
(3)SpringAMQP中提供了prefetch的設置,可以限制消費者獲取消息隊列中消息的數量
spring:rabbitmq:listener:simple:prefetch: 1
設置后測試,結果如下:可見消費能力強的消費者1不再是固定地消費編號為02468的消息,而是消費了02345678,也就是每個消費者每次只能取一條消息,消費完成后再取下一個消息。
交換機類型
交換機沒有存儲消息的能力,只負責轉發消息,因此如果沒有任何隊列和交換機綁定,或者沒有符合路由規則的隊列,那么消息會丟失。
交換機一共有四種:
- Fanout:扇出型,廣播給所有綁定到交換機的隊列
- Direct:訂閱型,基于RouteKey發送給訂閱了消息的隊列
- Topic:主題訂閱,根據通配符匹配RoutingKey進行轉發
- Headers:頭匹配
Fanout交換機
如圖,fanout交換機接收到生產者的消息會將其路由給所有綁定的消息隊列,消息會在每個消息隊列中被消費一次
Direct交換機
如圖,direct exchange會根據消息的routing key 匹配相同的key發送到消息隊列中
Topic交換機
如圖,Topic Exchange也是使用routing key匹配binding key,但是允許bindingkey使用通配符。