消費端限流
1. 為什么要對消費端限流
假設一個場景,首先,我們 Rabbitmq 服務器積壓了有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據!
當數據量特別大的時候,我們對生產端限流肯定是不科學的,因為有時候并發量就是特別大,有時候并發量又特別少,我們無法約束生產端,這是用戶的行為。所以我們應該對消費端限流,用于保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導致系統的卡頓甚至直接崩潰。
2.限流的 api 講解
RabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(通過基于 consume 或者 channel 設置 Qos 的值)未被確認前,不進行消費新的消息。
/*** Request specific "quality of service" settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
- prefetchSize:0,單條消息大小限制,0代表不限制
- prefetchCount:一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。
- global:true、false 是否將上面設置應用于 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別。當我們設置為 false 的時候生效,設置為 true 的時候沒有了限流功能,因為 channel 級別尚未實現。
- 注意:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。
3.如何對消費端進行限流
- 首先第一步,我們既然要使用消費端限流,我們需要關閉自動 ack,將 autoAck 設置為 falsechannel.basicConsume(queueName, false, consumer);
- 第二步我們來設置具體的限流大小以及數量。channel.basicQos(0, 15, false);
- 第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設置批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);
這是生產端代碼,與前幾章的生產端代碼沒有做任何改變,主要的操作集中在消費端。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer { public static void main(String[] args) throws Exception { //1. 創建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創建 Channel Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_qos_exchange"; String routingKey = "item.add"; //5. 發送 String msg = "this is qos msg"; for (int i = 0; i < 10; i++) { String tem = msg + " : " + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 關閉連接 channel.close(); connection.close(); }}
這里我們創建了兩個消費者,以方便驗證限流api中的 global 參數設置為 true 時不起作用.。整體結構如下圖所示,兩個 Consumer 都綁定在同一個隊列上,這樣的話兩個消費者將共同消費發送的10條消息。

import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer { public static void main(String[] args) throws Exception { //1. 創建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通過連接工廠來創建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創建 Channel final Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String queueName1 = "test_qos_queue_1"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic