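The previous article covered installing RabbitMQ, the web management UI, and a simple Spring Boot integration with RabbitMQ.
This article focuses on how to use the RabbitMQ APIs.
It follows the order of the five commonly used patterns on the official site: HelloWorld, Work queues, Publish/Subscribe, Routing, Topics.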
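Pattern overview
HelloWorld
One producer, one queue, one consumer.
A demo; rarely used in practice.
Work queues
Distributes tasks among multiple consumers (competing consumers).
Publish/Subscribe
Publish/subscribe: sends a message to multiple consumers at once.
Routing
Receives messages selectively.
Topics
Receives messages based on a pattern.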
Using each pattern (RabbitMQ Java client)
HelloWorld
Create a Maven project and add the dependencies
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
</dependencies>
Create a utility class that creates the connection and closes the channel and connection
package org.cc;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtils {

    public static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // connectionFactory.setHost("localhost");   // default host: localhost
        // connectionFactory.setPort(5672);          // default port: 5672
        // connectionFactory.setUsername("guest");   // default username: guest
        // connectionFactory.setPassword("guest");   // default password: guest
        // connectionFactory.setVirtualHost("/");    // default virtual host: /
        return connectionFactory.newConnection();
    }

    public static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        channel.close();
        connection.close();
    }
}
Create the consumer
package org.cc;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class HelloWorldConsumer {

    @Test
    public void receive() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        /*
         * Declare a queue named "my helloworld queue": durable, non-exclusive, not auto-deleted.
         * durable: when true, the queue survives a RabbitMQ restart
         * exclusive: when true, the queue is deleted as soon as the connection closes
         * autoDelete: when true, the queue is deleted once its last consumer unsubscribes
         */
        // If the queue already exists, these arguments must match it; if it does not exist, it is created.
        channel.queueDeclare("my helloworld queue", true, false, false, null);
        // Second argument true = automatic ack
        channel.basicConsume("my helloworld queue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode with the same charset the producer used
                System.out.println("helloworld consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        System.in.read(); // keep the consumer listening on the queue
    }
}
Create the producer
package org.cc;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class HelloWorldProducer {

    @Test
    public void sendMsg() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        /*
         * Declare a queue named "my helloworld queue": durable, non-exclusive, not auto-deleted.
         * durable: when true, the queue survives a RabbitMQ restart
         * exclusive: when true, the queue is deleted as soon as the connection closes
         * autoDelete: when true, the queue is deleted once its last consumer unsubscribes
         */
        // If the queue already exists, these arguments must match it; if it does not exist, it is created.
        channel.queueDeclare("my helloworld queue", true, false, false, null);
        // An empty exchange name means the default exchange, which routes by queue name.
        channel.basicPublish("", "my helloworld queue", null, "helloworld message content".getBytes(StandardCharsets.UTF_8));
        ConnectionUtils.closeConnection(connection, channel);
    }
}
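To make sure messages still exist after RabbitMQ restarts, the producer must set the props argument when publishing (the queue must also be durable, as it is above)
channel.basicPublish("", "my helloworld queue", MessageProperties.PERSISTENT_TEXT_PLAIN, "helloworld message content".getBytes(StandardCharsets.UTF_8));
With manual ack enabled, a message is only really removed from the queue after the consumer receives it and explicitly sends an ack
// Second argument false = manual ack
channel.basicConsume("my helloworld queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("helloworld consumer received message: " + new String(body, StandardCharsets.UTF_8));
        // Acknowledge only this delivery (multiple = false)
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});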
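To make the effect of manual ack more concrete, here is a minimal in-memory sketch in plain Java. It does not use the RabbitMQ client, and `AckQueueModel` with its methods `publish`, `deliver`, `ack`, and `requeueUnacked` are hypothetical names invented for illustration. It only models the idea that a delivered message stays tracked as unacknowledged until the consumer confirms it, and becomes ready for delivery again if the consumer goes away without acking.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of manual acknowledgement; not the RabbitMQ implementation.
final class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();          // messages waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered but not yet acked
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands out the next ready message and returns its delivery tag, or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // A message is only gone for good once its delivery tag has been acked.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // Models a consumer that disconnects without acking: its messages become ready again,
    // ahead of the messages that were never delivered.
    void requeueUnacked() {
        Deque<String> merged = new ArrayDeque<>(unacked.values());
        merged.addAll(ready);
        ready.clear();
        ready.addAll(merged);
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckQueueModel queue = new AckQueueModel();
        queue.publish("first");
        queue.publish("second");
        long firstTag = queue.deliver();
        queue.deliver();            // "second" is delivered but never acked
        queue.ack(firstTag);        // "first" is removed for good
        queue.requeueUnacked();     // "second" goes back to the ready state
        System.out.println("ready=" + queue.readyCount() + ", unacked=" + queue.unackedCount());
    }
}
```

With automatic ack (the `true` argument used earlier), there is no unacknowledged state in this sense: the message is considered handled as soon as it is delivered.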
Work queues
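Create the consumer. The code is essentially the same as in the HelloWorld pattern, except that the consumer registration is repeated so that two consumers listen on the same queue, my work queue
package org.cc;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WorkConsumer {

    @Test
    public void receive() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        // Must match the queue declared by the producer
        channel.queueDeclare("my work queue", true, false, false, null);
        // First consumer
        channel.basicConsume("my work queue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("work consumer1 received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        // Second consumer on the same queue: the two compete for messages
        channel.basicConsume("my work queue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("work consumer2 received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        System.in.read(); // keep the consumers listening on the queue
    }
}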
Create the producer. The code is essentially the same as in the HelloWorld pattern; here it sends 10 messages in a row
package org.cc;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WorkProducer {

    @Test
    public void sendMsg() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("my work queue", true, false, false, null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", "my work queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("work message content " + i).getBytes(StandardCharsets.UTF_8));
        }
        ConnectionUtils.closeConnection(connection, channel);
    }
}
The consumer console shows the two consumers receiving messages in turn
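The alternating delivery seen in the console can be pictured with a small plain-Java sketch. It is only a model of round-robin dispatch, not RabbitMQ code, and `RoundRobinModel` and `dispatch` are hypothetical names. In a real broker the distribution can also depend on factors such as prefetch settings and how quickly each consumer acknowledges, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin dispatch between competing consumers; not RabbitMQ code.
final class RoundRobinModel {

    // Returns, for each consumer index, the list of messages that consumer would receive.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<String>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to consumer (i mod consumerCount)
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("work message content " + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("consumer1: " + received.get(0));
        System.out.println("consumer2: " + received.get(1));
    }
}
```

Each message goes to exactly one consumer, which is what distinguishes this pattern from Publish/Subscribe.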
Publish/Subscribe
Consumer
package org.cc;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Subscriber {

    @Test
    public void receive() throws IOException, TimeoutException {
        // First consumer: its own queue, bound to the fanout exchange
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("fanout exchange", BuiltinExchangeType.FANOUT);
        channel.queueDeclare("my fanout queue1", true, false, false, null);
        channel.queueBind("my fanout queue1", "fanout exchange", "");
        channel.basicConsume("my fanout queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("my fanout queue1 consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });

        // Second consumer: a second queue bound to the same exchange
        Connection connection1 = ConnectionUtils.createConnection();
        Channel channel1 = connection1.createChannel();
        channel1.queueDeclare("my fanout queue2", true, false, false, null);
        channel1.queueBind("my fanout queue2", "fanout exchange", "");
        channel1.basicConsume("my fanout queue2", true, new DefaultConsumer(channel1) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("my fanout queue2 consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        System.in.read(); // keep the consumers listening on the queues
    }
}
Producer
package org.cc;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Publisher {

    @Test
    public void sendMsg() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("fanout exchange", BuiltinExchangeType.FANOUT);
        // Publish to the exchange, not to a queue; a fanout exchange ignores the routing key
        channel.basicPublish("fanout exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "fanout exchange message content".getBytes(StandardCharsets.UTF_8));
        ConnectionUtils.closeConnection(connection, channel);
    }
}
Queues must be bound to the exchange, and the producer sends messages to the exchange rather than to a queue
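The relationship between binding and publishing can be sketched in plain Java. This is a toy in-memory model, not the RabbitMQ client, and `FanoutModel`, `bind`, `publish`, and `messagesIn` are hypothetical names. It shows the one property that matters here: every bound queue gets its own copy of each message, whatever routing key is used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not RabbitMQ code.
final class FanoutModel {
    // queue name -> messages currently stored in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        if (!boundQueues.containsKey(queueName)) {
            boundQueues.put(queueName, new ArrayList<String>());
        }
    }

    // The routing key is accepted but ignored, as with a fanout exchange.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Returns a copy of the messages in the queue, or an empty list for an unbound queue.
    List<String> messagesIn(String queueName) {
        List<String> result = new ArrayList<>();
        List<String> queue = boundQueues.get(queueName);
        if (queue != null) {
            result.addAll(queue);
        }
        return result;
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("my fanout queue1");
        exchange.bind("my fanout queue2");
        exchange.publish("", "fanout exchange message content");
        System.out.println(exchange.messagesIn("my fanout queue1"));
        System.out.println(exchange.messagesIn("my fanout queue2"));
    }
}
```

A queue that was never bound receives nothing in this model, which mirrors why the consumer code declares the queue and binds it before consuming.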
Routing
Consumer
package org.cc;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RoutingKeyConsumer {

    @Test
    public void receive() throws IOException, TimeoutException {
        // Queue 1 is bound with the routing key "info" only
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("direct exchange", BuiltinExchangeType.DIRECT);
        channel.queueDeclare("my direct queue1", true, false, false, null);
        channel.queueBind("my direct queue1", "direct exchange", "info");
        channel.basicConsume("my direct queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("my direct queue1 consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });

        // Queue 2 is bound with both "info" and "error"
        Connection connection1 = ConnectionUtils.createConnection();
        Channel channel1 = connection1.createChannel();
        channel1.queueDeclare("my direct queue2", true, false, false, null);
        channel1.queueBind("my direct queue2", "direct exchange", "info");
        channel1.queueBind("my direct queue2", "direct exchange", "error");
        channel1.basicConsume("my direct queue2", true, new DefaultConsumer(channel1) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("my direct queue2 consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        System.in.read(); // keep the consumers listening on the queues
    }
}
Producer
package org.cc;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RoutingProducer {

    @Test
    public void sendMsg() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("direct exchange", BuiltinExchangeType.DIRECT);
        // Routed to every queue bound with "info" (queue 1 and queue 2)
        channel.basicPublish("direct exchange", "info", MessageProperties.PERSISTENT_TEXT_PLAIN, "direct exchange info message content".getBytes(StandardCharsets.UTF_8));
        // Routed only to queues bound with "error" (queue 2)
        channel.basicPublish("direct exchange", "error", MessageProperties.PERSISTENT_TEXT_PLAIN, "direct exchange error message content".getBytes(StandardCharsets.UTF_8));
        ConnectionUtils.closeConnection(connection, channel);
    }
}
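Which queue receives which message in the Routing example can be worked out with a small plain-Java sketch. It is an in-memory model of exact-match routing, not the RabbitMQ client, and `DirectModel`, `bind`, and `route` are hypothetical names. The bindings in `main` mirror the consumer code above: queue 1 is bound with info, queue 2 with info and error.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange; not RabbitMQ code.
final class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        Set<String> queues = bindings.get(bindingKey);
        if (queues == null) {
            queues = new LinkedHashSet<>();
            bindings.put(bindingKey, queues);
        }
        queues.add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        List<String> result = new ArrayList<>();
        Set<String> queues = bindings.get(routingKey);
        if (queues != null) {
            result.addAll(queues);
        }
        return result;
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.bind("my direct queue1", "info");
        exchange.bind("my direct queue2", "info");
        exchange.bind("my direct queue2", "error");
        System.out.println("info  -> " + exchange.route("info"));
        System.out.println("error -> " + exchange.route("error"));
    }
}
```

In this model a routing key that no queue is bound with routes to nothing, so such a message would not reach either consumer.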
Topics
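The exchange routes messages to queues based on a pattern over dot-separated words: * matches exactly one word, and # matches zero, one, or more words
For example, suppose queue 1 is bound with user.* and queue 2 is bound with user.#, and four messages are sent to the exchange with the routing keys user.insert, user.insert.a, user., and user
Queue 1 then receives the messages with routing keys user.insert and user. (the trailing dot leaves an empty second word, which * still matches), while queue 2 receives all four messages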
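The matching rules can be checked with a short plain-Java sketch. This is not RabbitMQ's implementation; `TopicMatcher` and `matches` are hypothetical names, and the sketch assumes that a routing key is split on dots with empty words kept, so that user. becomes the two words "user" and "", which is what reproduces the behaviour described above.

```java
// Hypothetical matcher for topic binding patterns; a sketch, not RabbitMQ's implementation.
final class TopicMatcher {

    // Returns true if the routing key matches the binding pattern.
    // '*' stands for exactly one word, '#' for zero or more words.
    static boolean matches(String pattern, String routingKey) {
        // The -1 limit keeps trailing empty words, so "user." splits into ["user", ""].
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] routingKeys = {"user.insert", "user.insert.a", "user.", "user"};
        for (String routingKey : routingKeys) {
            System.out.println(routingKey
                    + " | user.* -> " + matches("user.*", routingKey)
                    + " | user.# -> " + matches("user.#", routingKey));
        }
    }
}
```

Running `main` prints, for each of the four routing keys from the example, whether the user.* and user.# bindings would accept it.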
Consumer code
package org.cc;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicsConsumer {

    @Test
    public void receive() throws IOException, TimeoutException {
        // Queue 1 is bound with the pattern user.*
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topic exchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("my topic queue1", true, false, false, null);
        channel.queueBind("my topic queue1", "topic exchange", "user.*");
        channel.basicConsume("my topic queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("my topic queue1 consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });

        // Queue 2 is bound with the pattern user.#
        Connection connection1 = ConnectionUtils.createConnection();
        Channel channel1 = connection1.createChannel();
        channel1.queueDeclare("my topic queue2", true, false, false, null);
        channel1.queueBind("my topic queue2", "topic exchange", "user.#");
        channel1.basicConsume("my topic queue2", true, new DefaultConsumer(channel1) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("my topic queue2 consumer received message: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        System.in.read(); // keep the consumers listening on the queues
    }
}
Producer code
package org.cc;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicsProducer {

    @Test
    public void sendMsg() throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.createConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topic exchange", BuiltinExchangeType.TOPIC);
        // One message per routing key from the example above
        channel.basicPublish("topic exchange", "user.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, "topic exchange user.insert message content".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("topic exchange", "user.insert.a", MessageProperties.PERSISTENT_TEXT_PLAIN, "topic exchange user.insert.a message content".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("topic exchange", "user.", MessageProperties.PERSISTENT_TEXT_PLAIN, "topic exchange user. message content".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("topic exchange", "user", MessageProperties.PERSISTENT_TEXT_PLAIN, "topic exchange user message content".getBytes(StandardCharsets.UTF_8));
        ConnectionUtils.closeConnection(connection, channel);
    }
}
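Using each pattern (Spring Boot integration with RabbitMQ)
Build the project in IDEA with Spring Initializr and create the producer project springboot-rabbitmq-producer
Select the dependencies as follows (screenshot omitted)
Configure application.properties as follows (screenshot omitted)
Create the consumer project springboot-rabbitmq-consumer the same way, setting server.port to 8081
At the time of writing, the latest Spring Boot version is 2.6.3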
HelloWorld
Create the consumer and start the consumer application
package com.example.springbootrabbitmqconsumer.helloworld;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * springboot-rabbitmq-producer
 *
 * @author v_choncheng
 * @description
 * @create 2022-02-15 14:42
 */
@Component
@RabbitListener(queuesToDeclare = @Queue("helloworld"))
public class HelloWorldConsumer {

    @RabbitHandler
    public void receive(String msg) {
        System.out.println("Consumer received message: " + msg);
    }
}
Create the producer
package com.example.springbootrabbitmqproducer.helloworld;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * springboot-rabbitmq-producer
 *
 * @author v_choncheng
 * @description
 * @create 2022-02-15 14:42
 */
@Configuration
public class HelloWorldProducer {

    // Declares the helloworld queue on the producer side
    @Bean
    public Queue createQueue() {
        return new Queue("helloworld");
    }
}
Add a test method to the test class of the producer project
After running this test method, the consumer receives one message
Work queues
Create the consumer and start the consumer application
package com.example.springbootrabbitmqconsumer.workqueues;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * springboot-rabbitmq-consumer
 *
 * @author v_choncheng
 * @description
 * @create 2022-02-15 15:11
 */
@Component
public class WorkQueuesConsumer {

    // Two listeners on the same queue compete for its messages
    @RabbitListener(queuesToDeclare = @Queue("workqueues"))
    public void receive1(String msg) {
        System.out.println("Consumer 1 received message: " + msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("workqueues"))
    public void receive2(String msg) {
        System.out.println("Consumer 2 received message: " + msg);
    }
}
Add a test method to the test class of the producer project
After running this test method, consumers 1 and 2 receive the messages in turn
Publish/Subscribe
Create the consumer and start the consumer application
package com.example.springbootrabbitmqconsumer.fanout;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * springboot-rabbitmq-consumer
 *
 * @author v_choncheng
 * @description
 * @create 2022-02-15 15:23
 */
@Component
public class FanoutConsumer {

    // Each listener has its own queue, and both queues are bound to the same fanout exchange
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = ExchangeTypes.FANOUT, name = "fanoutexchange"),
            value = @Queue("fanoutqueues1")))
    public void receive1(String msg) {
        System.out.println("Consumer 1 received message: " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = ExchangeTypes.FANOUT, name = "fanoutexchange"),
            value = @Queue("fanoutqueues2")))
    public void receive2(String msg) {
        System.out.println("Consumer 2 received message: " + msg);
    }
}
Add a test method to the test class of the producer project
After running this test method, consumers 1 and 2 both receive the message
Routing
Create the consumer and start the consumer application
package com.example.springbootrabbitmqconsumer.routing;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * springboot-rabbitmq-consumer
 *
 * @author v_choncheng
 * @description
 * @create 2022-02-15 15:38
 */
@Component
public class RoutingConsumer {

    // Queue 1 is bound with all four routing keys
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = ExchangeTypes.DIRECT, name = "routingexchange"),
            value = @Queue("routingqueues1"),
            key = {"debug", "verbose", "notice", "warning"}))
    public void receive1(String msg) {
        System.out.println("Consumer 1 received message: " + msg);
    }

    // Queue 2 is bound with debug and verbose only
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = ExchangeTypes.DIRECT, name = "routingexchange"),
            value = @Queue("routingqueues2"),
            key = {"debug", "verbose"}))
    public void receive2(String msg) {
        System.out.println("Consumer 2 received message: " + msg);
    }
}
Add a test method to the test class of the producer project
After running this test method, consumer 1 receives four messages, while consumer 2 receives only the debug and verbose messages
Topics
Create the consumer and start the consumer application
package com.example.springbootrabbitmqconsumer.topics;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * springboot-rabbitmq-consumer
 *
 * @author v_choncheng
 * @description
 * @create 2022-02-15 15:38
 */
@Component
public class TopicsConsumer {

    // Queue 1 is bound with the pattern user.*
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = ExchangeTypes.TOPIC, name = "topicexchange"),
            value = @Queue("topicqueues1"),
            key = {"user.*"}))
    public void receive1(String msg) {
        System.out.println("Consumer 1 received message: " + msg);
    }

    // Queue 2 is bound with the pattern user.# and the literal key verbose
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = ExchangeTypes.TOPIC, name = "topicexchange"),
            value = @Queue("topicqueues2"),
            key = {"user.#", "verbose"}))
    public void receive2(String msg) {
        System.out.println("Consumer 2 received message: " + msg);
    }
}
Add a test method to the test class of the producer project
After running this test method, consumer 2 receives four messages, while consumer 1 receives only the user. and user.insert messages