消息隊列服務器核心功能就是,提供了虛擬主機,交換機, 隊列,消息等概念的管理,實現三種典型的消息轉發方式,可以實現跨主機/服務器之間的生產者消費模型。
這里,就編寫一個demo,實現跨主機的生產者消費者模型。
🍅 完善服務器的啟動類
@SpringBootApplication
public class TigerMqApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(TigerMqApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}
}
🍅 創建Demo程序
?
/*
* 表示一個生產者
* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("啟動生產者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 創建交換機和隊列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null );channel.queueDeclare("testQueue",true,false,false,null);// 創建一個消息并且發送byte[] body = "hello,TigerMQ".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投遞完成!ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}
/*
* 表示一個消費者
* */
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("啟動消費者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消費數據]開始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0, body.length);System.out.println("body = " + bodyString);System.out.println("[消費數據]結束!");}});// 一直等待消費while (true){Thread.sleep(500);}}
}
🍅 啟動服務器和客戶端程序
啟動服務器
?啟動生產者和消費者
啟動生產者
[Connection] 發送請求! type=1, length=188
[Connection] 收到響應! type=1, length=192
[Connection] 發送請求! type=3, length=512
[Connection] 收到響應! type=3, length=192
[Connection] 發送請求! type=5, length=349
[Connection] 收到響應! type=5, length=192
[Connection] 發送請求! type=9, length=437
[Connection] 收到響應! type=9, length=192
消息投遞完成!ok = true
[Connection] 發送請求! type=2, length=188
[Connection] 收到響應! type=2, length=192
[Connection] 連接正常斷開!Process finished with exit code 0
啟動消費者!
[Connection] 發送請求! type=1, length=188
[Connection] 收到響應! type=1, length=192
[Connection] 發送請求! type=3, length=512
[Connection] 收到響應! type=3, length=192
[Connection] 發送請求! type=5, length=349
[Connection] 收到響應! type=5, length=192
[Connection] 發送請求! type=10, length=315
[Connection] 收到響應! type=10, length=192
[Connection] 收到響應! type=12, length=528
[消費數據]開始!
consumerTag = C-4e9d5324-c197-462a-a0a5-31ffe3bf929a
basicProperties = BasicProperties(messageId=M-69e805c0-8298-4e8f-b737-001c340e18d5, routingKey=testQueue, deliverMode=1)
body = hello,TigerMQ
[消費數據]結束!