消息隊列(12) - 定義服務器類

目錄

  • 前言
  • 設計思想

前言

之前,我們寫了通信協議的具體設計,接下來我們設計服務器類

設計思想

我們先只考慮一個虛擬主機的情況下, 在一個虛擬主機的情況下,我們需要有一個session會話來幫助我們存儲信息,并且既然是網絡通信,那么socket關鍵字肯定也必不可少,我們在引入一個線程池,用來處理多個客戶端的請求

  private ServerSocket serverSocket = null;// 當前考慮一個 BrokerServer 上只有一個 虛擬主機private VirtuaHost virtualHost = new VirtuaHost("default");// 使用這個 哈希表 表示當前的所有會話(也就是說有哪些客戶端正在和咱們的服務器進行通信)// 此處的 key 是 channelId, value 為對應的 Socket 對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一個線程池, 來處理多個客戶端的請求.private ExecutorService executorService = null;// 引入一個 boolean 變量控制服務器是否繼續運行private volatile boolean runnable = true;

代碼實現

package com.example.demo.mqServer;import com.example.demo.Common.*;
import com.example.demo.mqServer.core.BasicProperties;import javax.websocket.Session;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/* 消息隊列的本體服務器, 是TCP服務器 */
public class BrokerServer {private ServerSocket serverSocket = null;// 當前考慮一個 BrokerServer 上只有一個 虛擬主機private VirtuaHost virtualHost = new VirtuaHost("default");// 使用這個 哈希表 表示當前的所有會話(也就是說有哪些客戶端正在和咱們的服務器進行通信)// 此處的 key 是 channelId, value 為對應的 Socket 對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一個線程池, 來處理多個客戶端的請求.private ExecutorService executorService = null;// 引入一個 boolean 變量控制服務器是否繼續運行private volatile boolean runnable = true;public  BrokerServer (int port) throws IOException {serverSocket = new ServerSocket(port);}// 開始服務器public void start() throws IOException {System.out.println("[BrokerServer] 啟動!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把處理連接的邏輯丟給這個線程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服務器停止運行!");// e.printStackTrace();}}// 停止服務器public void stop() throws IOException {runnable=false;executorService.shutdownNow();serverSocket.close();}// 處理一個客戶端的鏈接// 在這一個鏈接中, 可能會涉及到多個請求和響應private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){while (true){// 1 讀取請求并解析Request request = readRequest(dataInputStream);// 2 根據請求計算響應Response response = process(request, clientSocket);// 3. 把響應寫回給客戶端writeResponse(dataOutputStream, response);}} catch (EOFException | SocketException e) {// 對于這個代碼, DataInputStream 如果讀到 EOF , 就會拋出一個 EOFException 異常.// 需要借助這個異常來結束循環System.out.println("[BrokerServer] connection 關閉! 客戶端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (ClassNotFoundException | MqException e) {e.printStackTrace();}} catch (IOException e) {System.out.println("[BrokerServer] connection 出現異常!");e.printStackTrace();} finally {try {// 當連接處理完了, 就需要記得關閉 socketclientSocket.close();// 一個 TCP 連接中, 可能包含多個 channel. 需要把當前這個 socket 對應的所有 channel 也順便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("讀取請求格式出錯!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 這個刷新緩沖區也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一個初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.toObject(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根據 type 的值, 來進一步區分接下來這次請求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 創建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 創建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 銷毀 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 銷毀 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 創建交換機. 此時 payload 就是 ExchangeDeclareArguments 對象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 這個回調函數要做的工作, 就是把服務器收到的消息可以直接推送回對應的消費者客戶端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道當前這個收到的消息, 要發給哪個客戶端.// 此處 consumerTag 其實是 channelId. 根據 channelId 去 sessions 中查詢, 就可以得到對應的// socket 對象了, 從而可以往里面發送數據了// 1. 根據 channelId 找到 socket 對象Socket clientSockte = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 訂閱消息的客戶端已經關閉!");}SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBody(body);subScribeReturns.setBasicProperties(basicProperties);byte[] payload = BinaryTool.toBytes(subScribeReturns);// 2. 構造響應數據Response response = new Response();// oxc 服務器給消費者客戶端托送的消息數據response.setType(0xc);// response 的 payload 就是一個 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 把數據寫回到客戶端 , 寫入到響應之中DataOutputStream dataOutputStream = new DataOutputStream(clientSockte.getOutputStream());writeResponse(dataOutputStream,response);}});} else if (request.getType() == 0xb) {// 調用 basicAck 確認消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 當前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 構造響應BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearClosedSession(Socket clientSocket) {// 這里主要做的事情就是, 將遍歷哈希表, 將不用的session 清楚掉// 不能在集合類中變查詢, 邊刪除List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String ,Socket> entry:sessions.entrySet()) {toDeleteChannelId.add(entry.getKey());}for (String s:toDeleteChannelId) {sessions.remove(s);}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/35067.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/35067.shtml
英文地址,請注明出處:http://en.pswp.cn/news/35067.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

解決lldb調試時可能出現的personality set failed: Function not implemented

最近在嘗試使用Visual Studio 2022遠程連接Linux進行C/C的開發&#xff0c;由于CentOS風波不斷&#xff0c;所以現在的開發基本上都是使用ubuntu了&#xff0c;但是目前VS2022有一些BUG&#xff0c;就是遠程調試時&#xff0c;如果目標系統是ubuntu則會出現啟動調試器很慢的問題…

mysql高并發下主鍵自增打來的問題

在一般情況下&#xff0c;在新增領域對象后&#xff0c;都需要獲取對應的主鍵值。使用應用層來維護主鍵&#xff0c;在一定程度上有利于程序性能的優化和應用移植性的提高。在采用數據庫自增主鍵的方案里&#xff0c;如果JDBC驅動不能綁定新增記錄對應的主鍵&#xff0c;就需要…

LeetCode 1281. 整數的各位積和之差

【LetMeFly】1281.整數的各位積和之差 力扣題目鏈接&#xff1a;https://leetcode.cn/problems/subtract-the-product-and-sum-of-digits-of-an-integer/ 給你一個整數 n&#xff0c;請你幫忙計算并返回該整數「各位數字之積」與「各位數字之和」的差。 示例 1&#xff1a; …

學習筆記整理-JS-03-表達式和運算符

[[toc]] 一、表達式和運算符 1. 表達式 表達式種類 算術、關系、邏輯、賦值、綜合 二、JS基本表達式 1. 算術運算符 意義運算符加減-乘*除/取余% 加減乘除 加減的符號和數學一致&#xff0c;乘號是*號&#xff0c;除法是/號默認情況&#xff0c;乘除法的優先級高于加法和…

安卓源碼分析(10)Lifecycle實現組件生命周期管理

參考&#xff1a; https://developer.android.google.cn/topic/libraries/architecture/lifecycle?hlzh-cn#java https://developer.android.google.cn/reference/androidx/lifecycle/Lifecycle 文章目錄 1、概述2、LifeCycle類3、LifecycleOwner類4、LifecycleObserver類 1、…

數據庫字段命名導致的SQL報錯

1.表設計 create table variables (id bigint not null comment 主鍵,business_key varchar(128) null comment 業務key,key varchar(128) null comment Map中的key,value varchar(255) null comment…

Centos yum命令大全

1.使用YUM查找軟件包 $ yum search python 2.列出所有可安裝的軟件包 $ yum list | grep python 3.列出所有可更新的軟件包 $ yum list updates 4.列出所有已安裝的軟件包 $ yum list installed | grep python

[GIN-debug] [ERROR] listen tcp: address 8080: missing port in address

學習Golang_gin框架的第一天 遇到一下報錯 : [GIN-debug] [ERROR] listen tcp: address 8080: missing port in address 錯誤代碼 : package mainimport "github.com/gin-gonic/gin"func main() {router : gin.Default()router.GET("/index", func…

910數據結構(2014年真題)

算法設計題 問題1 已知一個帶頭結點的單鏈表head&#xff0c;假設結點中的元素為整數&#xff0c;試編寫算法&#xff1a;按遞增次序輸出單鏈表中各個結點的數據元素&#xff0c;并釋放結點所占的存儲空間。要求&#xff1a;(1)用文字給出你的算法思想&#xff1b;(2)不允許使…

nginx禁用3DES和DES弱加密算法

nginx禁用3DES和DES弱加密算法 項目背景 最近護網行動&#xff0c;收到漏洞報告&#xff0c;如下&#xff1a; 漏洞名稱SSL/TLS協議信息泄露漏洞(CVE-2016-2183)【原理掃描】詳細描述TLS是安全傳輸層協議&#xff0c;用于在兩個通信應用程序之間提供保密性和數據完整性。 TLS…

opencv 基礎50-圖像輪廓學習03-Hu矩函數介紹及示例-cv2.HuMoments()

什么是Hu 矩&#xff1f; Hu 矩&#xff08;Hu Moments&#xff09;是由計算機視覺領域的科學家Ming-Kuei Hu于1962年提出的一種圖像特征描述方法。這些矩是用于描述圖像形狀和幾何特征的不變特征&#xff0c;具有平移、旋轉和尺度不變性&#xff0c;適用于圖像識別、匹配和形狀…

C語言鏈表操作

目錄 鏈表基本操作 刪除重復元素 查找倒數第N個節點 查找中間節點 約瑟夫環 循環鏈表 合并有序鏈表 逆置鏈表 逆置鏈表(雙向鏈表) 鏈表基本操作 //linklist.c#include "linklist.h" #include <stdlib.h>struct node *head NULL; struct node *tail…

React 18 state 狀態更新函數

參考文章 把一系列 state 更新加入隊列 設置組件 state 會把一次重新渲染加入隊列。但有時可能會希望在下次渲染加入隊列之前對 state 的值執行多次操作。為此&#xff0c;了解 React 如何批量更新 state 會很有幫助。 React 會對 state 更新進行批處理 在下面的示例中&…

Docker查看、創建、進入容器相關的命令

1.查看、創建、進入容器的指令 用-it指令創建出來的容器&#xff0c;創建完成之后會立馬進入容器。退出之后立馬關閉容器。 docker run -it --namec1 centos:7 /bin/bash退出容器&#xff1a; exit查看現在正在運行的容器命令&#xff1a; docker ps查看歷史容器&#xff0…

docker小白第二天

centos上安裝docker docker官網&#xff0c;docker官網&#xff0c;找到下圖中的doc文檔。 進入如下頁面 選中manuals&#xff0c;安裝docker引擎。 最終centos下的docker安裝文檔鏈接&#xff1a;安裝文檔鏈接. 具體安裝步驟&#xff1a; 1、打開Centos&#xff0c;輸入命…

【BASH】回顧與知識點梳理(十五)

【BASH】回顧與知識點梳理 十五 十五. 指令與文件的搜尋15.1 腳本文件名的搜尋which (尋找『執行檔』) 15.2 文件檔名的搜尋whereis (由一些特定的目錄中尋找文件文件名)locate / updatedbfind與時間有關的選項與使用者或組名有關的參數與文件權限及名稱有關的參數額外可進行的…

JVM垃圾回收

如何確定垃圾 對堆垃圾回收前的第一步就是要判斷哪些對象已經死亡&#xff08;即不能再被任何途徑使用的對象&#xff09; 引用計數法 這個方法就是為對象添加計數器來標識引用個數&#xff0c;計數器為 0 的對象就是不可能再被使用的。但是這種方法存在循環引用問題&#x…

布谷鳥配音:一站式配音軟件

這是一款智能語音合成軟件&#xff0c;可以快速將文字轉換成語音&#xff0c;擁有多種真人模擬發音&#xff0c;可以選擇不同男聲、女聲、童聲&#xff0c;以及四川話、粵語等中文方言和外語配音&#xff0c;并且可對語速、語調、節奏、數字讀法、多音字、背景音等進行全方位設…

less、sass的使用及其區別

CSS預處理器 CSS 預處理器是一種擴展了原生 CSS 的工具&#xff0c;它們添加了一些編程語言的特性&#xff0c;以便更有效地編寫、組織和維護樣式代碼。預處理器允許開發者使用變量、嵌套、函數、混合等功能&#xff0c;從而使 CSS 更具可讀性、可維護性和重用性&#xff0c;特…

學習筆記整理-JS-01-語法與變量

文章目錄 一、語法與變量1. 初識JavaScript2. JavaScript的歷史3. JavaScript與ECMAScript的關系4. JavaScript的體系5. JavaScript的語言風格和特性 二、語法1. JavaScript的書寫位置2. 認識輸出語句3. REPL環境&#xff0c;交互式解析器4. 變量是什么5. 重點內容 一、語法與變…