RabbitMQ ②-工作模式

在這里插入圖片描述

RabbitMQ 工作模式

官方提供了七種工作模式

Simple(簡單模式)在這里插入圖片描述

  • P:生產者,發布消息到隊列
  • C:消費者,從隊列中獲取消息并消費
  • Queue:消息隊列,存儲消息。

一個生產者,一個消費者,消息只能被消費一次,也被稱為點對點(Point-to-Point)模式。

Work-Queues(工作隊列模式)

在這里插入圖片描述

  • P:生產者,發布消息到隊列
  • C1C2:消費者,從隊列中獲取消息并消費
  • Queue:消息隊列,存儲消息。

Queue 存儲多個消息時,就會分配給不同的消費者,每個消費者接收到不同的消息。

Publish/Subscribe(發布/訂閱模式)

在這里插入圖片描述

  • P:生產者,發布消息到隊列
  • C1C2:消費者,從隊列中獲取消息并消費
  • Q1Q2:消息隊列,存儲消息。
  • X:即為 Exchange,交換機,交換機可以根據一定的規則將生產者發布的消息路由到指定的隊列中。

RabbitMQ 的交換機有四種類型,不同的類型有著不同的策略:

  • Fanout:廣播,將消息交給所有綁定到交換機的隊列。(Publish/Subscribe 模式)
  • Direct:定向,將消息路由到符合 routing key(路由鍵)的隊列(Routing 模式)。
  • Topic:通配符,將消息路由到符合 routing pattern(路由匹配規則)的隊列(Topics 模式)。
  • HeadersHeaders 類型的交換機不依賴于路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配,這種類型的交換機性能很差,一般不會使用。

routing key 和 binding key:

  • routing key:路由鍵,生產者將消息發給交換機時,指定的一個字符串,用來告訴交換機應該如何將消息路由到指定隊列。
  • binding key:綁定,將隊列和交換機綁定時,指定的一個字符串,這樣 RabbitMQ 就可以知道如何正確地將消息路由到指定的隊列。
    在這里插入圖片描述

Routing(路由模式)

在這里插入圖片描述

  • X:交換機,交換機根據 routing key 進行消息路由。

Topics(通配符模式)在這里插入圖片描述

  • X:交換機,交換機根據 routing pattern 進行消息路由。
  • *:匹配一個單詞。
  • #:匹配多個單詞。

RPC(遠程過程調用模式)

在這里插入圖片描述
在這里插入圖片描述

可以把該模式理解有客戶端和服務端間的通信,客戶端發送請求,服務端處理請求并返回結果。

客戶端發送請求時,指定 correlation _idreply_to,將該請求發送到 rpc_queue 里。

服務端從 rpc_queue 里取出請求,處理請求后,將結果發送到 reply_to 里。

客戶端根據 correlation _id 取出結果。

Publisher Confirms(發布確認模式)

在這里插入圖片描述
在這里插入圖片描述

該模式是 RabbitMQ 服務器也就是 Broker 向生產者發送確認消息,生產者接收到確認消息后才認為消息發送成功。

如果 RabbitMQ 服務器因為某種原因沒有接收到確認消息,需要根據業務情況決定是否重新發送消息。

工作模式使用案例

創建普通 Maven 項目,引入依賴:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

定義 Constants 類

package mq.Constants;public class Constants {public static final String HOST = "47.94.9.33";public static final int PORT = 5672;public static final String USER_NAME = "admin";public static final String PASSWORD = "admin";public static final String VIRTUAL_HOST = "/";// * 工作隊列模式public static final String WORK_QUEUE = "work.queue";// * 發布訂閱模式public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";// * 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";// * 通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";// * RPC 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";// * Publisher Confirms 模式public static final String P_CONFIRMS_QUEUE1 = "p.confirms.queue1";public static final String P_CONFIRMS_QUEUE2 = "p.confirms.queue2";public static final String P_CONFIRMS_QUEUE3 = "p.confirms.queue3";
}

Simple(簡單模式)

生產者

package mq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST); // ? 公網 IPconnectionFactory.setPort(Constants.PORT); // ? 端口connectionFactory.setUsername(Constants.USER_NAME); // ? 用戶名connectionFactory.setPassword(Constants.PASSWORD); // ? 密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // ? 虛擬主機Connection connection = connectionFactory.newConnection();//  TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明交換機(使用內置的交換機)// TODO 4. 聲明隊列/*** Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,*                                  Map<String, Object> arguments)* queue:隊列名稱* durable:是否持久化* exclusive:是否獨占* autoDelete:是否自動刪除* arguments:參數*/channel.queueDeclare("hello", true, false, false, null);// TODO 5. 發送消息/*** void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:交換機名稱* routingKey:使用內置交換機,routingKey 和隊列名保持一致* props:屬性配置* body:消息體*/String msg = "hello rabbitMQ~";for (int i = 0; i < 1000; i++) {channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息發送成功," + msg);// TODO 6. 釋放資源channel.close(); // ! 先關閉 channelconnection.close();}
}

消費者

package mq.simple;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST); // ? 公網 IPconnectionFactory.setPort(Constants.PORT); // ? 端口connectionFactory.setUsername(Constants.USER_NAME); // ? 用戶名connectionFactory.setPassword(Constants.PASSWORD); // ? 密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // ? 虛擬主機Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare("hello", true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){/*** 從隊列中,收到消息就會執行該方法* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message 封包的消息,比如交換機,隊列名稱等...* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array)* @throws IOException IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume("hello", true, consumer);Thread.sleep(1000);// TODO 5. 釋放資源channel.close(); // ! 先關閉 channelconnection.close();}
}

Work-Queues(工作隊列模式)

生產者

package mq.workQueues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//  TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明交換機(使用內置的交換機)// TODO 4. 聲明隊列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// TODO 5. 發送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~:" + i;channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息發送成功");// TODO 6. 釋放資源channel.close(); // ! 先關閉 channelconnection.close();}
}

消費者1

package mq.workQueues;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

消費者2

package mq.workQueues;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

Publish/Subscribe(發布/訂閱模式)

生產者

package mq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//  TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明交換機/*** Exchange.DeclareOk exchangeDeclare(String exchange, the name of the exchange*         BuiltinExchangeType type, the exchange type*         boolean durable, true if we are declaring a durable exchange (the exchange will survive a server restart)*         boolean autoDelete, true if the server should delete the exchange when it is no longer in use*         boolean internal, true if the exchange is internal, it can't be directly published to by a client.*         Map<String, Object> arguments), other properties (construction arguments) for the exchange*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);// TODO 4. 聲明隊列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);/*** Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)*  queue: the name of the queue*  exchange: the name of the exchange*  routingKey: the routing key to use for the binding*  arguments: other properties (binding parameters)*/// TODO 5. 綁定交換機和隊列channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");// TODO 6. 發送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~:" + i;channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息發送成功");// TODO 7. 釋放資源channel.close(); // ! 先關閉 channelconnection.close();}
}

消費者1

package mq.fanout;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

消費者2

package mq.fanout;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

Routing(路由模式)

生產者

package mq.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟隧道Channel channel = connection.createChannel();// TODO 3. 聲明交換機channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 4. 聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);// TODO 5. 將隊列和交換機綁定channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");// TODO 6. 發送消息List<String> list = List.of("a", "b", "c", "d");Random random = new Random();for (int i = 0; i < 10; i++) {String routingKey = list.get(random.nextInt(3));String msg = "hello routing mode~:" + routingKey;System.out.println(msg);channel.basicPublish(Constants.DIRECT_EXCHANGE, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息發送成功");// TODO 7. 釋放資源channel.close(); // ! 先關閉 channelconnection.close();}
}

消費者1

package mq.direct;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

消費者2

package mq.direct;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

Topics(通配符模式)

生產者

package mq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟隧道Channel channel = connection.createChannel();// TODO 3. 聲明交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);// TODO 4. 聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);// TODO 5. 將隊列和交換機綁定channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");// TODO 6. 發送消息String msg1 = "hello topic mode~:ef.a.c";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.c", null, msg1.getBytes(StandardCharsets.UTF_8)); // ? 轉發到 Q1String msg2 = "hello topic mode~:rr.a.b";channel.basicPublish(Constants.TOPIC_EXCHANGE, "rr.a.b", null, msg2.getBytes(StandardCharsets.UTF_8)); // ? 轉發到 Q1,Q2String msg3 = "hello topic mode~:c.com.ljh";channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.com.ljh", null, msg3.getBytes(StandardCharsets.UTF_8)); // ? 轉發到 Q2System.out.println("消息發送成功");// TODO 7. 釋放資源channel.close(); // ! 先關閉 channelconnection.close();}
}

消費者1

package mq.topic;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

消費者2

package mq.topic;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 聲明隊列,可以省略(如果生產者未聲明隊列的話,消費者也未聲明隊列則會報錯,因為不知道和哪個隊列綁定了)channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);// TODO 5. 釋放資源
//        channel.close(); // ! 先關閉 channel
//        connection.close();}
}

RPC(遠程過程調用模式)

服務端

package mq.rpc;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟隧道Channel channel = connection.createChannel();// TODO 2.1 聲明隊列channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("[RPC Server 接收到請求]:" + request);String response = "針對請求:" + request + " 做出響應:" + "🫡";// TODO 4. 發送響應AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes(StandardCharsets.UTF_8));// TODO 5. 確認收到channel.basicAck(envelope.getDeliveryTag(), false);}};// TODO 3. 接收請求channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, defaultConsumer);}
}

客戶端

package mq.rpc;import com.rabbitmq.client.*;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();// TODO 2. 開啟隧道Channel channel = connection.createChannel();// TODO 2.1 聲明隊列channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);// TODO 3. 發送請求String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId) // ? 唯一標識,標識接收該 ID 的響應.replyTo(Constants.RPC_RESPONSE_QUEUE).build();String msg = "hello RPC mode~:" + correlationId;channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes(StandardCharsets.UTF_8));// TODO 4. 接收響應BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String resp = new String(body);System.out.println("接收到回調消息:" + resp);if (properties.getCorrelationId().equals(correlationId)) {queue.offer(resp);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);String res = queue.take();// ! 若沒有對應的消息,程序會在這里阻塞System.out.println("[RPC Client接收到符合 ID 的消息]:" + res);}
}

Publisher Confirms(發布確認模式)

發布確認

package mq.publisher.confirms;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class PublisherConfirms {private static final int MAX_MESSAGE = 10000;static Connection createConnection() throws Exception {// TODO 1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {// * Strategy #1: Publishing Messages Individually
//        publishingMessagesIndividually();// * Strategy #2: Publishing Messages in BatchespublishingMessagesInBatches();// * Strategy #3: Handling Publisher Confirms AsynchronouslyhandlingPublisherConfirmsAsynchronously();}private static void handlingPublisherConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 開啟發布確認機制channel.confirmSelect();// TODO 4. 聲明隊列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE3, true, false, false, null);SortedSet<Long> sortedSet = Collections.synchronizedSortedSet(new TreeSet<>());// TODO 5. 監聽來自 Broker 的 ack 或 nackchannel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {sortedSet.headSet(deliveryTag + 1).clear();} else {sortedSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {sortedSet.headSet(deliveryTag + 1).clear();} else {sortedSet.remove(deliveryTag);}// TODO 5.1 根據業務邏輯處理消息重傳}});Long start = System.currentTimeMillis();// TODO 6. 發送消息for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;Long ackSeq = channel.getNextPublishSeqNo();sortedSet.add(ackSeq);channel.basicPublish("", Constants.P_CONFIRMS_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));}while (!sortedSet.isEmpty()) {
//                Thread.sleep(10);}Long end = System.currentTimeMillis();System.out.printf("異步確認策略,消息條數:%d;總耗時:%d ms\n", MAX_MESSAGE, end - start);}}private static void publishingMessagesInBatches() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 開啟發布確認機制channel.confirmSelect();// TODO 4. 聲明隊列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE2, true, false, false, null);Long start = System.currentTimeMillis();// TODO 5. 發送消息int batchSize = 100, outstandingMessageCnt = 0;for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;channel.basicPublish("", Constants.P_CONFIRMS_QUEUE2, null, msg.getBytes(StandardCharsets.UTF_8));outstandingMessageCnt++;if (outstandingMessageCnt >= batchSize) {channel.waitForConfirms(5_000);outstandingMessageCnt = 0;}}if (outstandingMessageCnt > 0) {channel.waitForConfirms(5_000);}Long end = System.currentTimeMillis();System.out.printf("批量確認策略,消息條數:%d;總耗時:%d ms\n", MAX_MESSAGE, end - start);}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 開啟發布確認機制channel.confirmSelect();// TODO 4. 聲明隊列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();// TODO 5. 發布消息for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;channel.basicPublish("", Constants.P_CONFIRMS_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));// TODO 5.1 等待 5s 收到來自 broker 的確認消息channel.waitForConfirms(5_000);}Long end = System.currentTimeMillis();System.out.printf("單獨確認策略,消息條數:%d;總耗時:%d ms\n", MAX_MESSAGE, end - start);}}
}
em.out.printf("批量確認策略,消息條數:%d;總耗時:%d ms\n", MAX_MESSAGE, end - start);}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {// TODO 2. 開啟信道Channel channel = connection.createChannel();// TODO 3. 開啟發布確認機制channel.confirmSelect();// TODO 4. 聲明隊列channel.queueDeclare(Constants.P_CONFIRMS_QUEUE1, true, false, false, null);Long start = System.currentTimeMillis();// TODO 5. 發布消息for (int i = 0; i < MAX_MESSAGE; i++) {String msg = "hello Publisher Confirms~:" + i;channel.basicPublish("", Constants.P_CONFIRMS_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));// TODO 5.1 等待 5s 收到來自 broker 的確認消息channel.waitForConfirms(5_000);}Long end = System.currentTimeMillis();System.out.printf("單獨確認策略,消息條數:%d;總耗時:%d ms\n", MAX_MESSAGE, end - start);}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/79744.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/79744.shtml
英文地址,請注明出處:http://en.pswp.cn/web/79744.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

(2)python開發經驗

文章目錄 1 pyside6加載ui文件2 使用pyinstaller打包 更多精彩內容&#x1f449;內容導航 &#x1f448;&#x1f449;Qt開發 &#x1f448;&#x1f449;python開發 &#x1f448; 1 pyside6加載ui文件 方法1&#xff1a; 直接加載ui文件 from PySide6.QtWidgets import QAp…

【C++】互斥鎖(Mutex)

在C中&#xff0c;互斥鎖&#xff08;Mutex&#xff09;是用于線程同步的重要工具&#xff0c;用于保護共享資源&#xff0c;防止多線程同時訪問導致的數據競爭&#xff08;Data Race&#xff09;問題。 以下是C中互斥鎖的核心用法和示例&#xff1a; 一、基本互斥鎖 std::mut…

Jsoup與HtmlUnit:兩大Java爬蟲工具對比解析

Jsoup&#xff1a;HTML解析利器 定位&#xff1a;專注HTML解析的輕量級庫&#xff08;也就是快&#xff0c;但動態頁面無法抓取&#xff09; 核心能力&#xff1a; DOM樹解析與CSS選擇器查詢 HTML凈化與格式化 支持元素遍歷與屬性提取 應用場景&#xff1a;靜態頁面數據抽…

小白成長之路-vim編輯

文章目錄 Vim一、命令模式二、插入模式3.a:進入插入模式&#xff0c;在當前光標的后一個字符插入![在這里插入圖片描述](https://i-blog.csdnimg.cn/direct/fd293c3832ed49e2974abfbb63eeb5bb.png)4.o: 在當前光標的下一行插入5.i:在當前光標所在字符插入&#xff0c;返回命令模…

[redis進階六]詳解redis作為緩存分布式鎖

目錄 一 什么是緩存 緩存總結板書: 二 使?Redis作為緩存 三 緩存的更新策略 1) 定期?成 2) 實時?成 四 面試重點:緩存預熱,緩存穿透,緩存雪崩 和緩存擊穿 1)緩存預熱 2)緩存穿透 3)緩存雪崩 4)緩存擊穿 五 分布式鎖 板書: 1)什么是分布式鎖 2)分布式鎖的基…

【MySQL】數據表插入數據

個人主頁&#xff1a;Guiat 歸屬專欄&#xff1a;MySQL 文章目錄 1. 插入數據概述1.1 插入數據的重要性1.2 插入數據的基本原則 2. 基本插入語句2.1 INSERT INTO語法2.2 插入多行數據2.3 不指定列名的插入2.4 插入NULL和默認值 3. 高級插入技術3.1 使用子查詢插入數據3.2 IGNOR…

軟考-軟件設計師中級備考 14、刷題 算法

一、考點歸納 1&#xff09;排序 2、查找 3、復雜度 4、經典問題 0 - 1 背包動態規劃0 - 1 背包問題具有最優子結構性質和重疊子問題性質。通過動態規劃可以利用一個二維數組來記錄子問題的解&#xff0c;避免重復計算&#xff0c;從而高效地求解出背包能裝下的最大價值。分…

【阿里云】阿里云 Ubuntu 服務器無法更新 systemd(Operation not permitted)的解決方法

零、前言 目前正在使用的Ubuntu服務器中&#xff0c;僅阿里云&#xff08;不止一臺&#xff09;出現了這個問題&#xff0c;因此我判定是阿里云服務器獨有的問題。如果你的服務器提供商不是阿里云&#xff0c;那么這篇文章可能對你沒有幫助。 如果已經因為升級錯誤導致依賴沖突…

css 點擊后改變樣式

背景&#xff1a; 期望實現效果&#xff1a;鼠標點擊之后&#xff0c;保持選中樣式。 實現思路&#xff1a;在css樣式中&#xff0c;:active 是一種偽類&#xff0c;用于表示用戶當前正在與被選定的元素進行交互。當用戶點擊或按住鼠標時&#xff0c;元素將被激活&#xff0c;此…

采用AI神經網絡降噪算法的語言降噪消回音處理芯片NR2049-P

隨著AI時代來臨.通話設備的環境噪音抑制也進入AI降噪算法時代. AI神經網絡降噪技術是一款革命性的語音處理技術&#xff0c;他突破了傳統單麥克風和雙麥克風降噪的局限性,利用采集的各種日常環境中的噪音樣本進行訓練學習.讓降噪算法具有自適應噪聲抑制功能&#xff0c;可以根…

不用聯網不用編程,PLC通過智能網關快速實現HTTP協議JSON格式與MES等系統平臺雙向數據通訊

智能網關IGT-DSER集成了多種PLC的原廠協議&#xff0c;方便實現各種PLC、智能儀表通過HTTP協議與MES等各種系統平臺通訊對接。PLC內不用編寫程序&#xff0c;設備不用停機&#xff0c;通過網關的參數配置軟件(下載地址)配置JSON文件的字段與PLC寄存器地址等參數即可。 …

如何將兩臺虛擬機進行搭橋

將兩臺虛擬機實現網絡互通&#xff08;“搭橋”&#xff09;需配置虛擬網絡&#xff0c;以下是基于 VMware Workstation 和 VirtualBox 的詳細操作指南&#xff08;以 Windows 系統為例&#xff0c;Linux 原理類似&#xff09;&#xff1a; 一、VMware Workstation 配置&#x…

Xianyu AutoAgent,AI閑魚客服機器人

Xianyu AutoAgent是一款專為閑魚平臺開發的智能客服機器人系統&#xff0c;旨在提供全天候的自動化服務。它具備多專家協同決策、智能議價和上下文感知對話等功能&#xff0c;能夠管理輕量級的對話記憶&#xff0c;利用完整的對話歷史為用戶提供更自然的交流體驗。 Xianyu Aut…

鍵盤輸出希臘字符方法

在不同操作系統中&#xff0c;輸出希臘字母的方法有所不同。以下是針對 Windows 和 macOS 系統的詳細方法&#xff0c;以及一些通用技巧&#xff1a; 1.Windows 系統 1.1 使用字符映射表 字符映射表是一個內置工具&#xff0c;可以方便地找到并插入希臘字母。 ? 步驟&#xf…

什么是SparkONYarn模式

1. 什么是 Spark on YARN&#xff1f; Spark on YARN 是 Apache Spark 的一種部署模式&#xff0c;允許 Spark 應用程序在 Hadoop YARN 集群上運行&#xff0c;充分利用 YARN 的資源管理和調度能力。這種模式將 Spark 與 Hadoop 生態深度集成&#xff0c;使企業能夠在同一集群…

【git】clone項目后續,github clone的網絡配置,大型項目git log 輸出txt,切換commit學習,goland遠程,自存檔

git網絡配置&#xff0c;解決git clone github速度奇慢 git config --global http.proxy http://127.0.0.1:7897 git config --global https.proxy http://127.0.0.1:7897git log輸出到文件&#xff08;便于checkout&#xff09; 這里有些字符如表情會亂碼&#xff0c;不知道…

Java游戲服務器開發流水賬(3)游戲數據的緩存簡介

簡介 游戲服務器數據緩存是一種在游戲服務器運行過程中&#xff0c;用于臨時存儲經常訪問的數據的技術手段&#xff0c;旨在提高游戲性能、降低數據庫負載以及優化玩家體驗。游戲開發中數據的緩存可以使用Java自身的內存也可以使用MemCache&#xff0c;Redis&#xff0c;注意M…

STL?vector!!!

一、前言 之前我們借助手撕string加深了類和對象相關知識&#xff0c;今天我們將一起手撕一個vector&#xff0c;繼續深化類和對象、動態內存管理、模板的相關知識 二、vector相關的前置知識 1、什么是vector&#xff1f; vector是一個STL庫中提供的類模板&#xff0c;它是存儲…

C++學習之路,從0到精通的征途:繼承

目錄 一.繼承的概念及定義 1.繼承的概念 2.繼承的定義 (1)繼承的定義格式 (2)繼承基類成員訪問方式的變化 二.基類與派生類間的轉換 1.派生類對象賦值給基類的引用/指針 2. 派生類對象直接賦值給基類對象 三.繼承的作用域 四.派生類的默認成員函數 1.構造函數 2.拷…

用vue和go實現登錄加密

前端使用CryptoJS默認加密方法&#xff1a; var pass CryptoJS.AES.encrypt(formData.password, key.value).toString()使用 CryptoJS.AES.encrypt() 時不指定加密模式和參數時&#xff0c;CryptoJS 默認會執行以下操作 var encrypted CryptoJS.AES.encrypt("明文&quo…