目錄
- 聊天室數據傳輸設計
- 客戶端、服務器數據交互
- 數據傳輸協議
- 服務器、多客戶端模型
- 客戶端如何發送消息到另外一個客戶端
- 2個以上設備如何交互數據?
- 聊天室消息接收實現
- 代碼結構
- client客戶端重構
- server服務端重構
- 自身描述信息的構建
- 重構TCPServer.java
- 基于synchronized 解決多線程操作的安全問題
- 聊天室Server/Client啟動、測試
- 源碼下載
聊天室數據傳輸設計
- 必要條件:客戶端、服務器
- 必要約束:數據傳輸協議
- 原理:服務器監聽消息來源、客戶端鏈接服務器并發送消息到服務器
客戶端、服務器數據交互
client 發送消息到服務器端,服務器端回復消息也就是回送消息。
數據傳輸協議
數據在傳輸的時候,需要在尾部追加換行符,也就是說原來5個字節的數據,在實際傳輸時,是有6個字節長度的。
服務器、多客戶端模型
在客戶端有多個情況下,客戶端都會向服務器端進行發送消息;想要在PC發送消息給服務器端時,也讓安卓、平板等終端都能收到,其操作應該是,當PC端發送一條消息到服務器端之后,服務器端得到該數據后,它會把這條數據發送(回送)給當前連接的客戶端。而這些當前連接的客戶端收到這條消息后,就實現了把PC消息發送到手機的過程。
客戶端如何發送消息到另外一個客戶端
每個客戶端都是服務器也是客戶端?
答:不是
2個以上設備如何交互數據?
答:約定一個基礎的數據格式,這里使用回車換行符來作為信息的截斷
客戶端-服務器-轉發到客戶端,如下圖:
User1發送消息到服務端,服務端將消息轉發給其他的客戶端(比如User2),從而實現聊天室的功能
聊天室消息接收實現
代碼結構
代碼分為四個module,分別為clink、constants、client、server。
- clink:該module為提供工具類進行校驗與流處理。
- constants:基礎的共用類代碼
- server:服務端代碼,需要依賴 clink、constants兩個module
- client:客戶端代碼,需要依賴 clink、constants兩個module
clink、constants的工具類,基礎數據類參考前面 TCP點對點傳輸的代碼邏輯
client客戶端重構
初版代碼和TCP點對點傳輸的基本一致,聊天室主要在TCPServer端進行轉發,所以Client不需要代碼重構。
server服務端重構
初版代碼和TCP點對點傳輸的基本一致,要實現聊天室消息接收則需要進行重構。主要重構 TCPServer.java 、ClientHandler.java類。
ClientHandler.java - 消息轉發
原有的消息在收到后就只是打印到控制臺
// 打印到屏幕
System.out.println(str);
而實現聊天室功能需要將收到的消息進行通知出去。這里可以通過 CloseNotify() 接口進行實現。這里對該接口進行改造,并新增轉發的接口方法來將消息通知回去。
/*** 消息回調*/public interface ClientHandlerCallback {// 自身不安比通知void onSelfClosed(ClientHandler handler);// 收到消息時通知void onNewMessageArrived(ClientHandler handler,String msg);}
在將消息打印到屏幕的同時,將消息通知出去:
// 打印到屏幕System.out.println(str);clientHandlerCallback.onNewMessageArrived(ClientHandler.this,str);
調用onNewMessageArrived()方法從而進行轉發。這里主要是把當前收到的消息傳遞回去,同時也要把自身傳遞回去。
自身描述信息的構建
新增clientInfo類變量:
private final String clientInfo;
自身描述信息初始化:
public ClientHandler(Socket socket, ClientHandlerCallback clientHandlerCallback) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.clientHandlerCallback = clientHandlerCallback;// 新增自身描述信息this.clientInfo = "A[" + socket.getInetAddress().getHostAddress() + "] P[" + socket.getPort() + "]";System.out.println("新客戶端連接:" + clientInfo);}public String getClientInfo() {return clientInfo;}
重構TCPServer.java
重構 clientHandler.ClientHandlerCallback的兩個回調方法,這里要將之提到TCPServer.java類上。
讓TCPServer.java 實現 clientHandler.ClientHandlerCallback接口。并實現兩個方法:
@Overridepublic synchronized void onSelfClosed(ClientHandler handler) {}@Overridepublic void onNewMessageArrived(ClientHandler handler, String msg) {}
并將 客戶端構建溢出線程的remove操作遷移到 onSelfClosed() 方法實現內:
@Overridepublic synchronized void onSelfClosed(ClientHandler handler) {clientHandlerList.remove(handler);}
原有的ClientHandler異步線程處理邏輯如下
// 客戶端構建異步線程ClientHandler clientHandler = new ClientHandler(client,handler -> clientHandlerList.remove(handler));
重構后,如下:
// 客戶端構建異步線程ClientHandler clientHandler = new ClientHandler(client,TCPServer.this);
消息轉發
/*** 轉發消息給其他客戶端* @param handler* @param msg*/@Overridepublic void onNewMessageArrived(ClientHandler handler, String msg) {// 打印到屏幕System.out.println("Received-" + handler.getClientInfo() + ":" + msg);// 轉發forwardingThreadPoolExecutor.execute(()->{for (ClientHandler clientHandler : clientHandlerList){if(clientHandler.equals(handler)){// 跳過自己continue;}// 向其他客戶端投遞消息clientHandler.send(msg);}});}
基于synchronized 解決多線程操作的安全問題
由于這里有對 clientHandlerList集合的刪除、添加、遍歷等操作,這涉及到對所有客戶端的操作,在多線程的環境下,默認的List不是線程安全的,所以存在多線程的安全問題。
public void stop() {if (mListener != null) {mListener.exit();}synchronized (TCPServer.this){for (ClientHandler clientHandler : clientHandlerList) {clientHandler.exit();}clientHandlerList.clear();}// 停止線程池forwardingThreadPoolExecutor.shutdownNow();}public synchronized void broadcast(String str) {for (ClientHandler clientHandler : clientHandlerList) {clientHandler.send(str);}}/*** 刪除當前消息* @param handler*/@Overridepublic synchronized void onSelfClosed(ClientHandler handler) {clientHandlerList.remove(handler);}/*** 轉發消息給其他客戶端* @param handler* @param msg*/@Overridepublic void onNewMessageArrived(ClientHandler handler, String msg) {// 打印到屏幕System.out.println("Received-" + handler.getClientInfo() + ":" + msg);// 轉發}
這里加類鎖來保證刪除操作的線程安全。
關于添加操作的線程安全問題解決如下:
try {// 客戶端構建異步線程ClientHandler clientHandler = new ClientHandler(client,TCPServer.this);// 讀取數據并打印clientHandler.readToPrint();// 添加同步處理synchronized (TCPServer.this) {clientHandlerList.add(clientHandler);}} catch (IOException e) {e.printStackTrace();System.out.println("客戶端連接異常:" + e.getMessage());}
異步轉發
// 轉發clientHandlerCallback.onNewMessageArrived(ClientHandler.this,str);
在ClientHandler.java中,上述代碼所在的線程是主要線程,會一直有消息進來,所以不能做同步處理,那樣會導致當前線程阻塞,從而導致后面進來的消息無法及時處理。
所以當 onNewMessageArrived()將消息拋出去之后,TCPServer.java的實現要采取異步轉發的方式退給其他客戶端。創建一個新的單例線程池來做轉發的操作:
新增轉發線程池:
// 轉發線程池private final ExecutorService forwardingThreadPoolExecutor;public TCPServer(int port) {this.port = port;this.forwardingThreadPoolExecutor = Executors.newSingleThreadExecutor();}
轉發投遞消息給其他客戶端:
/*** 轉發消息給其他客戶端* @param handler* @param msg*/@Overridepublic void onNewMessageArrived(ClientHandler handler, String msg) {// 打印到屏幕System.out.println("Received-" + handler.getClientInfo() + ":" + msg);// 轉發forwardingThreadPoolExecutor.execute(()->{synchronized (TCPServer.this){for (ClientHandler clientHandler : clientHandlerList){if(clientHandler.equals(handler)){// 跳過自己continue;}// 向其他客戶端投遞消息clientHandler.send(msg);}}});}
防止客戶端下線后,依舊重復發送的問題:
ClientHandler.java - ClientWriteHandler
/*** 發送到客戶端* @param str*/void send(String str) {// 如果已經發送完成,就返回if(done){return;}executorService.execute(new WriteRunnable(str));}
聊天室Server/Client啟動、測試
idea單個程序同時啟動多個窗口的方法:
-
啟動main方法
-
勾選運行運行多個
-
保存退出就可以了
測試結果如下:
-
先啟動服務端,再啟動三個客戶端
-
服務端和客戶端發消息
服務端發送:我是服務端
客戶端發送客戶端1、客戶端2、客戶端3
-
其中一個客戶端退出,不影響其他客戶端和服務端發送消息
至此,socket簡易,聊天室重構完成
源碼下載
下載地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_chatroom