文章目錄
- 建立連接
- 工作隊列模式實現
- 創建隊列和交換機
- 生產者代碼
- 消費者代碼
- 運行程序
- 啟動消費者
- 啟動生產者
- 發布/訂閱模式實現
- 創建隊列和交換機
- 生產者代碼
- 創建交換機
- 聲明兩個隊列
- 綁定隊列和交換機
- 發送消息
- 完整代碼
- 消費者代碼
- 完整代碼
- 運行程序
- 啟動生產者
- 啟動消費者
建立連接
我們把建立連接時,創建的連接工廠部分創建成常量,方便后面進行使用
- 在
rabbitmq
包下,再創建一個constant
包
package rabbitmq.constant; public class Constants { static public final String HOST = "localhost"; static public final int PORT = 5672; static public final String USER_NAME = "study"; static public final String PASSWORD = "study"; static public final String VIRTUAL_HOST = "coding ";
}
工作隊列模式實現
和簡單模式相比較,工作隊列與之不同的就是有多個消費者,其他都一樣。所以我們只需要多添加幾個消費者即可
創建隊列和交換機
在 Constants
中添加:
// 工作隊列模式
public static final String WORK_QUEUE = "work.queue ";
生產者代碼
package rabbitmq.work; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 開啟信道 Channel channel = connection.createChannel(); //3. 聲明隊列 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 發送消息 for (int i = 0; i < 10; i++) { String msg = "hello work queue..." + i; channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes()); } System.out.println("消息發送成功!"); // 5. 資源釋放 channel.close(); connection.close(); }
}
消費者代碼
package rabbitmq.work; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 1. 建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 開啟信道 Channel channel = connection.createChannel(); //3. 聲明隊列 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 消費消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 從隊列中收到消息,就會執行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO System.out.println("接收到消息: " + new String(body)); } }; channel.basicConsume(Constants.WORK_QUEUE, true, consumer); // 等待程序執行完成 Thread.sleep(2000); // 5. 釋放資源
// channel.close();
// connection.close(); }
}
- 多個消費者的代碼都一樣的
運行程序
我們先啟動兩個消費者,再啟動生產者
- 如果先啟動生產者,再啟動消費者,由于消息較少,處理較快,那么第一個啟動的消費者就會瞬間把 10 條消息消費掉,所以我們先啟動兩個消費者,再啟動生產者
啟動消費者
我們將兩個消費者啟動
- 我們可以看到
rabbitmq
客戶端里面,work.queue
隊列已經被創建了出來
啟動生產者
在啟動消費者之后,我們啟動生產者,發送 10 條消息到隊列中
- 我們可以看到,連個該消費者將 10 條消息消費完了
發布/訂閱模式實現
在發布/訂閱模式中,多了一個 Exchange
角色
Exchange
常見有三種類型,分別代表不同的路由規則
Fanout
: 廣播,將消息交給所有綁定到交換機的隊列(Publish/Subscribe
)Direct
: 定向,將消息交給符合指定routingKey
的隊列 (Routing
模式)Topic
: 通配符,把消息交給符合routing pattern
(路由模式)的隊列(Topics
模式)
也就分別對應不同的工作模式
創建隊列和交換機
在 Constants
中添加:
// 發布訂閱模式
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
生產者代碼
發布/訂閱模式的生產者代碼和簡單模式類似,只是有些變化
- 需要聲明交換機
- 需要指出交換機和隊列之間的關系
創建交換機
相比于生產者代碼和簡單模式,這一步是關鍵的一步。我們需要聲明一個交換機,而不是使用默認交換機
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
- 我們會使用到
exchangeDeclare()
方法
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
參數解釋:
exchange
:交換機名稱type
:交換機類型Direct("direct")
:定向,直連,routing
Fanout("fanout")
:扇形(廣播),每個隊列都能收到消息TOPIC("topic")
:通配符HEADERS("headers")
:參數匹配(工作時用到的少)
durable
:是否持久化true
:持久化false
:非持久化- 持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息
autoDelete
:自動刪除- 自動刪除的前提是至少有一個對類或者交換器與這個交換器綁定,之后所有與這個交換器綁定的對類或交換器都與此解綁
- 而不是這種理解:當與此交換器連接的客戶端都斷開時,
RabbitMQ
會自動刪除本交換器
internal
:內部使用,一般false
- 如果設置為
true
,表示內部使用 - 客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式
- 如果設置為
argument
:參數
聲明兩個隊列
// 如果沒有一個這樣的隊列,會自動創建;如果有,則不創建
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
綁定隊列和交換機
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
- 這里會用到
queueBind()
方法
queueBind(String queue, String exchange, String routingKey)
參數解釋:
queue
:對類名稱exchange
:交換機名稱routingKey
:路由key
,路由規則
- 如果交換機類型為
fanout
,routingKey
設置為“”
,表示每個消費者都能收到全部信息
發送消息
String msg = "hello fanout...";
// 第二個參數 routingKey 為空。因為這是廣播模式,交換機收到消息后需要全部轉發(綁定的時候設為空,發送的時候也為空
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());
System.out.println("消息發送成功!");
- 這里會用到
basicPublish()
方法
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
參數解釋:
Exchange
:交換機名稱routingKey
:如果交換機類型為fanout
,routingKey
設置為“”
,表示每個消費者都能收到全部信息
完整代碼
package rabbitmq.fanout; import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 開啟信道 Channel channel = connection.createChannel(); //3. 聲明交換機 /* Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; 參數解釋: exchange:交換機名稱 type:交換機類型 DIRECT("direct"),定向,直連,routing FANOUT("fanout"),扇形(廣播),每個隊列都能收到消息 TOPIC("topic"),通配符 HEADERS("headers"),參數匹配(工作中用的少) durable:是否持久化 autoDelete:自動刪除 internal:內部使用(一般false) arguments:參數 */ channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true); // 4. 聲明隊列 channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null); // 5. 綁定交換機和隊列 channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, ""); channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, ""); // 6. 發布消息 String msg = "hello fanout..."; // 第二個參數 routingKey 為空。因為這是廣播模式,交換機收到消息后需要全部轉發(綁定的時候設為空,發送的時候也為空 channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes()); System.out.println("消息發送成功!"); // 7. 釋放資源 channel.close(); connection.close(); }
}
消費者代碼
主要的步驟為:
- 創建
Channel
- 接收消息,并處理
完整代碼
package rabbitmq.fanout; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.建立信道 Channel channel = connection.createChannel(); // 3.聲明隊列(如果隊列已經存在,就不會創建;不存在就會創建) channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null); // 4. 消費信息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 從隊列中收到消息,就會執行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body)); } }; channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer); } }
運行程序
啟動生產者
- 消息全轉發
- 我們可以看到兩個隊列中分別有了一條消息
- 這就是發布訂閱模式,他會把收到的消息都轉發
- 交換機綁定了隊列
- 這里,我們可以看到交換機和隊列之間的綁定關系
啟動消費者
消費者 1:
接收到消息:hello fanout...
消費者 2:
接收到消息:hello fanout...