目錄
- 前言
- 設計思想
前言
之前,我們寫了通信協議的具體設計,接下來我們設計服務器類
設計思想
我們先只考慮一個虛擬主機的情況下, 在一個虛擬主機的情況下,我們需要有一個session會話來幫助我們存儲信息,并且既然是網絡通信,那么socket關鍵字肯定也必不可少,我們在引入一個線程池,用來處理多個客戶端的請求
private ServerSocket serverSocket = null;// 當前考慮一個 BrokerServer 上只有一個 虛擬主機private VirtuaHost virtualHost = new VirtuaHost("default");// 使用這個 哈希表 表示當前的所有會話(也就是說有哪些客戶端正在和咱們的服務器進行通信)// 此處的 key 是 channelId, value 為對應的 Socket 對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一個線程池, 來處理多個客戶端的請求.private ExecutorService executorService = null;// 引入一個 boolean 變量控制服務器是否繼續運行private volatile boolean runnable = true;
代碼實現
package com.example.demo.mqServer;import com.example.demo.Common.*;
import com.example.demo.mqServer.core.BasicProperties;import javax.websocket.Session;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/* 消息隊列的本體服務器, 是TCP服務器 */
public class BrokerServer {private ServerSocket serverSocket = null;// 當前考慮一個 BrokerServer 上只有一個 虛擬主機private VirtuaHost virtualHost = new VirtuaHost("default");// 使用這個 哈希表 表示當前的所有會話(也就是說有哪些客戶端正在和咱們的服務器進行通信)// 此處的 key 是 channelId, value 為對應的 Socket 對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一個線程池, 來處理多個客戶端的請求.private ExecutorService executorService = null;// 引入一個 boolean 變量控制服務器是否繼續運行private volatile boolean runnable = true;public BrokerServer (int port) throws IOException {serverSocket = new ServerSocket(port);}// 開始服務器public void start() throws IOException {System.out.println("[BrokerServer] 啟動!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把處理連接的邏輯丟給這個線程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服務器停止運行!");// e.printStackTrace();}}// 停止服務器public void stop() throws IOException {runnable=false;executorService.shutdownNow();serverSocket.close();}// 處理一個客戶端的鏈接// 在這一個鏈接中, 可能會涉及到多個請求和響應private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){while (true){// 1 讀取請求并解析Request request = readRequest(dataInputStream);// 2 根據請求計算響應Response response = process(request, clientSocket);// 3. 把響應寫回給客戶端writeResponse(dataOutputStream, response);}} catch (EOFException | SocketException e) {// 對于這個代碼, DataInputStream 如果讀到 EOF , 就會拋出一個 EOFException 異常.// 需要借助這個異常來結束循環System.out.println("[BrokerServer] connection 關閉! 客戶端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (ClassNotFoundException | MqException e) {e.printStackTrace();}} catch (IOException e) {System.out.println("[BrokerServer] connection 出現異常!");e.printStackTrace();} finally {try {// 當連接處理完了, 就需要記得關閉 socketclientSocket.close();// 一個 TCP 連接中, 可能包含多個 channel. 需要把當前這個 socket 對應的所有 channel 也順便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("讀取請求格式出錯!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 這個刷新緩沖區也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一個初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.toObject(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根據 type 的值, 來進一步區分接下來這次請求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 創建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 創建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 銷毀 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 銷毀 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 創建交換機. 此時 payload 就是 ExchangeDeclareArguments 對象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 這個回調函數要做的工作, 就是把服務器收到的消息可以直接推送回對應的消費者客戶端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道當前這個收到的消息, 要發給哪個客戶端.// 此處 consumerTag 其實是 channelId. 根據 channelId 去 sessions 中查詢, 就可以得到對應的// socket 對象了, 從而可以往里面發送數據了// 1. 根據 channelId 找到 socket 對象Socket clientSockte = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 訂閱消息的客戶端已經關閉!");}SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBody(body);subScribeReturns.setBasicProperties(basicProperties);byte[] payload = BinaryTool.toBytes(subScribeReturns);// 2. 構造響應數據Response response = new Response();// oxc 服務器給消費者客戶端托送的消息數據response.setType(0xc);// response 的 payload 就是一個 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 把數據寫回到客戶端 , 寫入到響應之中DataOutputStream dataOutputStream = new DataOutputStream(clientSockte.getOutputStream());writeResponse(dataOutputStream,response);}});} else if (request.getType() == 0xb) {// 調用 basicAck 確認消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 當前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 構造響應BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearClosedSession(Socket clientSocket) {// 這里主要做的事情就是, 將遍歷哈希表, 將不用的session 清楚掉// 不能在集合類中變查詢, 邊刪除List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String ,Socket> entry:sessions.entrySet()) {toDeleteChannelId.add(entry.getKey());}for (String s:toDeleteChannelId) {sessions.remove(s);}}}