Java實現消息隊列服務

使用 JAVA 語言自己動手來寫一個MQ (類似ActiveMQ,RabbitMQ)
在這里插入圖片描述
主要角色

首先我們必須需要搞明白 MQ (消息隊列) 中的三個基本角色

ProducerBrokerConsumer

整體架構如下所示
在這里插入圖片描述
自定義協議

首先從上一篇中介紹了協議的相關信息,具體廠商的 MQ(消息隊列) 需要遵循某種協議或者自定義協議 , 消息的 生產者和消費者需要遵循其協議(約定)才能后成功地生產消息和生產消息 ,所以在這里我們自定義一個協議如下.

消息處理中心 : 如果接收到的信息包含"SEND"字符串,即視為生產者發送的消息,消息處理中心需要將此信息存儲等待消費者消費

消息處理中心 : 如果接受到的信息為CONSUME,既視為消費者發送消費請求,需要將存儲的消息隊列頭部的信息轉發給消費者,然后將此消息從隊列中移除

消息處理中心 : 如果消息處理中心存儲的消息滿3條仍然沒有消費者進行消費,則不再接受生產者的生產請求

消息生產者:需要遵循協議將生產的消息頭部增加"SEND:" 表示生產消息

消息消費者:需要遵循協議向消息處理中心發送"CONSUME"字符串表示消費消息

流程順序

項目構建流程

下面將整個MQ的構建流程過一遍

新建一個 Broker 類,內部維護一個 ArrayBlockingQueue 隊列,提供生產消息和消費消息的方法, 僅僅具備存儲服務功能

新建一個 BrokerServer 類,將 Broker 發布為服務到本地9999端口,監聽本地9999端口的 Socket 鏈接,在接受的信息中進行我們的協議校驗, 這里 僅僅具備接受消息,校驗協議,轉發消息功能;

新建一個 MqClient 類,此類提供與本地端口9999的Socket鏈接 , 僅僅具備生產消息和消費消息的方法

測試:新建兩個 MyClient 類對象,分別執行其生產方法和消費方法

具體使用流程

生產消息:客戶端執行生產消息方法,傳入需要生產的信息,該信息需要遵循我們自定義的協議,消息處理中心服務在接受到消息會根據自定義的協議校驗該消息是否合法,如果合法如果合法就會將該消息存儲到Broker內部維護的 ArrayBlockingQueue 隊列中.如果 ArrayBlockingQueue 隊列沒有達到我們協議中的最大長度將將消息添加到隊列中,否則輸出生產消息失敗.

消息消息:客戶端執行消費消息方法, Broker服務 會校驗請求的信息的信息是否等于 CONSUME ,如果驗證成功則從Broker內部維護的 ArrayBlockingQueue 隊列的 Poll 出一個消息返回給客戶端

代碼演示

消息處理中心 Broker

/*** * 消息處理中心* */
public class Broker {// 隊列存儲消息的最大數量private final static int MAX_SIZE = 3;// 保存消息數據的容器private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);// 生產消息public static void produce(String msg) {if (messageQueue.offer(msg)) {System.out.println("成功向消息處理中心投遞消息:" + msg + ",當前暫存的消息數量是:" + messageQueue.size());} else {System.out.println("消息處理中心內暫存的消息達到最大負荷,不能繼續放入消息!");}System.out.println("=======================");}// 消費消息public static String consume() { String msg = messageQueue.poll();if(msg !=null) {// 消費條件滿足情況,從消息容器中取出一條消息System.out.println("已經消費消息:"+ msg +",當前暫存的消息數量是:"+ messageQueue.size());   }else{            System.out.println("消息處理中心內沒有消息可供消費!");        }   System.out.println("=======================");returnmsg; }
}}

消息處理中心服務 BrokerServer

客戶端 MqClient


/*** * 用于啟動消息處理中心* */
public class BrokerServer implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket) {this.socket = socket;}@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {while (true) {String str = in.readLine();if (str == null) {continue;}System.out.println("接收到原始數據:" + str);if (str.equals("CONSUME")) {// CONSUME 表示要消費一條消息//從消息隊列中消費一條消息String message = Broker.consume();out.println(message);out.flush();} else if (str.contains("SEND:")) {// 接受到的請求包含SEND:字符串 表示生產消息放到消息隊列中Broker.produce(str);} else {System.out.println("原始數據:" + str + "沒有遵循協議,不提供相關服務");}}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(SERVICE_PORT);while (true) {BrokerServer brokerServer = new BrokerServer(server.accept());new Thread(brokerServer).start();}}
}

測試MQ

public class ProduceClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();client.produce("SEND:Hello World");}
}public class ConsumeClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();String message = client.consume();System.out.println("獲取的消息為:" + message);}
}

我們多執行幾次客戶端的生產方法和消費方法就可以看到一個完整的MQ的通訊過程,下面是我執行了幾次的一些日志

接收到原始數據:SEND:Hello World成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:

1=======================接收到原始數據:SEND:Hello World成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:
2=======================接收到原始數據:SEND:Hello World成功向消息處理中心投遞消息:SEND:Hello World,當前暫存的消息數量是:
3=======================接收到原始數據:SEND:Hello World消息處理中心內暫存的消息達到最大負荷,不能繼續放入消息!
=======================接收到原始數據:Hello World原始數據:Hello World沒有遵循協議,不提供相關服務接收到原始數據:CONSUME已經消費消息:SEND:Hello World,當前暫存的消息數量是:
2=======================接收到原始數據:CONSUME已經消費消息:SEND:Hello World,當前暫存的消息數量是:
1=======================接收到原始數據:CONSUME已經消費消息:SEND:Hello World,當前暫存的消息數量是:
0=======================接收到原始數據:CONSUME消息處理中心內沒有消息可供消費!=======================

小結

本章示例代碼主要源自分布式消息中間件實踐一書 , 這里我們自己使用Java語言寫了一個MQ消息隊列 , 通過這個消息隊列我們對MQ中的幾個角色 “生產者,消費者,消費處理中心,協議” 有了更深的理解 ; 那么下一章節我們就來一塊學習具體廠商的MQ RabbitMQ

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/249725.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/249725.shtml
英文地址,請注明出處:http://en.pswp.cn/news/249725.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Knockout中ko.utils中處理數組的方法集合

每一套框架基本上都會有一個工具類,如:Vue中的Vue.util、Knockout中的ko.utils、jQuery直接將一些工具類放到了$里面,如果你還需要更多的工具類可以試試lodash。本文只介紹一下Knockout中ko.utils中處理數組的一些方法。 ko.utils.arrayForEa…

$nextTick 源碼

x現在沒時間,留個坑 轉載于:https://www.cnblogs.com/smzd/p/11634665.html

java 發布訂閱

1.發布者接口 package com.yy.subpub; /** * Description: 發布者接口 * author: leijing * date: 2016年9月29日 下午5:07:20 */ public interface IPublisher<M> { /** * Description: 向訂閱器發布消息 * param subscribePublish 訂閱器 * param message 消息 * para…

.NET Core Session的簡單使用

前言 在之前的.NET 里&#xff0c;我們可以很容易的使用Session讀取值。那今天我們來看看 如何在.NET Core中讀取Session值呢&#xff1f; Session 使用Session之前&#xff0c;我們需要到Startup.cs中配置我們的服務如下&#xff1a; ①在ConfigureServices中加入&#xff1a;…

EasyNVR內網攝像機接入網關+EasyNVS云端管理平臺,組件起一套輕量級類似于企業級螢石云的解決方案...

背景分析 對于EasyNVR我們應該都了解&#xff0c;主要應用于互聯安防直播&#xff0c;對于EasyNVR&#xff0c;我們可以清楚的發現&#xff0c;EasyNVR的工作機制是EasyNVR拉取攝像機的RTSP/Onvif視頻流&#xff0c;然后客戶端可以通過訪問EasyNVR服務端實現流分發&#xff0c;…

java.util.Queue用法

隊列是一種特殊的線性表&#xff0c;它只允許在表的前端&#xff08;front&#xff09;進行刪除操作&#xff0c;而在表的后端&#xff08;rear&#xff09;進行插入操作。進行插入操作的端稱為隊尾&#xff0c;進行刪除操作的端稱為隊頭。隊列中沒有元素時&#xff0c;稱為空隊…

Vim刪除文件到行首或者行尾

vim用的不是很熟練&#xff0c;只是有時候需要的時候會學習一下 我們知道&#xff0c;vim有三種模式&#xff0c;一種是一般模式&#xff0c;一種是編輯模式&#xff0c;另外一種是命令行模式 在一般模式下&#xff0c;可以進行刪除&#xff0c;復制粘貼等操作&#xff0c;在編…

新版本微信導致的ios表單bug

解決方法如下&#xff1a; $(document).delegate(input, textarea, select, blur, function(){setTimeout(function(){$(html).animate({height: 100.1vh}, 100, function(){$(this).animate({height: 100vh}, 1)})},100); }); 轉載于:https://www.cnblogs.com/qdlhj/p/1033676…

Golang的值類型和引用類型的范圍、存儲區域、區別

常見的值類型和引用類型分別有哪些&#xff1f; 值類型&#xff1a;基本數據類型 int 系列, float 系列, bool, string 、數組和結構體struct&#xff0c;使用這些類型的變量直接指向存在內存中的值&#xff0c;值類型的變量的值通常存儲在棧中。 引用類型&#xff1a;指針、sl…

python3之time模塊

時間戳1: import time2: print(time.time()) 可讀的時間格式1: import time2: print(time.ctime())3: later time.time() 6004: print(time.ctime(later)) 結果1: Wed Jan 30 17:11:49 20192: Wed Jan 30 17:21:49 2019 暫停程序(進程或者線程)1: time.sleep(secs) 計時時鐘1…

網絡規劃設計(項目類業務)

前期準備&#xff1a;找經開部要到當地的現場結構圖 1.和通信段約好時間&#xff0c;實地跑一趟&#xff0c;找到光纜、電纜的原匯聚點。 2.與車間人員溝通&#xff0c;看是否要遷匯聚點&#xff0c;倘若遷匯聚點&#xff0c;遷到哪里。 3.怎么從光纜/電纜的舊址遷到新址&#…

RPC框架實現原理

一、什么是RPC框架&#xff1f; RPC&#xff0c;全稱為Remote Procedure Call&#xff0c;即遠程過程調用&#xff0c;是一種計算機通信協議。 比如現在有兩臺機器&#xff1a;A機器和B機器&#xff0c;并且分別部署了應用A和應用B。假設此時位于A機器上的A應用想要調用位于B機…

jQuery安裝

http://www.runoob.com/jquery/jquery-install.html 網頁中添加jQuery&#xff1a; 方法一&#xff1a;可以從http://jquery.com/download/ 下載jQuery庫 方法二&#xff1a;從CDN中載入jQuery 下載 jQuery 有兩個版本的 jQuery 可供下載&#xff1a; Production version - 用于…

redhat相關配置

網絡配置&#xff1a; vi /etc/sysconfig/network-scripts/ifcfg-eth BOOTPROTOstaticONBOOTyesIP配置IPADDR192.168.31.102NETMASK255.255.255.0GATEWAY192.168.31.1DNS1192.168.31.1redhat6&#xff1a;防火墻&#xff1a;1. 永久性生效開啟&#xff1a;chkconfig iptables o…

zookeeper入門系列

zookeeper可謂是目前使用最廣泛的分布式組件了。其功能和職責單一&#xff0c;但卻非常重要。 在現今這個年代&#xff0c;介紹zookeeper的書和文章可謂多如牛毛&#xff0c;本人不才&#xff0c;試圖通過自己的理解來介紹zookeeper&#xff0c;希望通過一個初學者的視角來學習…

java.lang.NumberFormatException: multiple points錯誤問題

最近項目一直會出現時間轉換報錯&#xff0c;一直不知道是什么問題??? java.lang.NumberFormatException: multiple pointsat sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1110)at java.lang.Double.parseDouble(Double.java:540)at java.text.Dig…

plsql查詢數據中文亂碼

在plsql中進行表數據查詢的時候&#xff0c;發現查詢出來的中文居然顯示為亂碼&#xff0c;通過查找資料解決該問題。 1、查看數據的編碼&#xff08;語句&#xff1a;select * from v$nls_parameters&#xff09; 發現顯示的語言不是我們常用的GBK模式 2、配置本機語言環境變量…

Zookeeper的功能以及工作原理

1.ZooKeeper是什么&#xff1f; ZooKeeper是一個分布式的&#xff0c;開放源碼的分布式應用程序協調服務&#xff0c;是Google的Chubby一個開源的實現&#xff0c;它是集群的管理者&#xff0c;監視著集群中各個節點的狀態根據節點提交的反饋進行下一步合理操作。最終&#xf…

前端學習總結——CSS布局方式之傳統布局

傳統布局 傳統布局即是早期在平板電腦、智能手機等移動設備并不流行的時候使用的布局方式。 一、表格布局 例如&#xff1a;采用表格方式實現如下簡單模型的布局 &#xff08;1&#xff09;固定布局 即用具體的像素值來確定模型的寬和高等值。 HTML代碼如下所示 <tabl…

aspose word for java去除目錄文字藍色樣式以及文字下方藍色下劃線

//去除目錄文字藍色樣式以及文字下方藍色下劃線for(FieldStart field: (Iterable<FieldStart>)doc.getChildNodes(NodeType.FIELD_START, true)){if (field.getFieldType() FieldType.FIELD_HYPERLINK){FieldHyperlink hyperlink (FieldHyperlink)field.getField();//判…