rabbitmq的七種模式?
?Hello word
客戶端引入依賴
<!--rabbitmq 依賴客戶端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>
生產者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 實現了自動 close 接口 自動關閉 不需要顯示關閉try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 默認消息存儲在內存中* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 發送一個消息* 1.發送到那個交換機* 2.路由的 key 是哪個* 3.其他的參數信息* 4.發送消息的消息體*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息發送完畢");}}
}
消費者
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何進行消費的接口回調DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消費的一個回調接口 如在消費的時候隊列被刪除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消費被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答* 3.消費者未成功消費的回調*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
獲取消息
?工作隊列
封裝獲取getChannel的工具類
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一個連接的 channelpublic static Channel getChannel() throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}
接收消息工作1線程
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C1 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
?接收消息工作線程2
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C2 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
發送10次消息
線程1和線程2平分消息
發布訂閱?
?RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。實際上,通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。
總共有以下類型: 直接(direct), 主題 (topic) , 標題 (headers) , 扇出 (fanout)
fanout
接收者1?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("控制臺打印接收到的消息"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}
接收者2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息寫到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
// File file = new File("C:\\work\\rabbitmq_info.txt");
// FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("數據寫入文件成功"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}
?發送者
import com.rabbitmq.client.Channel;import java.util.Scanner;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/*** 聲明一個 exchange* 1.exchange 的名稱* 2.exchange 的類型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("請輸入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生產者發出消息" + message);}}}
}
?結果
?Direct
接收者1 ,寫入錯誤日志
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收綁定鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
// File file = new File("C:\\work\\rabbitmq_info.txt");
// FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("錯誤日志已經接收"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
?接收者2,打印控制臺信息
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收綁定鍵 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
?發送者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//創建多個 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","錯誤 error 信息");//debug 沒有消費這接收這個消息 所有就丟失了bindingKeyMap.put("debug","調試 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息:" + message);}}}
}
接收到信息
Topics ?
主題1?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic01 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明 Q1 隊列與綁定關系String queueName="Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收隊列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
?主題2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic02 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明 Q2 隊列與綁定關系String queueName="Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收隊列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
發送者
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");/*** Q1-->綁定的是* 中間帶 orange 帶 3 個單詞的字符串(*.orange.*)* Q2-->綁定的是* 最后一個單詞是 rabbit 的 3 個單詞(*.*.rabbit)* 第一個單詞是 lazy 的多個單詞(lazy.#)**/Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被隊列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被隊列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被隊列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被隊列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個綁定但只被隊列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會被任何隊列接收到會被丟棄");bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何綁定會被丟棄");bindingKeyMap.put("lazy.orange.male.rabbit","是四個單詞但匹配 Q2");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息" + message);}}}
}
?結果