目錄
- 一、案例需求
- 二、代碼實現
- 三、總結
生產者源碼
消費者源碼
一、案例需求
模擬WorkQueue,實現一個隊列綁定多個消費者。
- 在RabbitMQ的控制臺創建一個隊列,命名為
work.queue
。 - 在生產者服務中定義測試方法,在1s內產生50條消息,發送到
work.queue
。 - 在消費這服務中定義兩個消息監聽,都監聽
work.queue
隊列。 - 消費者1每秒處理50條消息,消費者2每秒處理5條消息。
二、代碼實現
生產者
@Test
public void workQueueTest() throws InterruptedException {for (int i = 0; i < 50; i++) {String queueName = "work.queue";String message = String.format("hello %s, spring amqp!", i + 1);rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(200);}}
消費者
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message){System.out.println(String.format("消費者1,收到了work.queue: %s", message));
}@RabbitListener(queues = "work.queue")
public void listenSimpleQueue2(String message){System.err.println(String.format("消費者2,收到了work.queue: %s", message));
}
三、總結
默認情況下,RabbitMQ會將消息依次輪詢投遞給綁定在隊列上的所有消費者。但是這并沒有考慮到消費者是否已經處理完消息,可能會出現消息堆積。
因此我們需要修改application.yml
,設置prefetch
值為1,確保同一時刻最多投遞給消費者1條消息。
spring:rabbitmq:listener:simple:prefetch: 1
- 多個消費者綁定到一個隊列,可以加快消費處理速度。
- 同一個消息只會被一個消費者處理。
- 通過設置
prefetch
來控制消費者預取的消息數量,處理完一條再處理下一條,實現能者多勞。