RabbitMinQ(模擬實現消息隊列項目)02

目錄

十.整合數據庫和文件數據

創建DiskDataManager類

十一.內存結構設計

創建MeneryDataCenter類:

實現集合操作:

對MemoryDataCenter類功能測試:

十二.整合內存和磁盤數據

創建VirtualHost類:

Exchange:

MSGQueue:

Binding:

創建Router類

對Router類的TOPIC匹配進行測試:

發送消息:

創建ConsumerManager類:

訂閱消息:

創建ConsumerEnv類:

創建Consumer接口:

為MSGQueue類新增加二個屬性和方法,用于管理訂閱隊列的消費者集合:

在ConsumerManager類中實現添加消費者方法:

十三.網絡通信協議設計

設計應用層協議:

創建request類:

創建response類:

創建參數父類:

創建響應父類:

創建設備功能的參數類:

十四.實現BrokerServer

十五.實現客戶端

創建ConnectionFactory.

創建Connection類:

創建Channel類:

客戶端代碼測試:

十六.完成

成果測試:

啟動消息隊列服務器:

創建生產者 發送消息:

創建消費者消費消息:


十.整合數據庫和文件數據

上面的代碼中,使用數據庫存儲了Exchange,Queue,Binding,使用文件存儲了Message,

下面對數據庫和文件中的數據進行整合.進行統一管理.

創建DiskDataManager類

/*** 對數據庫中的Exchange,Queue,Binding和文件中的Message數據進行整合* 統一管理,后續上層代碼直接調用該類中的方法即可,無需再向下層數據結構調用*/
public class DiskDataManager {private DataBaseManager dataBaseManager = new DataBaseManager();private MessageFileManager messageFileManager = new MessageFileManager();public void init() throws JsonProcessingException {dataBaseManager.init();messageFileManager.init();}//交換機://添加交換機public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}//刪除交換機public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}//查找交換機public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//隊列//添加隊列public void insertQueue(MSGQueue queue) throws IOException, MqException {dataBaseManager.insertQueue(queue);//創建隊列后,不僅要將隊列寫入到數據庫中,還要創建出對應的目錄和文件messageFileManager.createQueueFile(queue.getName());}//刪除隊列public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);//刪除隊列后,還要講對應的目錄和文件刪除messageFileManager.destoryQueueFile(queueName);}//查找隊列public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueues();}//綁定關系//添加綁定關系public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}//刪除綁定關系public void deleteBinding(String bingingKey){dataBaseManager.deleteBindings(bingingKey);}//查找綁定public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}//消息//發送消息public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}//刪除消息public void deleteMessageFromQueue(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessageFromFile(queue,message);//刪除消息后,查看是否需要進行GCif(messageFileManager.checkGC(queue.getName())){messageFileManager.GC(queue);}}//加載所有的消息到內存中public List<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessage(queueName);}
}

十一.內存結構設計

將數據存儲到數據庫和文件,是為了實現其持久性,但數據還是要存儲在內存上的,這樣才能更快的訪問到數據.

創建MeneryDataCenter類:

這里通過設計不同的數據集合來存儲數據在內存中.

/*** 將數據存儲在內存中,創建不同的數據集合來管理* 要管理的數據有:* 交換機* 隊列* 綁定關系* 消息* 隊列中的消息集合* 待確認消息隊列中的消息集合*/
public class MemoryDataCenter {//key:exchangeNameprivate ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//key:queueNameprivate ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
//   key1:exchangeName key2:queueNameprivate ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();//key: messageIdprivate ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
//  key:queueName  List:messageprivate ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();//存儲在手動確認模式下,管理待確認的消息和隊列,在未收到確認消息時,要先將數據存儲到這個數據集合中,
//  key1:queueName   key2:messageIdprivate ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> WaitAckQueueMessageMap = new ConcurrentHashMap<>();
}

實現集合操作:

 //交換機://插入public void insertExchange(Exchange exchange){exchangeMap.put(exchange.getName(),exchange);System.out.println("[MemoryDataCenter] 新增交換機成功 exchangeName:"+exchange.getName());}//刪除public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 刪除交換機成功 exchangeName:"+exchangeName);}//查找public Exchange getExchange(String exchangeName){Exchange exchange = exchangeMap.get(exchangeName);return exchange;}//隊列//插入public void insertQueue(MSGQueue queue){queueMap.put(queue.getName(),queue);System.out.println("[MemoryDataCenter] 新增隊列成功! queueName: "+queue.getName());}//刪除public void deleteQueue(String queueName){queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 隊列刪除成功! queueName: "+queueName);}//查找public MSGQueue getQueue(String queueName){return queueMap.get(queueName);}//綁定關系://新增public void insertBinding(Binding binding) throws MqException {
//        //綁定關系不存在時,創建一個,存在時,進行覆蓋
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getQueueName());
//        if(bindingMap==null){
//            bindingMap = new ConcurrentHashMap<>();
//        }
//        bindingMap.put(binding.getQueueName(),binding);
//        bindingsMap.put(binding.getExchangeName(),bindingMap);//這個方法是ConcurrentMap方法用來判斷對應的哈希表是否存在,不存在就執行第二個參數,存在就直接賦值,和上面的邏輯是一樣的//且該方法是原子的,不存在線程安全問題ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),f -> new ConcurrentHashMap<>());//此處可能會存在線程安全問題,以綁定關系為基準進行上鎖synchronized (binding){Binding binding1 = bindingMap.get(binding.getQueueName());//當綁定關系已經存在時,拋出異常,只有新的綁定插入時,才會成功if(binding1!=null){throw new MqException("[MemoryDataCenter] 綁定已存在 exchangeName: "+binding.getExchangeName()+" ,queueName: "+binding.getQueueName());}bindingMap.put(binding.getQueueName(),binding);}System.out.println("[MemoryDataCenter] 新的綁定創建成功! +exchangeName:"+binding.getExchangeName()+" ,queueName: "+binding.getQueueName()+" ,bindingKey: "+binding.getBindingKey());}//刪除public void deleteBinding(Binding binding) throws MqException {//先判斷交換機是否存在綁定,不存在時,無法刪除.拋出異常ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap==null){throw new MqException("[MemoryDataCenter] 無綁定關系,刪除失敗 exchangeName: "+binding.getExchangeName()+" ,queueName: "+binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 綁定刪除成功 exchangeName: "+binding.getExchangeName()+" ,queueName: "+binding.getQueueName()+" ,bindingKey:"+binding.getBindingKey());}//查找public Binding getBinding(String exchangeName,String queueName) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap==null){return null;}Binding binding = bindingMap.get(queueName);return binding;}//消息//插入public void insertMessage(Message message){messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 新增消息成功 messageID:"+message.getMessageId());}//刪除public void deleteMessage(String messageId){messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息刪除陳功 messageId: "+messageId);}//查找public Message getMessage(String messageId){return messageMap.get(messageId);}//隊列消息集合//發送消息到指定隊列public void sendMessage(MSGQueue queue,Message message){//1.先查找隊列對應的集合是否存在,不存在時創建消息集合LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), f -> new LinkedList<>());//這里當多個線程同時執行插入操作時,可能會覆蓋消息,要以集合為維度進行上鎖synchronized(messages){messages.add(message);}//將消息也存入到消息集合中messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 發送消息到隊列成功 queueName:"+queue.getName()+" ,messageId:"+message.getMessageId());}//從隊列中取消息public Message pollMessage(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages==null || messages.isEmpty()){return null;}Message message = messages.remove(0);System.out.println("[MemoryDataCenter] 從隊列中取消息成功 queueName:"+queueName+" ,messageId:"+message.getMessageId());return message;}//獲取隊列中的消息個數public int getMessageCountFromQueue(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages==null) return 0;//此處獲取集合中元素個數可能存在線程安全問題,對集合進行上鎖synchronized(messages){return messages.size();}}//待確認消息集合//發送消息到待確認消息集合public void sendWaitMessage(String queueName,Message message){ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.computeIfAbsent(queueName, f -> new ConcurrentHashMap<>());//此處向待確認消息集合中插入數據時,也可能存在線程安全問題,以集合為維度加鎖synchronized (waitMessagesMap){waitMessagesMap.put(message.getMessageId(),message);}System.out.println("[MemoryDataCenter] 發送待確認消息到隊列成功 queueName:"+queueName+" ,messageId:"+message.getMessageId());}//從隊列中取待確認消息public Message pollWaitMessage(String queueName,String messageId){ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);if(waitMessagesMap==null){return null;}Message message = waitMessagesMap.get(messageId);if(message==null){return null;}System.out.println("[MemoryDataCenter] 從隊列中取代確認消息成功 messageId:"+messageId+" ,queueName:"+queueName);return message;}//從隊列中刪除待確認消息public void deleteWaitMessage(String queueName,String messageId){ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);if(waitMessagesMap==null){System.out.println("[MemoryDataCenter] 待確認消息隊列不存在,消息刪除失敗 messageId:"+messageId+" ,queueName:"+queueName);}waitMessagesMap.remove(messageId);System.out.println("[MemoryDataCenter] 待確認消息刪除成功 messageId:"+messageId+" ,queueName:"+queueName);}//恢復所有硬盤中的數據//當服務器重啟后,內存中的數據都不存在了,要從磁盤中獲取數據public void recovery(DiskDataManager diskDataManager) throws IOException, MqException, ClassNotFoundException {//先將內存中的集合都清空,防止存在殘留數據exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();queueMessageWaitAckMap.clear();//恢復交換機數據List<Exchange> exchanges = diskDataManager.selectAllExchanges();for(Exchange e:exchanges){String exchangeName = e.getName();exchangeMap.put(exchangeName,e);}//恢復隊列數據List<MSGQueue> queues = diskDataManager.selectAllQueues();for(MSGQueue q:queues){String queueName = q.getName();queueMap.put(queueName,q);}//恢復綁定關系List<Binding> bindings = diskDataManager.selectAllBindings();for(Binding b:bindings){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(b.getExchangeName(), f -> new ConcurrentHashMap<>());bindingMap.put(b.getQueueName(),b);}//恢復消息for(MSGQueue q:queueMap.values()){List<Message> messages = diskDataManager.loadAllMessageFromQueue(q.getName());for(Message m:messages){messageMap.put(m.getMessageId(),m);}}//對于未確認消息,當服務器重啟后,服務器中所有的消息都要重新發送,未被確認的消息就都成了未被取走的消息了,//對于未確認的消息, 就不需要回復這些數據了}

對MemoryDataCenter類功能測試:

@SpringBootTest
public class MemoryDataCenter {private MemoryDataCenter memoryDataCenter;@BeforeEachpublic void setUp(){memoryDataCenter = new MemoryDataCenter();System.out.println("前置工作已經準備后!");}@AfterEachpublic void tearDown(){memoryDataCenter = null;System.out.println("收尾工作以完成!");}
}

測試功能:

//測試交換機相關操作private Exchange createExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);return exchange;}@Testvoid testExchange(){Exchange exchange = createExchange("exchangeTest");memoryDataCenter.insertExchange(exchange);Exchange act = memoryDataCenter.getExchange(exchange.getName());Assertions.assertEquals(exchange.getName(),act.getName());Assertions.assertEquals(exchange.getType(),act.getType());Assertions.assertEquals(exchange.isDurable(),act.isDurable());Assertions.assertEquals(exchange.isAutoDelete(),act.isAutoDelete());memoryDataCenter.deleteExchange(exchange.getName());act = memoryDataCenter.getExchange(exchange.getName());Assertions.assertNull(act);}//測試隊列相關操作private MSGQueue createQueue(String queueName){MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);return queue;}@Testvoid testQueue(){MSGQueue queue = createQueue("queueTest");memoryDataCenter.insertQueue(queue);MSGQueue act = memoryDataCenter.getQueue(queue.getName());Assertions.assertEquals(queue.getName(),act.getName());Assertions.assertEquals(queue.isDurable(),act.isDurable());Assertions.assertEquals(queue.isAutoDelete(),act.isAutoDelete());memoryDataCenter.deleteQueue(queue.getName());act  = memoryDataCenter.getQueue(queue.getName());Assertions.assertNull(act);}//測試綁定關系相關操作private Binding createBinding(String exchangeName,String queueName,String bindingKey){Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);return binding;}@Testvoid testBinding() throws MqException {//要先創建隊列和交換機Exchange exchange = createExchange("exchangeTest");MSGQueue queue = createQueue("queueTest");Binding binding = createBinding(exchange.getName(), queue.getName(), "bindingKeyTest");memoryDataCenter.insertBinding(binding);Binding act = memoryDataCenter.getBinding(exchange.getName(), queue.getName());Assertions.assertEquals(binding.getExchangeName(),act.getExchangeName());Assertions.assertEquals(binding.getQueueName(),act.getQueueName());Assertions.assertEquals(binding.getBindingKey(),act.getBindingKey());memoryDataCenter.deleteBinding(binding);act = memoryDataCenter.getBinding(act.getExchangeName(),act.getQueueName());Assertions.assertNull(act);}//測試消息操作private Message createMessage(String body){Message message = new Message();return message.createMessageById("routingKeyTest",null,body.getBytes());}@Testpublic void testMessage(){Message expectedMessage = createMessage("testMessage");memoryDataCenter.insertMessage(expectedMessage);Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage,actualMessage);//刪除消息memoryDataCenter.deleteMessage(expectedMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}//測試隊列中的消息集合@Testvoid testQueueMessage(){MSGQueue queue = createQueue("queueTest");Message message = createMessage("hello");memoryDataCenter.sendMessage(queue,message);int n = memoryDataCenter.getMessageCountFromQueue(queue.getName());Assertions.assertEquals(1,n);Message act = memoryDataCenter.pollMessage(queue.getName());n = memoryDataCenter.getMessageCountFromQueue(queue.getName());Assertions.assertEquals(0,n);Assertions.assertEquals(message.getMessageId(),act.getMessageId());Assertions.assertArrayEquals(message.getBody(),act.getBody());Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());}//測試待確認隊列集合@Testvoid testWaitMessageQueue(){MSGQueue queue = createQueue("queueTest");Message message = createMessage("hello");memoryDataCenter.sendWaitMessage(queue.getName(), message);Message act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());Assertions.assertEquals(message.getMessageId(),act.getMessageId());Assertions.assertArrayEquals(message.getBody(),act.getBody());Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());memoryDataCenter.deleteWaitMessage(queue.getName(), message.getMessageId());act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());Assertions.assertNull(act);}//測試加載磁盤所有數據到內存@Testvoid testRecovery() throws IOException, MqException, ClassNotFoundException {//這里需要使用到mybatis,需要進行了類加載,先啟動SpringApplicationMq02Application.context = SpringApplication.run(Mq02Application.class);//在磁盤上構造好數據:DiskDataManager diskDataCenter = new DiskDataManager();diskDataCenter.init("");//創建交換機:Exchange exchange = createExchange("testExchange");diskDataCenter.insertExchange(exchange);//創建隊列:MSGQueue queue = createQueue("testQueue");diskDataCenter.insertQueue(queue);//創建綁定Binding binding = new Binding();binding.setExchangeName(exchange.getName());binding.setQueueName(queue.getName());binding.setBindingKey("bindingKey");diskDataCenter.insertBinding(binding);//創建消息Message message = createMessage("testContext");diskDataCenter.sendMessage(queue,message);//執行恢復:memoryDataCenter.recovery(diskDataCenter);//結果比對://交換機:Exchange actualExchange = memoryDataCenter.getExchange(exchange.getName());Assertions.assertEquals(exchange.getName(),actualExchange.getName());Assertions.assertEquals(exchange.getType(),actualExchange.getType());Assertions.assertEquals(exchange.isDurable(),actualExchange.isDurable());Assertions.assertEquals(exchange.isAutoDelete(),actualExchange.isAutoDelete());//隊列:MSGQueue actualQueue = memoryDataCenter.getQueue(queue.getName());Assertions.assertEquals(queue.getName(),actualQueue.getName());Assertions.assertEquals(queue.isDurable(),actualQueue.isDurable());Assertions.assertEquals(queue.isAutoDelete(),actualQueue.isAutoDelete());//綁定:Binding actulaBinding = memoryDataCenter.getBinding(exchange.getName(), queue.getName());Assertions.assertEquals(binding.getExchangeName(),actulaBinding.getExchangeName());Assertions.assertEquals(binding.getQueueName(),actulaBinding.getQueueName());Assertions.assertEquals(binding.getBindingKey(),actulaBinding.getBindingKey());//消息:Message actualMessage = memoryDataCenter.getMessage(message.getMessageId());Assertions.assertEquals(message.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(message.getDeliveryMode(),actualMessage.getDeliveryMode());Assertions.assertEquals(message.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertArrayEquals(message.getBody(),actualMessage.getBody());//   清除文件//清理之前要先關閉文件Mq02Application.context.close();File file = new File("./data");FileUtils.deleteDirectory(file);}

十二.整合內存和磁盤數據

將內存和磁盤上的數據進行整合,用"虛擬機"這個概念將其整合起來.?不同虛擬機中的交換機 隊列,綁定關系,消息都是不互通的. 此處為了簡化,僅實現單臺虛擬主機,但在數據結構上設置不同虛擬主句名
?為區分不同的虛擬主機上的設備,通過配置設備名區別:(以虛擬機名為前綴)
?規定:
?* ?exchangeName = virtualHostName+exchangeName;
?* ?queueName = virtualHostName+queueName;
并且將調用的方法拋出的異常都在這個類中進行處理,不再向上拋出

創建VirtualHost類:

@Data
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataManager diskDataManager = new DiskDataManager();public VirtualHost(String virtualHostName){this.virtualHostName = virtualHostName;//初始化磁盤數據:diskDataManager.init();//初始化內存數據try {memoryDataCenter.recovery(diskDataManager);} catch (IOException | MqException | ClassNotFoundException e) {System.out.println("[VirtualHost] 內存數據恢復失敗");e.printStackTrace();}}
}

Exchange的聲明和刪除:

 //在對交換機在內存和磁盤上插入和刪除數據時,可能存在線程安全問題,要以交換機為維度對其上鎖//交換機鎖對象:private final Object exchangeLocker = new Object();//交換機操作://創建交換機,//創建后,將其保存到內存和磁盤上public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable,boolean autoDelete, Map<String,Object> args){//   先根據約定 設置交換機名exchangeName = virtualHostName + exchangeName;synchronized(exchangeLocker){//先在內存上查找,若已存在,則直接返回Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange!=null){System.out.println("[VirtualHost] 交換機已經存在,不再創建 exchangeName:"+exchangeName);return true;}exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(type);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);//這里對args參數的設置.要在Exchange類中再為args關于Map參數添加set和get方法exchange.setArgs(args);//先存入數據庫,再存入內存中,//這個順序是:插入數據庫操作比較容易出現異常,存內存出現異常的可能小較小//         若插入數據庫失敗,則不再存入內存中;//         若是轉換順序,當存數據庫出現異常時,還要將內存中的數據再刪了,比較麻煩if(durable){//當交換機設置為持久化時,將其存入內存:diskDataManager.insertExchange(exchange);}//存入內存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交換機創建成功 exchangeName:"+exchangeName);return true;}}//刪除交換機//在內存和磁盤上將數據刪除public boolean exchangeDelete(String exchangeName){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost] 要刪除的交換機不存在 exchangeName:"+exchangeName);}//刪除內存數據:memoryDataCenter.deleteExchange(exchangeName);//刪除磁盤數據:boolean durable = exchange.isDurable();if(durable){diskDataManager.deleteExchange(exchangeName);}System.out.println("[VirtualHost] 交換機刪除成功 exchangeName:"+exchangeName);return true;}} catch (MqException e) {System.out.println("[VirtualHost] 交換機刪除失敗 exchangeName:"+exchangeName);e.printStackTrace();}return false;}

在Exchange類中關于args屬性上,再增加關于Map參數類型的set方法:

 public void setArgs(Map<String,Object> args){this.args = args;}

MSGQueue的聲明和刪除:

//在對隊列在內存和磁盤上插入和刪除數據時,可能存在線程安全問題,要以隊列為維度對其上鎖//創建 隊列鎖對象:private final Object queueLocker = new Object();/**隊列* 創建隊列:創建隊列并將其存入到磁盤和內存中*/public boolean queueDeclare(String queueName,boolean isDurable,boolean autoDelete, Map<String,Object> args){queueName = virtualHostName+queueName;try {synchronized(queueLocker){MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue!=null){System.out.println("[VirtualHost] 隊列已經存在 queueName:"+queueName);return true;}MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(isDurable);queue.setAutoDelete(autoDelete);//此處在MSGQueue類中,針對args屬性,要實現關于Map類型的set方法queue.setArgs(args);//存入磁盤if(isDurable) {diskDataManager.insertQueue(queue);}//存入內存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 創建隊列成功 !");}return true;} catch (IOException | MqException e) {System.out.println("[VirtualHost] 創建隊列失敗 queueName:"+queueName);e.printStackTrace();return false;}}//刪除隊列:從磁盤和內存中 刪除隊列public boolean queueDelete(String queueName){queueName = virtualHostName+queueName;try{synchronized(queueLocker){MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue==null){throw new MqException("[VirtualHost] 隊列不存在,刪除隊列失敗 queueName:"+queueName);}if(existsQueue.isDurable()){diskDataManager.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 刪除隊列成功 queueName:"+queueName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 隊列刪除失敗! queueName:" +queueName);e.printStackTrace();return false;}}

同樣,在MSGQueue類中關于args屬性上,再增加關于Map參數類型的set方法:

 public void setArgs(Map<String,Object> args){this.args = args;}

Binding的創建和刪除:


//  該類實現和綁定相關的操作private Router router = new Router();//綁定的插入和刪除//插入綁定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try{//1.驗證綁定是否存在,不存在再創建Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if(binding!=null){throw new MqException("[VirtualHost] 綁定已存在 exchangeName:"+exchangeName+" ,queueName:"+queueName);}//這里再創建一個類router,實現關于綁定相關的操作//2.判斷bindingKey格式是否正確boolean ok = router.checkBindingKey(bindingKey);if(!ok){throw new MqException("[VirtualHost] 綁定格式有誤 bindingKey:"+bindingKey);}//3.創建綁binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//4.驗證綁定的隊列和交換機是否存在MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost] 要綁定的隊列不存在 queueName:"+queueName+" ,bindingKey:"+bindingKey);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost] 要綁定的交換機不存在 exchangeName:"+exchangeName+" ,bindingKey:"+bindingKey);}//5,存入磁盤//當隊列和交換機同時設置持久化時,將該綁定關系存入磁盤if(queue.isDurable() && exchange.isDurable()){diskDataManager.insertBinding(binding);}//6.存入內存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 創建綁定成功 bindingKey: "+bindingKey);return true;}catch (MqException e) {System.out.println("[VirtualHost] 創建綁定失敗 bindingKey:"+bindingKey);e.printStackTrace();}return false;}//刪除綁定public boolean bindingDelete(String exchangeName,String queueName){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if(binding==null){throw new MqException("[VirtualHost] 綁定不存在 queueName:"+queueName+" ,exchangeName:"+exchangeName);}//從內存刪除memoryDataCenter.deleteBinding(binding);//從磁盤刪除//此處可能綁定沒有保存在磁盤上,刪除失敗,但沒有關系,沒有影響diskDataManager.deleteBinding(binding.getBindingKey());System.out.println("[VirtualHost] 刪除綁定成功 exchangeName:"+exchangeName+" ,queueName:"+queueName +" , bindingKey:"+binding.getBindingKey());return true;}catch(Exception e){System.out.println("[VirtualHost] 刪除綁定失敗 exchangeName:"+exchangeName+" ,queueName:"+queueName);e.printStackTrace();return false;}}

創建Router類:

實現匹配判斷功能:

/*** 該類實現和綁定相關的操作//路由規定://routingKey: 只能由 數字 字母(大小寫) 下劃線 構成,使用.作為分割//bindingKey:只能包含 數字 字母 下劃線 * #,以 . 作為分割,* #只能作為獨立的分段*/
public class Router {//判斷消息攜帶的綁定格式是否正確public boolean checkRoutingKey(String routingKey){char[] ch = routingKey.toCharArray();for(char i:ch){if(i>='a' && i<='z') continue;if(i>='A' && i<='Z') continue;if(i>='0' && i<='9') continue;if(i=='.' || i=='_') continue;else return false;}return true;}//判斷綁定格式是否正確public boolean checkBindingKey(String bindingKey){char[] ch = bindingKey.toCharArray();for(char c:ch){if(c>='A' && c<='Z') continue;if(c>='a' && c<='z') continue;if(c>='0' && c<='9') continue;if(c=='_' || c=='*' || c=='#' || c=='.') continue;else return false;}//規定不能讓* #相連,即出現以下情況規定不成立:// *.#   #.*  #.#//以 . 對字符串進行分隔,判斷String[] s = bindingKey.split("\\.");for(int i=0;i<s.length-1;i++){if (s[i].equals("*") && s[i+1].equals("#") ||s[i].equals("#") && s[i+1].equals("*") ||s[i].equals("#") && s[i+1].equals("#")) {return false;}}return true;}//判斷bindingKey與routingKey是否匹配成功public boolean isRouting(ExchangeType type,String routingKey,String bindingKey) throws MqException {//判斷當前交換機類型:fanout/topicif(type==ExchangeType.FANOUT){//匹配到綁定交換機的所有隊列//直接返回即可return true;}else if(type==ExchangeType.TOPIC){//進行routingKey和BindingKey的匹配判斷return routingTopic(routingKey,bindingKey);}else{throw new MqException("[Router] 交換機類型有誤 type:"+type);}}/*** 規定:rotingKey匹配bindingKey*  *:匹配任意單個字符串*  #:匹配任意個任意字符串* @param routingKey 消息攜帶的匹配字符串* @param bindingKey 交換機和隊列的綁定關系* @return*/private boolean routingTopic(String routingKey, String bindingKey) {String[] b = bindingKey.split("\\.");String[] r = routingKey.split("\\.");int n1 = b.length;int n2 = r.length;int i = 0;int j = 0;while(i<n1 && j<n2){if(b[i].equals("*")){//可以匹配routingKey的任意單個字符//直接向后走就行:i++;j++;}else if(b[i].equals("#")){//匹配routingKey的任意個任意字符if(i==n1-1){//當bindingKey的最后一個字符為#時,可以匹配routingKey后面的所有字符串,直接返回true即可:return true;}else{//當b的#不是最后一個字符時,就找r之后的字符串中是否有b的下一個字符串的下標,當找不到時,就返回-1:i++;j = checkNext(b[i],r,j);//當在r中找不到b的下一個字符串時,一定匹配失敗,直接返回if(j==-1) return false;else{i++;j++;}}}else{//b為普通字符串時if(!b[i].equals(r[j])) return false;else {i++;j++;}}}//b / r有一個已經匹配到結尾了,只有兩個都完全匹配完,才算匹配成功if(i!=n1 || j!=n2){return false;}return true;}private int checkNext(String next, String[] r, int j) {for(int k=j;k<r.length;k++){if(r[k].equals(next)) return k;}return -1;}}

對Router類的TOPIC匹配進行測試:

@SpringBootTest
public class RouterTopicTest {private Router router = new Router();// [測試用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         true@Testvoid test01() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC, "aaa", "aaa");Assertions.assertTrue(ok);}@Testvoid test02() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bbb", "aaa.bbb");Assertions.assertTrue(ok);}@Testvoid test03() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bbb.ccc", "aaa.bbb");Assertions.assertFalse(ok);}@Testvoid test04() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.ccc", "aaa.bbb");Assertions.assertFalse(ok);}@Testvoid test05() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.bb.cc");Assertions.assertTrue(ok);}@Testvoid test06() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb", "aaa.*");Assertions.assertTrue(ok);}@Testvoid test07() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.*.bb");Assertions.assertFalse(ok);}@Testvoid test08() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb", "*.aaa.bb");Assertions.assertFalse(ok);}@Testvoid test09() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "#");Assertions.assertTrue(ok);}@Testvoid test10() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb", "aaa.#");Assertions.assertTrue(ok);}@Testvoid test11() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.#");Assertions.assertTrue(ok);}@Testvoid test12() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.cc", "aaa.#.cc");Assertions.assertTrue(ok);}@Testvoid test13() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.#.cc");Assertions.assertTrue(ok);}@Testvoid test14() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.aaa.bb.cc", "aaa.#.cc");Assertions.assertTrue(ok);}@Testvoid test15() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"cc", "#.cc");Assertions.assertTrue(ok);}    @Testvoid test16() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "#.cc");Assertions.assertTrue(ok);}

發送消息:

//發送消息到隊列public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){exchangeName = virtualHostName + exchangeName;try {//1.判斷交換機是否存在Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost] 交換機不存在 exchangeName:"+exchangeName);}//2.判斷routingKey格式是否正確boolean ok = router.checkRoutingKey(routingKey);if(!ok) {throw new MqException("[VirtualHost] routingKey格式有誤 routingKey:"+routingKey);}//3.根據交換機的類型進行路由匹配,分發消息if(exchange.getType()==ExchangeType.DIRECT){//直接交換機,routingKey就是隊列名,bindingKey無用,將消息路由到指定的隊列上//獲取到指定隊列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost] 隊列不存在 queueName:"+queueName);}//構造消息對象Message message = new Message();message = message.createMessageById(null,basicProperties,body);//發送消息到隊列,再構造一個方法實現sendMessage(queue,message);}else{//當交換機類型為fanout/topic時://遍歷交換機所有的綁定ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);for(Binding b:bindings.values()){MSGQueue queue = memoryDataCenter.getQueue(b.getQueueName());//判斷交換機綁定的隊列是否存在:if(queue==null){System.out.println("[VirtualHost] 隊列不存在 queueName:"+b.getQueueName());continue;}//構造消息對象Message message = new Message().createMessageById(routingKey, basicProperties, body);//判斷routingKey與binding是否成功if(!router.isRouting(exchange.getType(),message.getRoutingKey(),b.getBindingKey())){//匹配失敗:System.out.println("[VirtualHost] routingKey和BindingKey不匹配 routingKey:"+routingKey+" , bindingKey:"+b.getBindingKey());continue;}//匹配成功時,就將消息轉發sendMessage(queue,message);System.out.println("[VirtualHost] 消息發送成功 queueName:"+queue.getName()+" ,messageId:"+message.getMessageId());}}return true;}catch (Exception e){System.out.println("[VirtualHost]消息發送失敗 ");e.printStackTrace();return false;}}//消費者管理對象:private ConsumerManager consumerManager = new ConsumerManager(this);private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {//存入磁盤//是否持久化//1:持久化 0:非持久化if(message.getDeliveryMode()==1){diskDataManager.sendMessage(queue,message);}//存入內存:memoryDataCenter.sendMessage(queue,message);//消息已經到達隊列,通知訂閱隊列的消費者消費消息consumerManager.notifyConsumer(queue.getName());System.out.println("[VirtualHost] 發送消息成功");}

創建ConsumerManager類:

對消費者進行管理:


/*** 消費者管理類*/
public class ConsumerManager {//持有上層的VirtualHost對象的引用,用來操作數據private VirtualHost virtualHost;//    使??個線程池?來執?消息回調private ExecutorService workerPool = Executors.newFixedThreadPool(4);//存放令牌(隊列名)的隊列:那個隊列當前有消息了,就將隊列名加入到阻塞隊列中//然后掃描線程通過該隊列中存放的隊列名找到對應的消息和訂閱者,將信息打包放到線程池中進行消費private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();//掃描線程private Thread scannerThread = null;//通知消費者消費消息://調用時機:發送方發送消息成功后,//當隊列中有消息了,就將其放到阻塞隊列中,然后就要通知消費者消費消息了public void notifyConsumer(String queueName) throws InterruptedException {tokenQueue.put(queueName);}
}

訂閱消息:

    /**//訂閱消息//添加一個訂閱者:* @param consumerTag 消費者身份標識* @param queueName 隊列名* @param autoAck 是否自動確認消息* @param consumer 回調函數* @return*/public boolean basicConsume(String consumerTag, String queueName,boolean autoAck, Consumer consumer){queueName = virtualHostName + queueName;try {//通過消費者管理類實現添加消費者功能consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsumer 成功 queueName:"+queueName);return true;}  catch (MqException e) {System.out.println("[VirtualHost] basicConsumer 失敗 queueName:"+queueName);e.printStackTrace();return false;}}

創建ConsumerEnv類:

消費者完整環境類:


/*** 表示消費者(完整的執行環境)*/
@Data
public class ConsumerEnv {//消費者唯一標識private String consumerTag;//訂閱隊列的隊列名字private String queueName;//是否自動確認消息private boolean autoAck;//要執行的具體功能,通過一個接口,由調用者自己實現其方法體private Consumer consumer;
}

創建Consumer接口:

實現消費者的回調函數接口:通過lambda表達式,讓消費者自己實現對消息的處理

/*** 函數式接口,回調函數,當消費者收到消息后,要處理消息,調用者通過這個接口實現具體的功能*/
@FunctionalInterface
public interface Consumer {//deliver:投遞的意思,這個方法在每次服務器收到發送來的消息后,調用//通過這個方法把消息推送給對應的消費者void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}

為MSGQueue類新增加二個屬性和方法,用于管理訂閱隊列的消費者集合:

//此處再添加一個屬性:訂閱該隊列的消費者集合private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//當訂閱隊列的消費者不止一個時 , 規定以輪訓的方式消費消息//再添加一個屬性,記錄當前輪到哪個消費者消費消息了//這里使用AtomicInteger類來實現,目的是不讓手動修改,且要實現自增的功能private AtomicInteger atomicInteger = new AtomicInteger(0);//添加一個新的訂閱者(消費者)public void addConsumerEnv(ConsumerEnv consumerEnv){consumerEnvList.add(consumerEnv);}//挑選一個訂閱者,消費當前消息,按照輪訓的方式public ConsumerEnv chooseConsumerEnv(){if(consumerEnvList.isEmpty()){//當前該隊列還沒有消費者訂閱System.out.println("[MSGQueue] 當前該隊列沒有訂閱者");return null;}//按照輪訓的方式獲取一個要消費消息的訂閱者下標int index = atomicInteger.get()%consumerEnvList.size();//讓輪訓值 自增atomicInteger.getAndIncrement();return consumerEnvList.get(index);}

在ConsumerManager類中實現添加消費者方法:

 //添加新的消費者,并消費隊列中當前存在的消息public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//1.找到對應的隊列MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 隊列不存在 queueName:"+queueName);}//2.創建一個消費者ConsumerEnv consumerEnv = new ConsumerEnv();consumerEnv.setConsumerTag(consumerTag);consumerEnv.setQueueName(queueName);consumerEnv.setAutoAck(autoAck);consumerEnv.setConsumer(consumer);//3.將訂閱者加入到隊列的訂閱者隊列中queue.addConsumerEnv(consumerEnv);//4.當隊列中已經有一些消息時,要將其消費掉synchronized (queue){int n = virtualHost.getMemoryDataCenter().getMessageCountFromQueue(queueName);for(int i=0;i<n;i++){//這個方法調用一次就消費一條消息consumerMessage(queue);}}}//  消費消息:調用消息的回調函數,并將消息從隊列中刪除//從隊列中獲取一個消息,并讓消費者消費,// 當消費者不止一個時,按照輪訓的方式讓消費者依次消費消息private void consumerMessage(MSGQueue queue) throws MqException {//1.從隊列的訂閱者中挑選一個訂閱者ConsumerEnv consumerEnv = queue.chooseConsumerEnv();if(consumerEnv==null){//當前隊列號沒有訂閱者,無法消費消息System.out.println("[ConsumerManager] 當前隊列中還沒有訂閱者");return;}//2.消費消息Message message = virtualHost.getMemoryDataCenter().pollMessage(queue.getName());if(message==null){//當前隊列中還沒有消息,不需要消費System.out.println("當前隊列中還沒有消息");return;}//將消息帶到消費者的回調方法中,給線程池執行workerPool.submit(()->{try{//1.在執行回調之前,先將消息放到待確認隊列集合中,一旦消息被消費失敗了.就重新發送消息virtualHost.getMemoryDataCenter().sendWaitMessage(queue.getName(),message);//2.執行訂閱者的回調方法consumerEnv.getConsumer().handlerDeliver(consumerEnv.getConsumerTag(),message.getBasicProperties(),message.getBody());//3.根據消費者的確認消息方式及消費者消費消息的情況,執行刪除消息操作//  這里完成為自動確認模式下的操作,手動模式下,在basicAck方法中實現if(consumerEnv.isAutoAck()){//4.刪除磁盤中的數據//  是否持久化//  1:非持久化 0:持久化if(message.getDeliveryMode()==0) {virtualHost.getDiskDataManager().deleteMessageFromQueue(queue, message);}//5.刪除未確認消息隊列中的消息virtualHost.getMemoryDataCenter().deleteWaitMessage(queue.getName(), message.getMessageId());//6.刪除消息集合中的消息virtualHost.getMemoryDataCenter().deleteMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消費 ");}}catch (Exception e){System.out.println("[ConsumerManager] 消費消息失敗");e.printStackTrace();}});}

在ConsumerManager類中,添加掃描線程,不停掃描阻塞令牌隊列,查看是否有新的消息到來,需要消費者及時消費:

//先獲取到令牌,根據令牌找到指定的隊列,從隊列中獲取消息進行消費public ConsumerManager(VirtualHost parent){virtualHost = parent;//為推的模式.不斷的掃描令牌隊列,一但有消息進入隊列,就將其推送給消費者Thread t = new Thread(()->{while(true){try {//1.獲取令牌String queueName = tokenQueue.take();//2.根據令牌,找到指定的隊列MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 獲取令牌時,發現隊列不存在");}synchronized (queue){//3.從隊列中獲取一個消息并進行消費consumerMessage(queue);}} catch (InterruptedException | MqException e) {throw new RuntimeException(e);}}});//將線程設為后臺線程//當前臺線程執行結束了,后臺線程也就結束了,//若設為前臺線程,那么只有當前臺線程執行完了,整個進程才會結束,// 這里的循環是while(true)會一直卡著執行結束不了,因此要設成后臺線程t.setDaemon(true);//啟動線程t.start();}

十三.網絡通信協議設計

生產者和消費者都是客戶端,需要通過網絡和消息隊列服務器進行通信.

此處我們使?TCP協議,來作為通信的底層協議.同時在這個基礎上?定義應?層協議,完成客?端對服 務器這邊功能的遠程調?.

設計應用層協議:

使??進制的?式設定協議.

請求數據格式:

響應數據格式:

其中 type 表?請求響應不同的功能. 取值如下:

? 0x1 創建 channel

? 0x2 關閉 channel

? 0x3 創建 exchange

? 0x4 銷毀 exchange

? 0x5 創建 queue

? 0x6 銷毀 queue

? 0x7 創建 binding

? 0x8 銷毀 binding

? 0x9 發送 message

? 0xa 訂閱 message

? 0xb 返回 ack

? 0xc 服務器給客?端推送的消息. (被訂閱的消息) 響應獨有的

對于請求來說,payload是各種請求方法的參數信息

對響應來說,payload是方法的返回數據信息.

創建request類:


/*** 表示一個網絡通信中的請求對象*/
@Data
public class Request {/** type 表?請求響應不同的功能. 取值如下*  0x1  創建 channel* ? 0x2  關閉 channel* ? 0x3  創建 exchange* ? 0x4  銷毀 exchange* ? 0x5  創建 queue* ? 0x6  銷毀 queue* ? 0x7  創建 binding* ? 0x8  銷毀 binding* ? 0x9  發送 message* ? 0xa  訂閱 message* ? 0xb  返回 ack* ? 0xc  服務器給客?端推送的消息. (被訂閱的消息) 響應獨有的*///請求類型,設定占4字節private int type;//請求的數據長度,占4字節private int length;//請求體 payload 表?這次?法調?的各種參數信息private byte[] payload;
}

創建response類:

/*** 表示一個響應對象*/
@Data
public class Response {//按照自己的定義,響應類型,4字節private int type;//響應的數據長度,4字節private int length;//響應體private byte[] payload;
}

創建參數父類:

//定義參數?類
//構造?個類表??法的參數, 作為 Request 的 payload.
//不同的?法中, 參數形態各異, 但是有些信息是通?的, 使??個?類表?出來. 具體每個?法的參數再
//通過繼承的?式體現
@Data
public class BasicArgs implements Serializable {//表示一次請求的身份標識,用來和該請求 對應的返回的響應相對照protected String rid;//每一次請求需要建立連接,通過TCP建立連接,一個連接可以發送多次消息,每條消息通過信道傳送//一條信道可以發送多條消息//這次通信的信道channel的身份標識protected String channelId;
}

創建響應父類:

/*** 定義payload的返回數據*/
@Data
public class BasicReturns implements Serializable {//一次請求或相應的身份標識protected String rid;//標識一個channelprotected String channelId;//表示方法的執行結果  payload 表?這次?法調?的返回值.protected boolean ok;
}

創建設備功能的參數類:

exchangeDeclareArgs:

/*** 這個類表示調用聲明交換機方法的參數*/
@Data
public class ExchangeDeclareArgs extends BasicArgs implements Serializable {private String exchangeName;private ExchangeType type;private boolean isDurable;private boolean autoDelete;private Map<String,Object> args;
}

exchangeDeleteArgs:

@Data
public class ExchangeDeleteArgs extends BasicArgs implements Serializable {private String exchangeName;
}

queueDeclareArgs:

@Data
public class QueueDeclareArgs extends BasicArgs implements Serializable {private String queueName;private boolean isDurable;private boolean autoDelete;private Map<String,Object> args;
}

queueDeleteArgs:

@Data
public class QueueDeleteArgs extends BasicArgs implements Serializable {private String queueName;
}

bindingDeclareArgs:

@Data
public class BindingDeclareArgs extends BasicArgs implements Serializable {private String ExchangeName;private String queueName;private String bindingKey;
}

bindingDeleteArgs:

@Data
public class BindingDeleteArgs extends BasicArgs implements Serializable {private String exchangeName;private String queueName;
}

basicPublishArgs:

@Data
public class BasicPublishArgs extends BasicArgs implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}

basicConsumerArgs:

@Data
public class BasicConsumerArgs extends BasicArgs implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;//這個類對應的BasicConsumer方法還有一個參數 consumer,是一個回到參數//消費者客戶端收到服務器發送的消息后,針對自己的業務,實現這個回調接口就行了,//無需再將回調參數傳給服務器,因此解救不需要在這里寫這個參數了//并且,這個 回調參數也無法通過網絡傳輸給服務器
}

basicAckArgs:

/*** 手動響應數據*/
@Data
public class BasicAckArgs extends BasicArgs implements Serializable {private String queueName;private String messageId;
}

subscribeReturns:

/*** 這里類表示返回數據的具體參數* 是服務器給消費者提供的訂閱消息* consumerTag其實是channelId.* basicProperties和body共同構成了Message.*/
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}

十四.實現BrokerServer

public class BrokerServer {//調用相關數據private VirtualHost virtualHost = new VirtualHost("default");//服務器??的 socketprivate ServerSocket serverSocket = null;//引入線程池,處理多個客戶端的請求private ExecutorService executorService = null;//引入一個哈希表,存儲所有的會話對象//key: channelId, val:socket對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();//引入一個布爾變量,表示當前服務器是否要停止,//要對所有線程是立即可見的,用volatile修飾private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(9090);}//啟動服務public void start() throws IOException {System.out.println("[BrokerServer] 啟動服務");executorService = Executors.newCachedThreadPool();try {while (runnable) {//accept:不斷接收客戶端發來的請求:Socket clientSocket = serverSocket.accept();executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {//正常結束System.out.println("[BrokerServer] 服務器停止運行!");}}//停止服務器public void stop() throws IOException {runnable = false;executorService.shutdown();serverSocket.close();}//處理一個客戶端的連接//一個個連接可能有多次的請求和相應//要讀取數據,處理數據,然后將結果返回給客戶端private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//  這里需要按照特定格式進行讀取和解析數據try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.讀取請求Request request = readRequest(dataInputStream);//2.根據請求計算相應Response response = process(request, clientSocket);//3.將結果返回給客戶端writeResponse(dataOutputStream,response);}} catch (EOFException | SocketException e) {//當出現這兩種異常時,是正常的異常,是請求讀取結束了,讀到了空字符串拋出的異常,// 正常結束循環就可以了System.out.println("[BrokerServer] connection 連接關閉 ,客戶端地址: " + clientSocket.getInetAddress().toString()+ " : " + clientSocket.getPort());} catch (ClassNotFoundException e) {throw new RuntimeException(e);} catch (MqException e) {throw new RuntimeException(e);}} catch (IOException e) {throw new RuntimeException(e);} finally {//關閉資源try {//當前連接處理完之后,需要關閉SocketclientSocket.close();//把當前socket對應的所有channel也刪除了clearCloseSessions(clientSocket);} catch (IOException e) {e.printStackTrace();}//刪除sessions中客戶端和服務器建立的連接}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("讀取請求格式出錯!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 這個刷新緩沖區也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一個初步的解析.BasicArgs BasicArgs = (BasicArgs) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + BasicArgs.getRid() + ", channelId=" + BasicArgs.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根據 type 的值, 來進一步區分接下來這次請求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 創建 channelsessions.put(BasicArgs.getChannelId(), clientSocket);System.out.println("[BrokerServer] 創建 channel 完成! channelId=" + BasicArgs.getChannelId());} else if (request.getType() == 0x2) {// 銷毀 channelsessions.remove(BasicArgs.getChannelId());System.out.println("[BrokerServer] 銷毀 channel 完成! channelId=" + BasicArgs.getChannelId());} else if (request.getType() == 0x3) {// 創建交換機. 此時 payload 就是 ExchangeDeclareArgs 對象了.ExchangeDeclareArgs Args = (ExchangeDeclareArgs) BasicArgs;ok = virtualHost.exchangeDeclare(Args.getExchangeName(), Args.getType(),Args.isDurable(), Args.isAutoDelete(), Args.getArgs());} else if (request.getType() == 0x4) {ExchangeDeleteArgs Args = (ExchangeDeleteArgs) BasicArgs;ok = virtualHost.exchangeDelete(Args.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArgs Args = (QueueDeclareArgs) BasicArgs;ok = virtualHost.queueDeclare(Args.getQueueName(), Args.isDurable(), Args.isAutoDelete(), Args.getArgs());} else if (request.getType() == 0x6) {QueueDeleteArgs Args = (QueueDeleteArgs) BasicArgs;ok = virtualHost.queueDelete((Args.getQueueName()));} else if (request.getType() == 0x7) {BindingDeclareArgs Args = (BindingDeclareArgs) BasicArgs;ok = virtualHost.bindingDeclare(Args.getQueueName(), Args.getExchangeName(), Args.getBindingKey());} else if (request.getType() == 0x8) {BindingDeleteArgs Args = (BindingDeleteArgs) BasicArgs;ok = virtualHost.bindingDelete(Args.getQueueName(), Args.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArgs Args = (BasicPublishArgs) BasicArgs;ok = virtualHost.basicPublish(Args.getExchangeName(), Args.getRoutingKey(),Args.getBasicProperties(), Args.getBody());} else if (request.getType() == 0xa) {BasicConsumerArgs Args = (BasicConsumerArgs) BasicArgs;ok = virtualHost.basicConsume(Args.getConsumerTag(), Args.getQueueName(), Args.isAutoAck(),new Consumer() {//這個回調函數要做的工作, 就是把服務器收到的消息可以直接推送回對應的消費者客戶端//此處 consumerTag 其實是 channelId. 根據 channelId 去 sessions 中查詢,//  就可以得到對應的socket 對象了, 從而可以往里面發送數據了@Overridepublic void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道當前這個收到的消息, 要發給哪個客戶端.// 此處 consumerTag 其實是 channelId. 根據 channelId 去 sessions 中查詢, 就可以得到對應的// socket 對象了, 從而可以往里面發送數據了// 1. 根據 channelId 找到 socket 對象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 訂閱消息的客戶端已經關閉!");}// 2. 構造響應數據SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于這里只有響應, 沒有請求, 不需要去對應. rid 暫時不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toByte(subScribeReturns);Response response = new Response();// 0xc 表示服務器給消費者客戶端推送的消息數據.response.setType(0xc);// response 的 payload 就是一個 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把數據寫回給客戶端.//    注意! 此處的 dataOutputStream 這個對象不能 close !!!//    如果 把 dataOutputStream 關閉, 就會直接把 clientSocket 里的 outputStream 也關了.//    此時就無法繼續往 socket 中寫入后續數據了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 調用 basicAck 確認消息.BasicAckArgs Args = (BasicAckArgs) BasicArgs;ok = virtualHost.basicAck(Args.getQueueName(), Args.getMessageId());} else {// 當前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 構造響應BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(BasicArgs.getChannelId());basicReturns.setRid(BasicArgs.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toByte(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearCloseSessions(Socket clientSocket) {// 這里要做的事情, 主要就是遍歷上述 sessions hash 表, 把該被關閉的 socket 對應的鍵值對, 統統刪掉.List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在這里直接刪除!!!// 這屬于使用集合類的一個大忌!!! 一邊遍歷, 一邊刪除!!!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);}}

十五.實現客戶端

創建ConnectionFactory.

表示用來創建連接的工廠類:

/***連接工廠*/
@Data
public class ConnectionFactory {// broker server 的 ip 地址private String host;// broker server 的端口號private int port;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}
}

創建Connection類:

一個Connection對應一個TCP,一個連接可以包含多個channel.

public class Connection {private Socket socket = null;private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//創建線程池,用來處理客戶端這邊執行用戶回調的線程池private ExecutorService callbackPool = null;//  創建一個hash.來管理多個channelConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();//這個方法在客戶端構造好請求后,調用,用來發送請求到服務器:public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 發送請求! type=" + request.getType() + ", length=" + request.getLength());}
// 和服務器建立連接,接收服務器返回的響應,并處理響應public Connection(String host,int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);
//      創建一個掃描線程,不斷的從socket中讀取響應,交給對應的channel進行處理Thread t = new Thread(()->{try{while (!socket.isClosed()){Response response = readResponse();//處理響應dispatchResponse(response);}} catch (SocketException e){//連接正常斷開System.out.println("[Connection] 連接正常斷開");}catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 連接異常斷開");e.printStackTrace();}});t.start();}public void close(){try{//關閉Connection ,釋放資源callbackPool.shutdownNow();channelMap.clear();outputStream.close();inputStream.close();socket.close();;}catch (IOException e){e.printStackTrace();}}// 讀取服務器返回的響應public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("讀取的響應數據不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到響應! type=" + response.getType() + ", length=" + response.getLength());return response;}// 使用這個方法來分別處理響應, 當前的響應是一個針對控制請求的響應, 還是服務器推送的消息.private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服務器推送給消費者客戶端的消息數據SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根據 channelId 找到對應的 channel 對象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 該消息對應的 channel 在客戶端中不存在! channelId=" + channel.getChannelId());}// 執行該 channel 對象內部的回調.callbackPool.submit(() -> {try {channel.getConsumer().handlerDeliver(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 當前響應是針對剛才的控制請求的響應BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());// 把這個結果放到對應的 channel 的 hash 表中.Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 該消息對應的 channel 在客戶端中不存在! channelId=" + channel.getChannelId());}//獲取到響應后,將其放到響應的集合中,讓客戶端從集合中取走對應的響應.channel.putReturns(basicReturns);}}
// 通過這個方法, 在 Connection 中能夠創建出一個 Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 把這個 channel 對象放到 Connection 管理 channel 的 哈希表 中.channelMap.put(channelId, channel);// 同時也需要把 "創建 channel" 的這個消息也告訴服務器.boolean ok = channel.createChannel();if (!ok) {// 服務器這里創建失敗了!! 整個這次創建 channel 操作不順利!!// 把剛才已經加入 hash 表的鍵值對, 再刪了.channelMap.remove(channelId);return null;}return channel;}
}

創建Channel類:

用于客戶端發送請求調用的相關的API:

@Data
public class Channel {private String channelId;// 當前這個 channel 屬于哪個連接.private Connection connection;// 用來存儲后續客戶端收到的服務器的響應.private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 如果當前 Channel 訂閱了某個隊列, 就需要在此處記錄下對應回調是啥. 當該隊列的消息返回回來的時候, 調用回調.// 此處約定一個 Channel 中只能有一個回調.private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}/**   type 表?請求響應不同的功能. 取值如下*  0x1  創建 channel* ? 0x2  關閉 channel* ? 0x3  創建 exchange* ? 0x4  銷毀 exchange* ? 0x5  創建 queue* ? 0x6  銷毀 queue* ? 0x7  創建 binding* ? 0x8  銷毀 binding* ? 0x9  發送 message* ? 0xa  訂閱 message* ? 0xb  返回 ack* ? 0xc  服務器給客?端推送的消息. (被訂閱的消息) 響應獨有的*/// 在這個方法中, 和服務器進行交互, 告知服務器, 此處客戶端創建了新的 channel 了.public boolean createChannel() throws IOException {// 對于創建 Channel 操作來說, payload 就是一個 basicArgs 對象BasicArgs basicArgs = new BasicArgs();basicArgs.setChannelId(channelId);basicArgs.setRid(generateRid());byte[] payload = BinaryTool.toByte(basicArgs);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 構造出完整請求之后, 就可以發送這個請求了.connection.writeRequest(request);// 等待服務器的響應//服務器對根據請求處理并返回響應,對請求的處理時間不確定,// 該步驟可能會發生阻塞BasicReturns basicReturns = waitResult(basicArgs.getRid());return basicReturns.isOk();}// 通過UUID,生成唯一ridprivate String generateRid() {return "R-" + UUID.randomUUID().toString();}private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查詢結果為 null, 說明包裹還沒回來.// 此時就需要阻塞等待.synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}// 讀取成功之后, 還需要把這個消息從哈希表中刪除掉.basicReturnsMap.remove(rid);return basicReturns;}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 當前也不知道有多少個線程在等待上述的這個響應.// 把所有的等待的線程都喚醒.notifyAll();}}// 關閉 channel, 給服務器發送一個 type = 0x2 的請求public boolean close() throws IOException {BasicArgs basicArgs = new BasicArgs();basicArgs.setRid(generateRid());basicArgs.setChannelId(channelId);byte[] payload = BinaryTool.toByte(basicArgs);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArgs.getRid());return basicReturns.isOk();}// 創建交換機public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> Args) throws IOException {ExchangeDeclareArgs exchangeDeclareArgs = new ExchangeDeclareArgs();exchangeDeclareArgs.setRid(generateRid());exchangeDeclareArgs.setChannelId(channelId);exchangeDeclareArgs.setExchangeName(exchangeName);exchangeDeclareArgs.setType(exchangeType);exchangeDeclareArgs.setDurable(durable);exchangeDeclareArgs.setAutoDelete(autoDelete);exchangeDeclareArgs.setArgs(Args);byte[] payload = BinaryTool.toByte(exchangeDeclareArgs);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArgs.getRid());return basicReturns.isOk();}// 刪除交換機public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArgs Args = new ExchangeDeleteArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setExchangeName(exchangeName);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 創建隊列public boolean queueDeclare(String queueName, boolean durable, boolean autoDelete,Map<String, Object> Args) throws IOException {QueueDeclareArgs queueDeclareArgs = new QueueDeclareArgs();queueDeclareArgs.setRid(generateRid());queueDeclareArgs.setChannelId(channelId);queueDeclareArgs.setQueueName(queueName);queueDeclareArgs.setDurable(durable);queueDeclareArgs.setAutoDelete(autoDelete);queueDeclareArgs.setArgs(Args);byte[] payload = BinaryTool.toByte(queueDeclareArgs);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArgs.getRid());return basicReturns.isOk();}// 刪除隊列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArgs Args = new QueueDeleteArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 創建綁定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {BindingDeclareArgs Args = new BindingDeclareArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);Args.setExchangeName(exchangeName);Args.setBindingKey(bindingKey);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 解除綁定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {BindingDeleteArgs Args = new BindingDeleteArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);Args.setExchangeName(exchangeName);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 發送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArgs Args = new BasicPublishArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setExchangeName(exchangeName);Args.setRoutingKey(routingKey);Args.setBasicProperties(basicProperties);Args.setBody(body);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 訂閱消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {// 先設置回調.if (this.consumer != null) {throw new MqException("該 channel 已經設置過消費消息的回調了, 不能重復設置!");}this.consumer = consumer;BasicConsumerArgs Args = new BasicConsumerArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setConsumerTag(channelId);  // 此處 consumerTag 也使用 channelId 來表示了.Args.setQueueName(queueName);Args.setAutoAck(autoAck);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 確認消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArgs Args = new BasicAckArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);Args.setMessageId(messageId);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}
}

客戶端代碼測試:


@SpringBootTest
public class MqClientTest {private BrokerServer brokerServer = null;private ConnectionFactory factory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先啟動服務器Mq02Application.context = SpringApplication.run(Mq02Application.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {// 這個 start 方法會進入一個死循環. 使用一個新的線程來運行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {// 停止服務器brokerServer.stop();// t.join();Mq02Application.context.close();// 刪除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);// 此處穩妥起見, 把改關閉的要進行關閉.channel.close();connection.close();}@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue", true, false,  null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true,  false, null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false, null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消費數據] 開始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println("[消費數據] 結束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
}

完成

成果測試:

啟動消息隊列服務器:

//啟動服務器:BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();

創建生產者 發送消息:

/*** 模擬生產者*/
public class producer {public static void main(String[] args) throws IOException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();System.out.println("啟動生產者");factory.setHost("127.0.0.1");factory.setPort(9090);//創建連接Connection connection = factory.newConnection();//創建channelChannel channel = connection.createChannel();//創建交換機 隊列 綁定channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("queue",true,false,null);//發送消息boolean ok = channel.basicPublish("exchange", "queue",null,"hello".getBytes());System.out.println("消息發送成功: ok:"+ok);Thread.sleep(1000);//關閉資源channel.close();connection.createChannel();}
}

創建消費者消費消息:

/*** 模擬消費者*/
public class consumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();System.out.println("消費者啟動");factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("queue",true,false,null);//接收消息boolean ok = channel.basicConsume("queue", true, new org.rabbitmq.mq02.common.Consumer() {@Overridepublic void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("處理消息開始");System.out.println("consumerTag:"+consumerTag);System.out.println("basicProperties:"+basicProperties);System.out.println("body:"+body.toString());System.out.println("處理消息結束");}});System.out.println("消費一條消息成功 ok:"+ok);// 由于消費者也不知道生產者要生產多少, 就在這里通過這個循環模擬一直等待消費.while (true) {Thread.sleep(500);}}
}

完結.

項目源碼:

Admin/模擬實現消息隊列 - Gitee.com

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/921273.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/921273.shtml
英文地址,請注明出處:http://en.pswp.cn/news/921273.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Unity Standard Shader 解析(五)之ShadowCaster

一、ShadowCaster // ------------------------------------------------------------------// Shadow rendering passPass {Name "ShadowCaster"Tags { "LightMode" "ShadowCaster" }ZWrite On ZTest LEqualCGPROGRAM#pragma target 3.0// --…

[MRCTF2020]Ez_bypass

BUUCTF在線評測BUUCTF 是一個 CTF 競賽和訓練平臺&#xff0c;為各位 CTF 選手提供真實賽題在線復現等服務。https://buuoj.cn/challenges#[MRCTF2020]Ez_bypass啟動靶機 有提示F12&#xff0c;那查看一下源碼。和頁面顯示的代碼一樣的&#xff0c;就是格式更規范而已 include…

C/C++關鍵字——union

1.介紹union是一種特殊的數據類型&#xff0c;它允許你在同一塊內存區域中存儲不同的數據類型。它的主要目的是節省內存&#xff0c;尤其是在處理多種可能的數據類型&#xff0c;但一次只使用其中一種的場景。2.特點與 struct&#xff08;結構體&#xff09;不同&#xff0c;結…

2024 arXiv Cost-Efficient Prompt Engineering for Unsupervised Entity Resolution

論文基本信息 題目&#xff1a; Cost-Efficient Prompt Engineering for Unsupervised Entity Resolution 作者&#xff1a; Navapat Nananukul, Khanin Sisaengsuwanchai, Mayank Kejriwal 機構&#xff1a; University of Southern California, Information Sciences Institu…

【XR技術概念科普】什么是注視點渲染(Foveated Rendering)?為什么Vision Pro離不開它?

一、前言2023 年&#xff0c;蘋果推出了 Vision Pro 頭顯&#xff0c;把“空間計算”概念推向大眾。與以往的 XR 設備不同&#xff0c;Vision Pro 強調高分辨率、真實感與沉浸感。然而&#xff0c;這種體驗背后隱藏著一個巨大的技術挑戰&#xff1a;如何在有限的計算與能耗條件…

Qt 系統相關 - 1

雖然 Qt 是跨平臺的 C 開發框架&#xff0c;Qt 有很多能力其實是操作系統提供的&#xff0c;只不過 Qt 封裝了系統的 API程序時運行在操作系統上的&#xff0c;需要系統給我們提供支撐&#xff01;事件文件操作多線程編程網絡編程多媒體&#xff08;音頻&#xff0c;視頻&#…

“12306”有多牛逼?從架構師的角度詳細的告訴你

12306鐵路票務系統架構深度解析 &#x1f4da; 目錄 系統概述業務特點與技術挑戰整體架構設計核心技術架構高并發處理策略數據存儲與管理緩存體系設計分布式系統架構安全防護體系性能優化策略監控與運維技術演進歷程總結與展望 每到春節、國慶這種全民遷徙的時刻&#xff0c;…

數據采集機器人哪家好?2025 年實測推薦:千里聆 RPA 憑什么成企業首選?

在數字化轉型加速的今天&#xff0c;數據采集已成為企業運營的核心環節&#xff0c;數據采集機器人正在重構企業的效率邊界。2025 年中國 RPA 市場排名顯示&#xff0c;泛微旗下的千里聆 RPA 已躋身行業前五&#xff0c;成為中大型國央企的首選品牌。本文將通過三維評估體系&am…

基礎crud項目(前端部分+總結)

本人根據自己對前端微不足道的理解和 AI 老師的指導下&#xff0c;艱難地完成了基礎crud代碼的全棧開發&#xff0c;算是自己的第一個 Java 項目&#xff0c;對此做個簡單總結。 后端部分 在前后端分離開發中&#xff0c;前端負責頁面交互與數據展示&#xff0c;后端提供接口支…

MATLAB矩陣及其運算(二)函數

函數分為MATLAB內置函數及用戶自定義函數&#xff0c;用戶可以直接調用內置函數進行數據處理。內置函數的使用函數由三部分組成&#xff1a;名稱、輸入和輸出。內置函數示例&#xff1a;單輸入單輸出函數&#xff1a;sqrt(x)&#xff1b;單輸入多輸出函數&#xff1a;size(x)&a…

自動化運維-ansible中對于大項目的管理

自動化運維-ansible中對于大項目的管理 一、引用主機清單 在Playbook中引用主機時&#xff0c;hosts 字段指定的目標必須與Ansible主機清單中定義的標識符完全匹配。如果清單中配置的是主機名&#xff0c;則在Playbook中使用IP地址或其他別名將無法匹配&#xff0c;導致任務被跳…

59_基于深度學習的麥穗計數統計系統(yolo11、yolov8、yolov5+UI界面+Python項目源碼+模型+標注好的數據集)

目錄 項目介紹&#x1f3af; 功能展示&#x1f31f; 一、環境安裝&#x1f386; 環境配置說明&#x1f4d8; 安裝指南說明&#x1f3a5; 環境安裝教學視頻 &#x1f31f; 二、數據集介紹&#x1f31f; 三、系統環境&#xff08;框架/依賴庫&#xff09;說明&#x1f9f1; 系統環…

面試問題詳解十六:Qt 內存管理機制

在 Qt 開發過程中&#xff0c;很多初學者&#xff08;包括不少有經驗的 C 程序員&#xff09;經常會產生這樣的疑問&#xff1a;“我在 Qt 中 new 出來的控件好像都沒有 delete&#xff0c;那內存不會泄漏嗎&#xff1f;”比如下面這段代碼&#xff1a; void Widget::createLef…

Pycharm 試用

Ubuntu 重置Pycharm試用期限&#xff08;30 天&#xff09; 先關閉Pycharm刪除系統緩存 rm -rf ~/.config/JetBrains/ && rm -rf ~/.local/share/JetBrains/ && rm -rf ~/.cache/JetBrains/刪除已經安裝的 Pycharm 軟件運行目錄去官網下載新的 就行了

C++ Qt 開發核心知識

Qt 框架概述Qt 是一個跨平臺的 C 應用程序開發框架&#xff0c;廣泛用于開發圖形用戶界面程序。其核心特性包括跨平臺能力、豐富的功能模塊和強大的工具集。核心概念與機制元對象系統Qt 擴展了標準 C&#xff0c;通過元對象系統提供信號與槽機制、運行時類型信息和動態屬性系統…

net9 aspose.cell 自定義公式AbstractCalculationEngine,帶超鏈接excel轉html后背景色丟失

AbstractCalculationEngine 是 Aspose.Cells 中一個強大的抽象類&#xff0c;允許您自定義公式計算邏輯。當您需要覆蓋默認計算行為或實現自定義函數時非常有用。直接上代碼1. 創建自定義計算引擎using Aspose.Cells; using System;// 創建自定義計算引擎 public class CustomC…

如何監控員工的電腦?7款實用的員工電腦管理軟件,探索高效管理捷徑!

當銷售團隊在淘寶刷單、設計師用公司電腦挖礦、程序員頻繁訪問代碼托管網站時&#xff0c;企業損失的不僅是帶寬——低效、泄密、合規風險正成為隱形利潤殺手。 傳統管理依賴“人盯人”或抽查日志&#xff0c;但面對分布式辦公與遠程協作趨勢&#xff0c;這些方法早已力不從心…

機器視覺軟件--VisionPro、Visual Master,Halcon 和 OpenCV 的學習路線

Halcon 和 OpenCV區別 Halcon 和 OpenCV 都是計算機視覺領域的重要工具&#xff0c;但它們的設計理念、功能側重和適用場景有顯著不同。下面這個表格匯總了它們的核心區別&#xff0c;方便你快速了解&#xff1a; 開發模式與體驗??&#xff1a;Halcon 配備了強大的??圖形化…

算法-根據前序+中序遍歷打印樹的右視圖

題目請根據二叉樹的前序遍歷&#xff0c;中序遍歷恢復二叉樹&#xff0c;并打印出二叉樹的右視圖數據范圍&#xff1a; 0≤n≤100000≤n≤10000 要求&#xff1a; 空間復雜度 O(n)O(n)&#xff0c;時間復雜度 O(n)O(n)如輸入[1,2,4,5,3],[4,2,5,1,3]時&#xff0c;通過前序遍歷…

Kafka面試精講 Day 7:消息序列化與壓縮策略

【Kafka面試精講 Day 7】消息序列化與壓縮策略 在Kafka的高性能消息系統中&#xff0c;消息序列化與壓縮是影響吞吐量、延遲和網絡開銷的核心環節。作為“Kafka面試精講”系列的第7天&#xff0c;本文聚焦于這一關鍵主題&#xff0c;深入剖析其原理、實現方式、配置策略及常見…