使用 JAVA 語言自己動手來寫一個MQ (類似ActiveMQ,RabbitMQ)
主要角色
首先我們必須需要搞明白 MQ (消息隊列) 中的三個基本角色
ProducerBrokerConsumer
整體架構如下所示
自定義協議
首先從上一篇中介紹了協議的相關信息,具體廠商的 MQ(消息隊列) 需要遵循某種協議或者自定義協議 , 消息的 生產者和消費者需要遵循其協議(約定)才能后成功地生產消息和生產消息 ,所以在這里我們自定義一個協議如下.
消息處理中心 : 如果接收到的信息包含"SEND"字符串,即視為生產者發送的消息,消息處理中心需要將此信息存儲等待消費者消費
消息處理中心 : 如果接受到的信息為CONSUME,既視為消費者發送消費請求,需要將存儲的消息隊列頭部的信息轉發給消費者,然后將此消息從隊列中移除
消息處理中心 : 如果消息處理中心存儲的消息滿3條仍然沒有消費者進行消費,則不再接受生產者的生產請求
消息生產者:需要遵循協議將生產的消息頭部增加"SEND:" 表示生產消息
消息消費者:需要遵循協議向消息處理中心發送"CONSUME"字符串表示消費消息
流程順序
項目構建流程
下面將整個MQ的構建流程過一遍
新建一個 Broker 類,內部維護一個 ArrayBlockingQueue 隊列,提供生產消息和消費消息的方法, 僅僅具備存儲服務功能
新建一個 BrokerServer 類,將 Broker 發布為服務到本地9999端口,監聽本地9999端口的 Socket 鏈接,在接受的信息中進行我們的協議校驗, 這里 僅僅具備接受消息,校驗協議,轉發消息功能;
新建一個 MqClient 類,此類提供與本地端口9999的Socket鏈接 , 僅僅具備生產消息和消費消息的方法
測試:新建兩個 MyClient 類對象,分別執行其生產方法和消費方法
具體使用流程
生產消息:客戶端執行生產消息方法,傳入需要生產的信息,該信息需要遵循我們自定義的協議,消息處理中心服務在接受到消息會根據自定義的協議校驗該消息是否合法,如果合法如果合法就會將該消息存儲到Broker內部維護的 ArrayBlockingQueue 隊列中.如果 ArrayBlockingQueue 隊列沒有達到我們協議中的最大長度將將消息添加到隊列中,否則輸出生產消息失敗.
消息消息:客戶端執行消費消息方法, Broker服務 會校驗請求的信息的信息是否等于 CONSUME ,如果驗證成功則從Broker內部維護的 ArrayBlockingQueue 隊列的 Poll 出一個消息返回給客戶端
代碼演示
消息處理中心 Broker
/*** * 消息處理中心* */
public class Broker {// 隊列存儲消息的最大數量private final static int MAX_SIZE = 3;// 保存消息數據的容器private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);// 生產消息public static void produce(String msg) {if (messageQueue.offer(msg)) {System.out.println("成功向消息處理中心投遞消息:" + msg + ",當前暫存的消息數量是:" + messageQueue.size());} else {System.out.println("消息處理中心內暫存的消息達到最大負荷,不能繼續放入消息!");}System.out.println("=======================");}// 消費消息public static String consume() { String msg = messageQueue.poll();if(msg !=null) {// 消費條件滿足情況,從消息容器中取出一條消息System.out.println("已經消費消息:"+ msg +",當前暫存的消息數量是:"+ messageQueue.size()); }else{ System.out.println("消息處理中心內沒有消息可供消費!"); } System.out.println("=======================");returnmsg; }
}}
消息處理中心服務 BrokerServer
客戶端 MqClient
/*** * 用于啟動消息處理中心* */
public class BrokerServer implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket) {this.socket = socket;}@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {while (true) {String str = in.readLine();if (str == null) {continue;}System.out.println("接收到原始數據:" + str);if (str.equals("CONSUME")) {// CONSUME 表示要消費一條消息//從消息隊列中消費一條消息String message = Broker.consume();out.println(message);out.flush();} else if (str.contains("SEND:")) {// 接受到的請求包含SEND:字符串 表示生產消息放到消息隊列中Broker.produce(str);} else {System.out.println("原始數據:" + str + "沒有遵循協議,不提供相關服務");}}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(SERVICE_PORT);while (true) {BrokerServer brokerServer = new BrokerServer(server.accept());new Thread(brokerServer).start();}}
}
測試MQ
public class ProduceClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();client.produce("SEND:Hello World");}
}public class ConsumeClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();String message = client.consume();System.out.println("獲取的消息為:" + message);}
}
我們多執行幾次客戶端的生產方法和消費方法就可以看到一個完整的MQ的通訊過程,下面是我執行了幾次的一些日志
接收到原始數據:SEND:Hello World成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:
1=======================接收到原始數據:SEND:Hello World成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:
2=======================接收到原始數據:SEND:Hello World成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:
3=======================接收到原始數據:SEND:Hello World消息處理中心內暫存的消息達到最大負荷,不能繼續放入消息!
=======================接收到原始數據:Hello World原始數據:Hello World沒有遵循協議,不提供相關服務接收到原始數據:CONSUME已經消費消息:SEND:Hello World,當前暫存的消息數量是:
2=======================接收到原始數據:CONSUME已經消費消息:SEND:Hello World,當前暫存的消息數量是:
1=======================接收到原始數據:CONSUME已經消費消息:SEND:Hello World,當前暫存的消息數量是:
0=======================接收到原始數據:CONSUME消息處理中心內沒有消息可供消費!=======================
小結
本章示例代碼主要源自分布式消息中間件實踐一書 , 這里我們自己使用Java語言寫了一個MQ消息隊列 , 通過這個消息隊列我們對MQ中的幾個角色 “生產者,消費者,消費處理中心,協議” 有了更深的理解 ; 那么下一章節我們就來一塊學習具體廠商的MQ RabbitMQ