目錄
一、自定義應用層協議
🍅 1、格式定義
🍅 2、準備工作
🎄定義請求和響應?
?🎄 定義BasicArguments
🎄 定義BasicReturns
🍅 2、創建參數類
????????🎄 交換機
????????🎄 隊列
????????🎄 綁定
? ? ? ? 🎄發布消息
? ? ? ? 🎄 訂閱消息
? ? ? ? 🎄確認應答
? ? ? ? 🎄 消息推送
二、服務器設計
?🍅 1、編寫實例變量和構造方法
🍅 2、編寫啟動類和關閉類
🍅 3、編寫處理連接的方法:processConnection()
?🍅 4、編寫讀取請求readRequest()和寫回響應writeResponse方法
🍅 5、實現根據請求計算響應:process()方法編寫
一、自定義應用層協議
🍅 1、格式定義
本消息隊列,是需要通過網絡進行通信的。這里主要基于TCP協議,自定義應用層協議。
由于當前交互的Message數據,是二進制數據,由于HTTP和JSON都是文本協議,所以這里就不適用了。使用自定義的應用層協議。
約定自定義應用層協議格式:
????????以下是請求和響應的組成部分:
?type:
描述當前請求和響應式做什么的,描述當前請求/響應是在調用哪個API(VirtualHost中的核心API)
????????以下是type標識請求相應不同的功能,取值如下:
????????其中Channel代表的是Connection(TCP的連接)內部的”邏輯上"的連接。此時一個? ? ? ? ? ?Connection中可能會含有多個Channel。存在的意義是為了讓TCP連接
VirtualHost中的十多個方法: 0x1創建channel 0x2關閉channel 0x3創建exchange 0x4銷毀exchange 0x5創建queue 0x6銷毀queue 0x7創建binding 0x8銷毀binding 0x9發送message 0xa訂閱message 0xb返回ack 0xc服務器給客戶端推送的消息(被訂閱的消息)(響應獨有)
length:描述了payload的長度
payload:?會根據當前是請求還是響應,以及當前的type有不同的取值。
比如當前是0x3(創建交換機),
/* * 表示一個網絡通信中的請求對象,按照自定義協議的格式來展開 * */ @Data public class Request {private int type;private int length;private byte[] payload; }
當前是一個請求,那么pyload中的內容是exchangeDeclare的參數的序列化的結果;
如果當前是一個響應,那么payload里面的內容就是exchangeDeclare的返回結果的序列化內容。
那么接下來就進行代碼設計
以下都是再commen包中創建。
🍅 2、準備工作
🎄定義請求和響應?
/*
* 表示一個網絡通信中的請求對象,按照自定義協議的格式來展開
* */
@Data
public class Request {private int type;private int length;private byte[] payload;
}
/*
* 表示一個網絡通信中的響應對象,也是根據自定義應用層協議來的
* */
@Data
public class Response {private int type;private int length;private byte[] payload;
}
?🎄 定義BasicArguments
使用這個類表示方法的公共參數/輔助的字段 ,后續的每個方法會有一些不同的參數,不同的參數再使用不同的子類來表示。
rid代表請求的id,和響應的id一樣,他們是一對
channel表示的是“邏輯連接”,表示客戶端各種模塊復用一個TCP連接,
channelId就代表這些連接。
@Data
public class BasicArguments implements Serializable {
// 表示一次請求/響應的身份標識,可以把請求和響應對上protected String rid;
// 客戶端的身份標識protected String channelId;
}
🎄 定義BasicReturns
使用這個類標識各個遠程調用的方法的返回值的公共信息
/*
* 標識各個遠程調用的方法的返回值的公共信息
* */
@Data
public class BasicReturns implements Serializable {
// 用來標識唯一的請求和響應protected String rid;protected String channelId;
// 用來表示當前遠程調用方法的返回值protected boolean ok;
}
🍅 2、創建參數類
根據前面VirtualHost中的十多個方法,每個方法創建一個類,標識該方法中的相關參數。
那么這個參數到底是如何進行傳遞的?
如下圖,以交換機的參數進行舉例。
關于我們遠程調用的過程:當發起請求時,就把這些參數通過請求傳過去,然后調用VirtualHost中的API(就是VirtualHost中的那些創建刪除方法),調用完以后再返回響應。
以下是有關交換機的請求報文:
以下是創建交換機的響應報文:沒有請求報文復雜是因為,響應只需要返回請求是否執行遠程調用是否成功即可。?
以下就創建這些參數類:?
????????🎄 交換機
?創建交換機:
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String ExchangeName;private ExchangeType exchangeType;private boolean durable;
}
刪除交換機:
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}
????????🎄 隊列
創建隊列:
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String QueueName;private boolean durable;
}
刪除隊列:
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;
}
????????🎄 綁定
創建綁定:
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String exchangeName;private String queueName;private String bindingKey;
}
刪除綁定:
@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;
}
? ? ? ? 🎄發布消息
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}
? ? ? ? 🎄 訂閱消息
這個方法參數,還包含一個Consumer consumer。
這是一個回調函數,這個回調函數是不能作為參數進行傳輸的,因為這個回調函數,是客戶端這邊的。
比如,這里請求調用一個”訂閱隊列“的遠程方法,
客戶端這邊:服務器收到了請求,執行了basicConsume方法,并且返回了響應。訂閱以后,客戶端的消費者就會在后面收到消息,而這個回調函數是在消費者收到消息以后,才會進行邏輯處理,而不是再發送請求時進行傳遞的。
服務器這邊:執行的是一個固定的回調函數:把消息返回給客戶端。
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;
}
? ? ? ? 🎄確認應答
@Data
public class BasicAckArguments extends BasicArguments implements Serializable {private String queueName;private String messageId;
}
? ? ? ? 🎄 消息推送
前面的都是客戶端給服務器發送消息,這里是服務器給消費者推送消息。所以要繼承BasicReturns。
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}
二、服務器設計
在 mqServer包中創建一個BrokerServer類。
?🍅 1、編寫實例變量和構造方法
private ServerSocket serverSocket = null;private VirtualHost virtualHost = new VirtualHost("default");// 使用這個哈希表,表示當前所有會話(那些客戶端在和這個服務器進行通信)
// 此處的key是channelId,value是對應的 socket對象private ConcurrentHashMap<String , Socket> sessions = new ConcurrentHashMap<String ,Socket>();// 引入線程池,處理多個客戶端的請求private ExecutorService executorService = null;// 引入boolean變量控制服務器是否運行private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {
// 端口號serverSocket = new ServerSocket(port);}
🍅 2、編寫啟動類和關閉類
?這里利用了線程池,不斷的處理連接
public void start() throws IOException {System.out.println("[BrokerServer] 啟動!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把處理連接的邏輯丟給這個線程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服務器停止運行!");// e.printStackTrace();}}public void stop() throws IOException {runnable = false;
// 停止線程池executorService.shutdownNow();serverSocket.close();}private void processConnection(Socket clientSocket) {//TODO
}
🍅 3、編寫處理連接的方法:processConnection()
處理一個客戶端的連接,主要有以下幾步:
? ? ? ? (1)讀取請求并且解析
? ? ? ? (2)根據請求計算響應
? ? ? ? (3)把相應協寫回給客戶端
// 通過該方法,處理一個客戶端的連接
// 在一個連接中,可能會涉及到多個連接和請求private void processConnection(Socket clientSocket) throws IOException {
// 獲取到流對象,讀取應用層協議try(InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){
// 按照特定格式來讀取并且解析(轉換),此時就需要用到DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {
// 1、讀取請求并且解析Request request = readRequest(dataInputStream);
// 2、根據請求計算響應Response response = process(request, clientSocket);
// 3、把響應寫回給客戶端writeResponse(dataOutputStream,response);}}}catch (EOFException|SocketException e) {
// DataInputStream如果讀到EOF(文件末尾),會拋出一個EOFException異常
// 視為正常的異常,用或者異常來結束循環System.out.println("[BrokerServer] connection 關閉! 客戶端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[BrokerServer] connection 出現異常!");e.printStackTrace();} finally {try {clientSocket.close();
// 一個TCP連接中,可能含有多個channel,需要把當前socket對應的channel也順便清理掉clearClosedSession(clientSocket);}catch (IOException e) {e.printStackTrace();}}}
?🍅 4、編寫讀取請求readRequest()和寫回響應writeResponse方法
這里就是根據前面設定的報文格式來編寫的讀取請求和寫回響應的方法,這里的payload的具體內容在這里不作解析,在后面的process方法中進行解析
// 讀取請求并且解析private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();// 讀取出請求中4個字節的typerequest.setType(dataInputStream.readInt());
// 讀出4個字節的lengthrequest.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()){throw new IOException("讀取請求格式出錯");}request.setPayload(payload);return request;}// 把響應寫回給客戶端private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());
// 刷新緩沖區dataOutputStream.flush();}
🍅 5、實現根據請求計算響應:process()方法編寫
這里就要針對具體的payload進行編寫了。
當前請求中的payload里面的內容,是根據type來的,如下
VirtualHost中的十多個方法:
0x1創建channel
0x2關閉channel
0x3創建exchange
0x4銷毀exchange
0x5創建queue
0x6銷毀queue
0x7創建binding
0x8銷毀binding
0x9發送message
0xa訂閱message
0xb返回ack
0xc服務器給客戶端推送的消息(被訂閱的消息)(響應獨有)
如果是0x3,就是創建交換機對應的參數......?
主要分為以下幾步:
? ? ? ? 1、把request中的payload作出一個初步的解析
? ? ? ? 2、根據type的值,進一步區分請求要做什么
? ? ? ? 3、構造響應
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一個初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根據 type 的值, 來進一步區分接下來這次請求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 創建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 創建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 銷毀 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 銷毀 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 創建交換機. 此時 payload 就是 ExchangeDeclareArguments 對象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable());} else if (request.getType() == 0x4) {
// 刪除交換機ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {
// 創建隊列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable());} else if (request.getType() == 0x6) {
// 刪除隊列QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {
// 創建綁定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {// 刪除綁定QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 這個回調函數要做的工作, 就是把服務器收到的消息可以直接推送回對應的消費者客戶端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道當前這個收到的消息, 要發給哪個客戶端.// 此處 consumerTag 其實是 channelId. 根據 channelId 去 sessions 中查詢, 就可以得到對應的// socket 對象了, 從而可以往里面發送數據了// 1. 根據 channelId 找到 socket 對象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 訂閱消息的客戶端已經關閉!");}// 2. 構造響應數據SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于這里只有響應, 沒有請求, 不需要去對應. rid 暫時不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服務器給消費者客戶端推送的消息數據.response.setType(0xc);// response 的 payload 就是一個 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把數據寫回給客戶端.// 注意! 此處的 dataOutputStream 這個對象不能 close !!!// 如果 把 dataOutputStream 關閉, 就會直接把 clientSocket 里的 outputStream 也關了.// 此時就無法繼續往 socket 中寫入后續數據了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 調用 basicAck 確認消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 當前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 構造響應BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}
🍅 6、清理過期的sessions:clearClosedSession()
// 遍歷sessions hash表,把該被關閉的socket對應的鍵值對都刪掉private void clearClosedSession(Socket clientSocket) {List<String> toDeleteChannelId = new ArrayList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()){if(entry.getValue() == clientSocket){
// 使用集合類,不能一邊遍歷,一邊刪除toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}System.out.println("[BrokerServer]清理session完成~ 被清理的channeId = " + toDeleteChannelId);}