目錄
工作隊列模式
概念
特點
應用場景
工作原理
注意事項
代碼案例
引入依賴
常量類
編寫生產者代碼
編寫消費者1代碼
編寫消費者2代碼
先運行生產者,后運行消費者
先運行消費者,后運行生產者
工作隊列模式
概念
在工作隊列模式中,一個生產者(producer)將任務發布到隊列中,多個消費者(consumer)從隊列中獲取任務并執行。這種模式的主要目標是提高任務的并行處理能力,從而提高系統的吞吐量和效率。
特點
可以有多個消費者,但一條消息只能被一個消費者獲取。
消費者在處理完某條消息后,才會收到下一條消息。
RabbitMQ采用輪詢(Round-Robin)或公平分發(Fair Dispatch)的方式將消息發送給消費者。?
應用場景
1.任務分發:將任務分發給多個工作者(消費者),以便并行處理。這對于需要高吞吐量和任務處理效率的應用程序非常有用。例如,圖像處理、視頻編碼、數據轉換等應用可以使用工作隊列模式來并行處理大量任務。
2.負載均衡:當有多個消費者時,工作隊列模式可以用來實現負載均衡。任務將均勻分布給可用的消費者,以確保每個消費者都有工作可做,而且不會超負荷。
3.后臺任務處理:在Web應用程序中,后臺任務處理是一個常見的需求。工作隊列模式可用于處理與Web請求無關的長時間運行任務,而不會影響用戶體驗。例如,發送電子郵件、生成報告、備份數據等后臺任務可以使用工作隊列來處理。
工作原理
1.生產者發送任務:生產者將任務封裝為消息,并將其發送到RabbitMQ隊列中。
2.RabbitMQ分發任務:RabbitMQ根據配置的分發策略(如輪詢或公平分發)將任務分發給消費者。
3.消費者處理任務:消費者從隊列中獲取任務并執行。在處理完任務后,消費者會向RabbitMQ發送確認消息,表示任務已完成。
4.RabbitMQ確認任務完成:在收到消費者的確認消息后,RabbitMQ會將該任務從隊列中移除。
注意事項
1.消息確認:為了確保消息不會丟失,消費者在處理完任務后需要向RabbitMQ發送確認消息。如果消費者在處理任務時失敗或崩潰,RabbitMQ會將該任務重新分發給其他消費者。
2.負載均衡:RabbitMQ默認采用輪詢方式將消息分發給消費者。如果需要更復雜的負載均衡策略,可以考慮使用其他分發策略或自定義交換機類型。
3.錯誤處理:在生產者和消費者中都需要添加適當的錯誤處理邏輯,以處理可能出現的異常情況,如連接失敗、消息發送失敗等。
代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version> </dependency>
常量類
public class Constants {public static final String HOST = "47.98.109.138";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "aaa";//工作隊列模式public static final String WORK_QUEUE = "work.queue";
}
編寫生產者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD); //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列 使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 發送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息發送成功~");//6. 資源釋放channel.close();connection.close();}
}
編寫消費者1代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD); //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列 使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 資源釋放
// channel.close();
// connection.close();}
}
編寫消費者2代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD); //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列 使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODOSystem.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// //6. 資源釋放
// channel.close();
// connection.close();}
}
先運行生產者,后運行消費者
查看管理界面
我們此時會看到,先啟動的消費者會消費掉隊列中所有的消息。
先運行消費者,后運行生產者
此時我們能看到,兩個消費者都能夠消費消息。