項目實戰 — 消息隊列(8){網絡通信設計②}

目錄

?一、客戶端設計

🍅 1、設計三個核心類

?🍅 2、完善Connection類

????????🎄 讀取請求和響應、創建channel

? ? ? ? ?🎄 添加掃描線程

????????🎄 處理不同的響應

? ? ? ? 🎄 關閉連接

🍅 3、完善Channel類

🎄 編寫createChannel()

🎄 編寫waitResult()和putRetuens()方法

🎄 編寫其他核心API

????????🎊 交換機

????????🎊 隊列

?????????🎊 綁定

?????????🎊發布消息

? ? ? ? 🎊 訂閱消息

? ? ? ? 🎊 確認消息

二、客戶端測試

🍅 1、準備工作和收尾工作

?🍅 2、測試connection

🍅 3、測試channnel的創建

🍅 4、測試交換機

?🍅 5、測試隊列

🍅 6、測試綁定


?一、客戶端設計

🍅 1、設計三個核心類

三個核心類:

(1)ConnectoonFactory連接工廠:這個類,持有服務器的地址,主要功能是創建出連接Connectiond對象

(2)Connection:表示一個TCP連接,持有Socket對象,寫入請求/讀取響應,管理多個Channel對象

(3)Channel:表示一個邏輯上的連接。當前設定的交互模型,一個TCP連接是可以進行復用的,一個客戶端可以有多個模塊,每個模塊都可以和brokerServer之間建立“邏輯上的連接”(channel),但是這幾個模塊的channel之間是互相不影響的。同時還需要提供一系列的方法,與服務器提供的核心API進行對應。

?先創建這三個核心的類

在包mqclient中創建這三個類。

@Data
public class ConnectionFactory {
//    BrokerServer的ip地址private String host;
//    BrokerServer端口號private int port;public Connection newConnection(){Connection connection = new Connection(host,port);return connection;}
}
@Data
public class Connection {private Socket socket = null;//    需要管理多個channel,使用哈希表把若干個channel組織起來private ConcurrentHashMap<String ,Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool = null;public Connection(String host,int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);}//    使用該方法分別處理,當前響應是一個針對控制請求的響應,還是服務器推送的響應private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {//TODO}//    發送請求public void writeRequest(Request request) throws IOException {
//        TODO}//    讀取響應public Response readResponse() throws IOException {
//        TODO}//    通過這個方法,再connection中創建出一個channelpublic Channel createChannel(){// TODOreturn channel;}
//    關閉connectionpublic void close() {//TODO}
}
@Data
public class Channel {private String channelId;
//    當前channel屬于哪個連接private Connection connection;
//    用來存儲后續客戶端收到的服務器的響應private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
//    如果當前的Channel訂閱了某個隊列,此處就需要記錄對應的回調是什么
//    當該隊列的消息返回回來的時候,就調用回調
//    約定一個channel值能有一個回調private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}//    這個方法主要和服務器進行交互,
//    目的是為了告知服務器,此處客戶端創建了新的channelpublic boolean createChannel() {
//        TODOreturn true;}//    使用該方法阻塞等待服務器的響應private BasicReturns waitResult(String rid) {return null;}//    關閉channel,給服務器發送一個type == 0x2的請求public boolean close() throws IOException {//TODOreturn null;}//創建核心的API方法
}


?🍅 2、完善Connection類

這里完成發送請求、讀取響應、創建channel、處理響應、關閉連接

????????🎄 讀取請求和響應、創建channel

//    發送請求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 發送請求! type=" + request.getType() + ", length=" + request.getLength());}//    讀取響應public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("讀取的響應數據不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到響應! type=" + response.getType() + ", length=" + response.getLength());return response;}//    通過這個方法,再connection中創建出一個channel
//    此處的createChannel()方法在后面channel類中編寫以后,會拋異常,這里大家寫完channel之后回過來手動拋一下public Channel createChannel() {    //throws IOExceptionString channelId = "C-" + UUID.randomUUID();Channel channel = new Channel(channelId,this);
//        把channel對象放到Connection管理的channel的哈希表中channelMap.put(channelId,channel);
//        同時也需要把“創建channel”的這個消息也告訴服務器boolean ok = channel.createChannel();if (!ok){
//            創建channel失敗
//            刪除hash表中的鍵值對channelMap.remove(channelId);return null;}return channel;}

? ? ? ? ?🎄 添加掃描線程

在構造方法中,添加一個掃描線程,使用該線程,不停的從socket中讀取響應,再將這個響應交給對應的channnel。

   public Connection(String host,int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//        創建一個掃描線程
//        該線程負責不停的從socket中讀取響應數據,然后把這個響應數據再交給對應的channel負責處理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 連接正常斷開的. 此時這個異常直接忽略.System.out.println("[Connection] 連接正常斷開!");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 連接異常斷開!");e.printStackTrace();}});t.start();}

????????🎄 處理不同的響應

?使用該方法分別處理兩種不同的響應,當前響應是一個針對控制請求的響應,還是服務器推送的響應。

private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {
//            服務器推送來的消息數據SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//            根據channelId找到對用的channel對象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 該消息對應的 channel 在客戶端中不存在! channelId =" + channel.getChannelId());}callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {
//            當前響應是針對控制請求的響應BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
//            把這個結果放到對應的channel的hash表中Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 該消息對應的 channel 在客戶端中不存在! channelId =" + channel.getChannelId());}channel.putReturns(basicReturns);}}

? ? ? ? 🎄 關閉連接

public void close() {//        關閉connection,釋放持有的資源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

🍅 3、完善Channel類

🎄 編寫createChannel()

// ? ?這個方法主要和服務器進行交互,
// ? ?目的是為了告知服務器,此處客戶端創建了新的channel

//    這個方法主要和服務器進行交互,
//    目的是為了告知服務器,此處客戶端創建了新的channelpublic boolean createChannel() throws IOException {
//        對于創建channel來說,payload就是一個basicArgumentsBasicArguments basicArguments=  new BasicArguments();basicArguments.setChannelId(channelId);
//        rid表示這次請求的idbasicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//       發送構造出的請求connection.writeRequest(request);
//        等待服務器的響應BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}private String generateRid(){return "R-" + UUID.randomUUID().toString();}

🎄 編寫waitResult()和putRetuens()方法

putRetuents()是為了將返回的響應放到對用的哈希表中

    public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){
//            當前不知道有多少個線程在等待上面的響應
//            把多有的等待的線程都喚醒notifyAll();}}

waitResult()方法的作用是為了阻塞等待服務器的響應。以下舉例說明:??

? 如下,假如有3個channel,按照123的順序發送了請求,所以應該是請求1先等待響應1,然后再是2和3。

但是,現在有一個情況,服務器這邊是多線程并發處理請求,服務器處理每個請求的時間不一樣,返回響應的順序也就不一樣。

如下圖,channel1等待的是響應1,但是先返回的響應卻是2和3。響應1還沒來,請求1就一直等。請求1,沒等到,后面的2和3也就拿不到響應

所以,為了解決這個問題,就創建了basicReturnsMap,將socket中收到的所有響應數據放到這個在前面創建的basicReturnsMap哈希表中。客戶端的的請求,就可以不斷的從這個哈希表中尋找是否存在和自己匹配的響應。

如果存在,就把相應取走;不存在,就繼續等待。

這個waitResult()方法就是,當請求對應的響應不再哈希表中時,就阻塞等待。

//    使用該方法阻塞等待服務器的響應private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null){
//            如果查詢結果為null,說明響應沒來
//            此時就需要阻塞等待
//            此處加鎖是為了保證wait/notify的是同一個對象synchronized (this){try {wait();}catch (InterruptedException e){e.printStackTrace();}}}
//        讀取成功之后,把這個響應從哈希表中刪除掉basicReturnsMap.remove(rid);return basicReturns;}


🎄 編寫其他核心API

????????🎊 交換機

//    創建交換機public  boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}//    刪除交換機public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

????????🎊 隊列

 public boolean queueDeclare(String queueName,boolean durable) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(queueDeleteArguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());return basicReturns.isOk();}

?????????🎊 綁定

//    創建綁定public  boolean queueBind(String  queueName,String exchangeName,String bindingKey) throws IOException {QueueBindArguments queueBindArguments = new QueueBindArguments();queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setQueueName(queueName);queueBindArguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(queueBindArguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueBindArguments.getRid());return basicReturns.isOk();}//    刪除綁定public boolean queueUnbind(String queueName,String exchangeName) throws IOException {QueueUnbindArguments queueUnbindArguments = new QueueUnbindArguments();queueUnbindArguments.setRid(generateRid());queueUnbindArguments.setChannelId(channelId);queueUnbindArguments.setQueueName(queueName);queueUnbindArguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(queueUnbindArguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueUnbindArguments.getRid());return basicReturns.isOk();}

?????????🎊發布消息

//  發布消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

? ? ? ? 🎊 訂閱消息

//    訂閱消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
//        先設置回調.if (this.consumer != null) {throw new MqException("該 channel 已經設置過消費消息的回調了, 不能重復設置!");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);
//        此處 consumerTag 使用 channelId 來表示arguments.setConsumerTag(channelId);   arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

? ? ? ? 🎊 確認消息

//    確認消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

二、客戶端測試

🍅 1、準備工作和收尾工作

private BrokerServer brokerServer = null;private ConnectionFactory factory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先啟動服務器TigerMqApplication.context = SpringApplication.run(TigerMqApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {// 這個 start 方法會進入一個死循環. 使用一個新的線程來運行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {// 停止服務器brokerServer.stop();// t.join();TigerMqApplication.context.close();// 刪除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}
}

?🍅 2、測試connection

@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}
打印日志:
[DataBaseManger]創建表完成
[DataBaseManger]創建初始數據已經完成
[DataBaseManger]數據庫初始化完成
[BrokerServer] 啟動!
[BrokerServer] 服務器停止運行!

🍅 3、測試channnel的創建

 @Testpublic void testChannel() throws IOException{Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}

🍅 4、測試交換機

@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]創建表完成
[DataBaseManger]創建初始數據已經完成
[DataBaseManger]數據庫初始化完成
[BrokerServer] 啟動!
[connection]發送請求!type = 1,length = 188
[Request] rid=R-ff8a7be0-8138-4334-b496-647b472349fa, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=1, length=188
[BrokerServer] 創建 channel 完成! channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] rid=R-ff8a7be0-8138-4334-b496-647b472349fa, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=1, length=192
[Connection]收到響應!type = 1,length = 192
[connection]發送請求!type = 3,length = 412
[Request] rid=R-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=3, length=412
[MemoryDataCenter]新交換機添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交換機創建完成!exchangeName = defaulttestExchange
[Response] rid=R-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=3, length=192
[Connection]收到響應!type = 3,length = 192
[connection]發送請求!type = 4,length = 288
[Request] rid=R-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=4, length=288
[MemoryDataCenter]交換機刪除成功! exchangeName = defaulttestExchange
[VirtualHost] 交換機刪除成功!exchangeName = defaulttestExchange
[Response] rid=R-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=4, length=192
[Connection]收到響應!type = 4,length = 192
[connection]發送請求!type = 2,length = 188
[Request] rid=R-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=2, length=188
[BrokerServer] 銷毀 channel 完成! channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] rid=R-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=2, length=192
[Connection]收到響應!type = 2,length = 192
[Connection] 連接正常斷開!
[BrokerServer] connection 關閉! 客戶端的地址: /127.0.0.1:54675
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服務器停止運行!
2023-08-13 17:10:28.619  INFO 38940 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:10:28.704  INFO 38940 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:10:28.727  INFO 38940 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

?🍅 5、測試隊列

@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue", true);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]創建表完成
[DataBaseManger]創建初始數據已經完成
[DataBaseManger]數據庫初始化完成
[BrokerServer] 啟動!
[connection]發送請求!type = 1,length = 188
[Request] rid=R-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=1, length=188
[BrokerServer] 創建 channel 完成! channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] rid=R-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=1, length=192
[Connection]收到響應!type = 1,length = 192
[connection]發送請求!type = 5,length = 349
[Request] rid=R-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=5, length=349
[MemoryDataCenter]隊列刪除成功!queueName = defaulttestQueue
[VirtualHost]隊列創建成功!queueName = defaulttestQueue
[Response] rid=R-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=5, length=192
[Connection]收到響應!type = 5,length = 192
[connection]發送請求!type = 6,length = 279
[Request] rid=R-057a79a8-20db-408f-82db-eb9e9145a48b, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=6, length=279
[MemoryDataCenter]刪除隊列成功!queueName = defaulttestQueue
[VirtualHost]刪除隊列成功!queueName = defaulttestQueue
[Response] rid=R-057a79a8-20db-408f-82db-eb9e9145a48b, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=6, length=192
[Connection]收到響應!type = 6,length = 192
[connection]發送請求!type = 2,length = 188
[Request] rid=R-73a48380-78e9-448f-9566-4cdc7da17bda, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=2, length=188
[BrokerServer] 銷毀 channel 完成! channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] rid=R-73a48380-78e9-448f-9566-4cdc7da17bda, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=2, length=192
[Connection]收到響應!type = 2,length = 192
[Connection] 連接正常斷開!
[BrokerServer] connection 關閉! 客戶端的地址: /127.0.0.1:54945
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服務器停止運行!
2023-08-13 17:16:54.304  INFO 72124 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:16:54.333  INFO 72124 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:16:54.363  INFO 72124 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

🍅 6、測試綁定

 @Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}
[DataBaseManger]創建表完成
[DataBaseManger]創建初始數據已經完成
[DataBaseManger]數據庫初始化完成
[BrokerServer] 啟動!
[connection]發送請求!type = 1,length = 188
[Request] rid=R-071477e3-7115-42e7-9370-7995fa36daab, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=1, length=188
[BrokerServer] 創建 channel 完成! channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] rid=R-071477e3-7115-42e7-9370-7995fa36daab, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=1, length=192
[Connection]收到響應!type = 1,length = 192
[connection]發送請求!type = 3,length = 412
[Request] rid=R-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=3, length=412
[MemoryDataCenter]新交換機添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交換機創建完成!exchangeName = defaulttestExchange
[Response] rid=R-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=3, length=192
[Connection]收到響應!type = 3,length = 192
[connection]發送請求!type = 5,length = 349
[Request] rid=R-478d13e5-b999-4ac4-96cf-e0c8df829152, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=5, length=349
[MemoryDataCenter]隊列刪除成功!queueName = defaulttestQueue
[VirtualHost]隊列創建成功!queueName = defaulttestQueue
[Response] rid=R-478d13e5-b999-4ac4-96cf-e0c8df829152, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=5, length=192
[Connection]收到響應!type = 5,length = 192
[connection]發送請求!type = 7,length = 347
[Request] rid=R-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=7, length=347
[MemoryDataCenter]新綁定添加成功!exchangeName = defaulttestQueue,queueName = defaulttestQueue
[VirtualHost]綁定創建成功! exchangeName = defaulttestExchangequeueName = defaulttestQueue
[Response] rid=R-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=7, length=192
[Connection]收到響應!type = 7,length = 192
[connection]發送請求!type = 8,length = 314
[Request] rid=R-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=8, length=314
[MemoryDataCenter]綁定刪除成功!exchangeName = defaulttestQueue,queueName = defaulttestQueue
[VirtualHost]刪除綁定成功
[Response] rid=R-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=8, length=192
[Connection]收到響應!type = 8,length = 192
[Request] rid=R-c3335cd6-d02e-440e-9aea-5275bf445412, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=2, length=188
[BrokerServer] 銷毀 channel 完成! channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] rid=R-c3335cd6-d02e-440e-9aea-5275bf445412, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=2, length=192
[Connection]收到響應!type = 2,length = 192
[connection]發送請求!type = 2,length = 188
[BrokerServer] connection 關閉! 客戶端的地址: /127.0.0.1:55256
[BrokerServer]清理session完成~ 被清理的channeId = []
[Connection] 連接正常斷開!
[BrokerServer] 服務器停止運行!
2023-08-13 17:22:34.611  INFO 74604 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:22:34.646  INFO 74604 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:22:34.691  INFO 74604 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

🍅 7、測試消息的相關操作

 @Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消費數據] 開始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println("[消費數據] 結束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
[DataBaseManger]創建表完成
[DataBaseManger]創建初始數據已經完成
[DataBaseManger]數據庫初始化完成
[BrokerServer] 啟動!
[Connection] 發送請求! type=1, length=188
[Request] rid=R-143ae2ea-f258-4874-bff4-be3f719d44ed, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=1, length=188
[BrokerServer] 創建 channel 完成! channelId=C-0977501d-4608-4428-ae5c-db2738c02068
[Response] rid=R-143ae2ea-f258-4874-bff4-be3f719d44ed, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=1, length=192
[Connection] 收到響應! type=1, length=192
[Connection] 發送請求! type=3, length=512
[Request] rid=R-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=3, length=512
[MemoryDataCenter]新交換機添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交換機創建完成!exchangeName = defaulttestExchange
[Response] rid=R-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=3, length=192
[Connection] 收到響應! type=3, length=192
[Connection] 發送請求! type=5, length=349
[Request] rid=R-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=5, length=349
[MemoryDataCenter]隊列刪除成功!queueName = defaulttestQueue
[VirtualHost]隊列創建成功!queueName = defaulttestQueue
[Response] rid=R-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=5, length=192
[Connection] 收到響應! type=5, length=192
[Connection] 發送請求! type=9, length=429
[Request] rid=R-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=9, length=429
[MemoryDataCenter]新消息添加成功!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被投遞到到隊列中! messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Response] rid=R-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=9, length=192
[Connection] 收到響應! type=9, length=192
[Connection] 發送請求! type=10, length=315
[Request] rid=R-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=10, length=315
[MemoryDataCenter]消息從隊列中取出!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[VirtualHost]basicConsume成功! queueName = defaulttestQueue
[Response] rid=R-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=10, length=192
[Connection] 收到響應! type=10, length=192
[MemoryDataCenter]消息進入待確認隊列!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Connection] 收到響應! type=12, length=520
[MemoryDataCenter]消息從待確認隊列刪除!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被移除!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[ConsumerManager]消費被成功消費!queueName = defaulttestQueue
[消費數據] 開始!
consumerTag=C-0977501d-4608-4428-ae5c-db2738c02068
basicProperties=BasicProperties(messageId=M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b, routingKey=testQueue, deliverMode=1)
[消費數據] 結束!
[Connection] 發送請求! type=2, length=188
[Request] rid=R-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=2, length=188
[BrokerServer] 銷毀 channel 完成! channelId=C-0977501d-4608-4428-ae5c-db2738c02068
[Response] rid=R-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=2, length=192
[Connection] 收到響應! type=2, length=192
[Connection] 連接正常斷開!
[BrokerServer] connection 關閉! 客戶端的地址: /127.0.0.1:58925
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服務器停止運行!
2023-08-13 18:19:24.906  INFO 75700 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 18:19:24.929  INFO 75700 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 18:19:24.935  INFO 75700 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.Process finished with exit code 0

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/35510.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/35510.shtml
英文地址,請注明出處:http://en.pswp.cn/news/35510.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

廣州華銳互動:VR3D課程在線教育平臺為職業院校提供沉浸式的虛擬現實學習體驗

隨著科技的飛速發展&#xff0c;虛擬現實(VR)和增強現實(AR)技術已經逐漸滲透到我們生活的各個領域。其中&#xff0c;VR3D課程在線教育平臺作為一種新興的教育方式&#xff0c;正在逐漸改變我們的學習方式和體驗。本文將詳細介紹VR3D課程在線教育平臺的應用前景及特點。 VR3D課…

VFP現代物流企業管理系統的設計與實現

摘要: 隨著計算機技術的廣泛應用,在現代流通企業管理中引入計算機管理技術,成為一個值得深入研究的問題。 本文首先概要的論述了數據庫的有關知識與現狀。之后,對當代計算機數據庫技術的現狀,尤其是對關系型數據庫作了系統的描述。在此基礎上,論文重點對數據庫的開發環境 …

【設計模式】工廠模式

工廠模式 工廠模式&#xff08;Factory Pattern&#xff09;是 Java 中最常用的設計模式之一。這種類型的設計模式屬于創建型模式&#xff0c;它提供了一種創建對象的最佳方式。 工廠模式提供了一種將對象的實例化過程封裝在工廠類中的方式。通過使用工廠模式&#xff0c;可以…

Mysql整理二 - 常見查詢語句面試題(附原表)

表結構&#xff0c;創建原表的代碼在最后 -- cid課程id; tid老師id; sid學生id; select * from t_mysql_course; select * from t_mysql_score; select * from t_mysql_student; select * from t_mysql_teacher; 1. 查詢" 01 “課程比” 02 "課程成績高的學生的信息…

uniapp軟鍵盤談起遮住輸入框和頭部被頂起的問題解決

推薦&#xff1a; pages.json中配置如下可解決頭部被頂起和表單被遮住的問題。 { "path": "pages/debug/protocol/tagWord", "style": { "app-plus": { "soft…

01.setup,reactive,ref,computed,watch學習---2023新版Vue3基礎入門到實戰項目

1.Vue3組合式 2.創建vue3項目 2.1認識create-vue create-vue是官方新的腳手架工具&#xff0c;vite下一代構建工具 node -v >16 npm init vuelatest 2.2 setup 原始寫法 <script> export default {//執行時機比beforeCreate早//獲取不到this//數據和函數必須ret…

Java實戰:高效提取PDF文件指定坐標的文本內容

前言 臨時接到一個緊急需要處理的事項。業務側一個同事有幾千個PDF文件需要整理&#xff1a;需要從文件中的指定位置獲取對應的編號和地址。 要的急&#xff0c;工作量大。所以就問到技術部有沒有好的解決方案。 問技術的話就只能寫個demo跑下了。 解決辦法 1. 研究下PDF文檔…

案例15 Spring Boot入門案例

1. 選擇Spring Initializr快速構建項目 ? 2. 設置項目信息 ? 3. 選擇依賴 ? 4. 設置項目名稱 ? 5. 項目結構 ? 6. 項目依賴 自動配置了Spring MVC、內置了Tomcat、配置了Logback(日志)、配置了JSON。 ? 7. 創建HelloController類 com.wfit.boot.hello目錄下創建HelloCo…

Less和Sass的原理和用法

一、原理 1.1 Less定義&#xff1a;是一種動態的樣式語言,使CSS變成一種動態的語言特性&#xff0c;如變量、繼承、運算、函數。Less既可以在客戶端上面運行(支持IE6以上版本、Webkit、Firefox),也可以在服務端運行(Node.js) 1.2 SaSS定義&#xff1a;是一種動態樣式語言&#…

開發過程中遇到的問題以及解決方法

鞏固基礎&#xff0c;砥礪前行 。 只有不斷重復&#xff0c;才能做到超越自己。 能堅持把簡單的事情做到極致&#xff0c;也是不容易的。 開發過程中遇到的問題以及解決方法 簡單易用的git命令 git命令&#xff1a; 查看有幾個分支&#xff1a;git branch -a 切換分支&#…

Azure創建第一個虛擬機

首先&#xff0c;登錄到 Azure 門戶 (https://portal.azure.com/)。在 Azure 門戶右上角&#xff0c;點擊“虛擬機”按鈕&#xff0c;并點擊創建&#xff0c;創建Azure虛擬機。 在虛擬機創建頁面中&#xff0c;選擇所需的基本配置&#xff0c;包括虛擬機名稱、操作系統類型和版…

【JVM】JVM 調優的參數都有哪些?

文章目錄 1. 設置堆空間大小2. 虛擬機棧的設置3. 年輕代中Eden區和兩個Survivor區的大小比例4. 年輕代晉升老年代閾值5. 設置垃圾回收收集器 1. 設置堆空間大小 設置堆的初始大小和最大大小&#xff0c;為了防止垃圾收集器在初始大小、最大大小之間收縮堆而產生額外的時間&…

python編程小游戲簡單的,python小游戲編程100例

大家好&#xff0c;給大家分享一下python編程小游戲簡單的&#xff0c;很多人還不知道這一點。下面詳細解釋一下。現在讓我們來看看&#xff01; 不會python就不能用python開發入門級的小游戲&#xff1f; 當然不是&#xff0c;我收集了十個python入門小游戲的源碼和教程&#…

分支語句和循環語句(1)

這篇文章我們詳細的把分支語句和循環語句給大家進行講解。 分支語句&#xff1a; if switch 循環語句&#xff1a; while for do while goto語句&#xff1a; 1.什么是語句&#xff1f; C語句可分為以下五類&#xff1a; 1. 表達式語句 2. 函數調用語句 3. 控制…

qt自己實現方便的線程管理類

看本博客之前&#xff0c;可以先看看我這篇多線程博客&#xff1a;qt多線程使用方式_我是標同學的博客-CSDN博客

ORCA優化器淺析——CDXLOperator Base class for operators in a DXL tree

如上圖所示&#xff0c;CDXLOperator作為Base class for operators in a DXL tree&#xff0c;其子類CDXLLogical、CDXLScalar、CDXLPhysical作為邏輯節點、物理節點和Scalar節點的DXL表示類&#xff0c;因此其包含了這些類的共同部分特性&#xff0c;比如獲取其DXL節點表示的函…

Qt 文件對話框使用 Deepin風格

當你在Deepin或UOS 上開發 Qt 程序時&#xff0c;如果涉及到文件對話框功能&#xff0c;那么就會遇到調用原生窗口的問題。 如果你使用的是官方的Qt版本&#xff0c;那么在Deepin或者UOS系統上&#xff0c;彈出的文件對話框會是如下這樣&#xff1a; 而Deepin或UOS系統提供的默…

可視化高級繪圖技巧100篇-總論

前言 優秀的數據可視化作品可以用三個關鍵詞概括&#xff1a;準確、清晰、優雅。 準確&#xff1a;精準地反饋數據的特征信息&#xff08;既不遺漏也不冗余&#xff0c;不造成讀者疏漏&誤讀細節&#xff09; 清晰&#xff1a;獲取圖表特征信息的時間越短越好 優雅&…

Gitlab CI/CD筆記-第二天-主機套接字進行構建并push鏡像。

一、安裝gitlab-runner 1.可以是linux也可以是docker的 2.本文說的是docker安裝部署的。 二、直接上.gitlab-ci.yml stages: # List of stages for jobs, and their order of execution - build-image build-image-job: stage: build-image image: harbor.com:543/docke…

企業計算機服務器中了360后綴勒索病毒怎么辦,勒索病毒解密數據恢復

隨著計算機技術的不斷發展&#xff0c;企業的辦公系統得到了很大提升&#xff0c;但是隨之而來的網絡安全威脅也不斷增加&#xff0c;勒索病毒的攻擊事件時有發生。近期&#xff0c;我們收到某地連鎖超市的求助&#xff0c;企業的計算機服務器遭到了360后綴勒索病毒攻擊&#x…