目錄
- 主要實現需求
- TCP 服務端收發并行重構
- 啟動main方法重構
- 重構分離收發消息的操作
- 重構接收消息的操作
- 重構發送消息
- TCPServer調用發送消息的邏輯
- 監聽客戶端鏈接邏輯重構
- Socket、流的退出與關閉
- TCP 客戶端收發并行重構
- 客戶端 main函數重構
- 客戶端接收消息重構
- 客戶端發送消息重構
- 客戶端 linkWith 主方法重構
- TCP 收發并行重構測試
- 服務端重構后執行日志
- 客戶端重構后執行日志
- 源碼下載
主要實現需求
多線程收發并行
TCP多線程收發協作
TCP 服務端收發并行重構
TCP 服務端收發并行重構
啟動main方法重構
原有的main邏輯如下:
重構后如下:
public class Server {public static void main(String[] args) throws IOException {TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);boolean isSucceed = tcpServer.start();if(!isSucceed){System.out.println("Start TCP server failed.");}UDPProvider.start(TCPConstants.PORT_SERVER);// 鍵盤輸入:BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));String str;do {str = bufferedReader.readLine();tcpServer.broadcast(str);} while (!"00bye00".equalsIgnoreCase(str));UDPProvider.stop();tcpServer.stop();}
}
重構后,從while循環不斷讀取鍵盤輸入信息,當輸入“00bye00” 時退出讀取。此處只讀取鍵盤輸入數據,客戶端發送的數據在會重新拆分出來新的線程單獨處理。
重構分離收發消息的操作
創建 ClientHandler.java 重構收發消息操作:
public class ClientHandler {private final Socket socket;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final CloseNotiry closeNotiry;public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.closeNotiry = closeNotiry;System.out.println("新客戶鏈接: " + socket.getInetAddress() + "\tP:" + socket.getPort());}
}
重構接收消息的操作
/*** 接收數據*/class ClientReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ClientReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){super.run();try {// 得到輸入流,用于接收數據BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客戶端拿到一條數據String str = socketInput.readLine();if(str == null){System.out.println("客戶端已無法讀取數據!");// 退出當前客戶端ClientHandler.this.exitBySelf();break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("連接異常斷開");ClientHandler.this.exitBySelf();}}finally {// 連接關閉CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}
創建一個單獨的線程進行接收消息,該線程不需要關閉。
重構發送消息
/*** 發送數據*/class ClientWriteHandler {private boolean done = false;private final PrintStream printStream;private final ExecutorService executorService;ClientWriteHandler(OutputStream outputStream) {this.printStream = new PrintStream(outputStream);// 發送消息使用線程池來實現this.executorService = Executors.newSingleThreadExecutor();}void exit(){done = true;CloseUtils.close(printStream);executorService.shutdown();}void send(String str) {executorService.execute(new WriteRunnable(str));}class WriteRunnable implements Runnable{private final String msg;WriteRunnable(String msg){this.msg = msg;}@Overridepublic void run(){if(ClientWriteHandler.this.done){return;}try {ClientWriteHandler.this.printStream.println(msg);}catch (Exception e){e.printStackTrace();}}}}
TCPServer調用發送消息的邏輯
public void broadcast(String str) {for (ClientHandler client : clientHandlerList){// 發送消息client.send(str);}}
監聽客戶端鏈接邏輯重構
private List<ClientHandler> clientHandlerList = new ArrayList<>();/*** 監聽客戶端鏈接*/private class ClientListener extends Thread {private ServerSocket server;private boolean done = false;private ClientListener(int port) throws IOException {server = new ServerSocket(port);System.out.println("服務器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());}@Overridepublic void run(){super.run();System.out.println("服務器準備就緒~");// 等待客戶端連接do{// 得到客戶端Socket client;try {client = server.accept();}catch (Exception e){continue;}try {// 客戶端構建異步線程ClientHandler clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));// 啟動線程clientHandler.readToPrint();clientHandlerList.add(clientHandler);} catch (IOException e) {e.printStackTrace();System.out.println("客戶端連接異常: " + e.getMessage());}}while (!done);System.out.println("服務器已關閉!");}void exit(){done = true;try {server.close();}catch (IOException e){e.printStackTrace();}}}
clientHandlerList作為已經建立了連接的客戶端的集合,用于管理當前用戶的信息。接收與發送都使用該集合。
Socket、流的退出與關閉
/*** 退出、關閉流*/public void exit(){readHandler.exit();writeHandler.exit();CloseUtils.close(socket);System.out.println("客戶端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());}/*** 發送消息* @param str*/public void send(String str){writeHandler.send(str);}/*** 接收消息*/public void readToPrint() {readHandler.exit();}/*** 接收、發送消息異常,自動關閉*/private void exitBySelf() {exit();closeNotiry.onSelfClosed(this);}/*** 關閉流*/public interface CloseNotiry{void onSelfClosed(ClientHandler handler);}
TCP 客戶端收發并行重構
客戶端 main函數重構
public static void main(String[] args) {// 定義10秒的搜索時間,如果超過10秒未搜索到,就認為服務器端沒有開機ServerInfo info = UDPSearcher.searchServer(10000);System.out.println("Server:" + info);if( info != null){try {TCPClient.linkWith(info);}catch (IOException e){e.printStackTrace();}}}
客戶端接收消息重構
static class ReadHandler extends Thread{private boolean done = false;private final InputStream inputStream;ReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){try {// 得到輸入流,用于接收數據BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客戶端拿到一條數據String str = null;try {str = socketInput.readLine();}catch (SocketTimeoutException e){}if(str == null){System.out.println("連接已關閉,無法讀取數據!");break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("連接異常斷開:" + e.getMessage());}}finally {// 連接關閉CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}
創建ReadHandler用單獨的線程去接收服務端的消息。連接關閉則exit() 關閉客戶端。
客戶端發送消息重構
private static void write(Socket client) throws IOException {// 構建鍵盤輸入流InputStream in = System.in;BufferedReader input = new BufferedReader(new InputStreamReader(in));// 得到Socket輸出流,并轉換為打印流OutputStream outputStream = client.getOutputStream();PrintStream socketPrintStream = new PrintStream(outputStream);boolean flag = true;do {// 鍵盤讀取一行String str = input.readLine();// 發送到服務器socketPrintStream.println(str);// 從服務器讀取一行if("00bye00".equalsIgnoreCase(str)){break;}}while(flag);// 資源釋放socketPrintStream.close();}
在linkWith() 中調用write() 發送方法,由 do-while 循環讀取本地鍵盤輸入信息進行發送操作。當滿足 “00bye00” 時,關閉循環,關閉socket連接,結束該線程。
客戶端 linkWith 主方法重構
public static void linkWith(ServerInfo info) throws IOException {Socket socket = new Socket();// 超時時間socket.setSoTimeout(3000);// 端口2000;超時時間300mssocket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//System.out.println("已發起服務器連接,并進入后續流程~");System.out.println("客戶端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());System.out.println("服務器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());try {ReadHandler readHandler = new ReadHandler(socket.getInputStream());readHandler.start();// 發送接收數據write(socket);}catch (Exception e){System.out.println("異常關閉");}// 釋放資源socket.close();System.out.println("客戶端已退出~");}
原有的邏輯里,是調用 todo() 方法,在todo() 方法里同時進行收發操作。現在是進行讀寫分離。
TCP 收發并行重構測試
服務端重構后執行日志
客戶端重構后執行日志
源碼下載
下載地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel