Socket網絡編程(五)——TCP數據發送與接收并行

目錄

  • 主要實現需求
  • TCP 服務端收發并行重構
    • 啟動main方法重構
    • 重構分離收發消息的操作
    • 重構接收消息的操作
    • 重構發送消息
    • TCPServer調用發送消息的邏輯
    • 監聽客戶端鏈接邏輯重構
    • Socket、流的退出與關閉
  • TCP 客戶端收發并行重構
    • 客戶端 main函數重構
    • 客戶端接收消息重構
    • 客戶端發送消息重構
    • 客戶端 linkWith 主方法重構
  • TCP 收發并行重構測試
    • 服務端重構后執行日志
    • 客戶端重構后執行日志
  • 源碼下載

主要實現需求

多線程收發并行
TCP多線程收發協作
TCP 服務端收發并行重構

TCP 服務端收發并行重構

啟動main方法重構

原有的main邏輯如下:
20240229-034932-Jk.png

重構后如下:

public class Server {public static void main(String[] args) throws IOException {TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);boolean isSucceed = tcpServer.start();if(!isSucceed){System.out.println("Start TCP server failed.");}UDPProvider.start(TCPConstants.PORT_SERVER);// 鍵盤輸入:BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));String str;do {str = bufferedReader.readLine();tcpServer.broadcast(str);} while (!"00bye00".equalsIgnoreCase(str));UDPProvider.stop();tcpServer.stop();}
}

重構后,從while循環不斷讀取鍵盤輸入信息,當輸入“00bye00” 時退出讀取。此處只讀取鍵盤輸入數據,客戶端發送的數據在會重新拆分出來新的線程單獨處理。

重構分離收發消息的操作

創建 ClientHandler.java 重構收發消息操作:

public class ClientHandler {private final Socket socket;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final CloseNotiry closeNotiry;public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.closeNotiry = closeNotiry;System.out.println("新客戶鏈接: " + socket.getInetAddress() + "\tP:" + socket.getPort());} 
}

重構接收消息的操作

    /*** 接收數據*/class ClientReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ClientReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){super.run();try {// 得到輸入流,用于接收數據BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客戶端拿到一條數據String str = socketInput.readLine();if(str == null){System.out.println("客戶端已無法讀取數據!");// 退出當前客戶端ClientHandler.this.exitBySelf();break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("連接異常斷開");ClientHandler.this.exitBySelf();}}finally {// 連接關閉CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}

創建一個單獨的線程進行接收消息,該線程不需要關閉。

重構發送消息

    /*** 發送數據*/class ClientWriteHandler {private boolean done = false;private final PrintStream printStream;private final ExecutorService executorService;ClientWriteHandler(OutputStream outputStream) {this.printStream = new PrintStream(outputStream);// 發送消息使用線程池來實現this.executorService = Executors.newSingleThreadExecutor();}void exit(){done = true;CloseUtils.close(printStream);executorService.shutdown();}void send(String str) {executorService.execute(new WriteRunnable(str));}class WriteRunnable implements  Runnable{private final String msg;WriteRunnable(String msg){this.msg = msg;}@Overridepublic void run(){if(ClientWriteHandler.this.done){return;}try {ClientWriteHandler.this.printStream.println(msg);}catch (Exception e){e.printStackTrace();}}}}

TCPServer調用發送消息的邏輯

    public void broadcast(String str) {for (ClientHandler client : clientHandlerList){// 發送消息client.send(str);}}

監聽客戶端鏈接邏輯重構

    private List<ClientHandler> clientHandlerList = new ArrayList<>();/*** 監聽客戶端鏈接*/private class ClientListener extends Thread {private ServerSocket server;private boolean done = false;private ClientListener(int port) throws IOException {server = new ServerSocket(port);System.out.println("服務器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());}@Overridepublic void run(){super.run();System.out.println("服務器準備就緒~");// 等待客戶端連接do{// 得到客戶端Socket client;try {client = server.accept();}catch (Exception e){continue;}try {// 客戶端構建異步線程ClientHandler  clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));// 啟動線程clientHandler.readToPrint();clientHandlerList.add(clientHandler);} catch (IOException e) {e.printStackTrace();System.out.println("客戶端連接異常: " + e.getMessage());}}while (!done);System.out.println("服務器已關閉!");}void exit(){done = true;try {server.close();}catch (IOException e){e.printStackTrace();}}}

clientHandlerList作為已經建立了連接的客戶端的集合,用于管理當前用戶的信息。接收與發送都使用該集合。

Socket、流的退出與關閉

    /*** 退出、關閉流*/public void exit(){readHandler.exit();writeHandler.exit();CloseUtils.close(socket);System.out.println("客戶端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());}/*** 發送消息* @param str*/public void send(String str){writeHandler.send(str);}/*** 接收消息*/public void readToPrint() {readHandler.exit();}/***  接收、發送消息異常,自動關閉*/private void exitBySelf() {exit();closeNotiry.onSelfClosed(this);}/***  關閉流*/public interface CloseNotiry{void onSelfClosed(ClientHandler handler);}

TCP 客戶端收發并行重構

客戶端 main函數重構

    public static void main(String[] args) {// 定義10秒的搜索時間,如果超過10秒未搜索到,就認為服務器端沒有開機ServerInfo info = UDPSearcher.searchServer(10000);System.out.println("Server:" + info);if( info != null){try {TCPClient.linkWith(info);}catch (IOException e){e.printStackTrace();}}}

客戶端接收消息重構

    static class ReadHandler extends Thread{private boolean done = false;private final InputStream inputStream;ReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){try {// 得到輸入流,用于接收數據BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客戶端拿到一條數據String str = null;try {str = socketInput.readLine();}catch (SocketTimeoutException e){}if(str == null){System.out.println("連接已關閉,無法讀取數據!");break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("連接異常斷開:" + e.getMessage());}}finally {// 連接關閉CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}

創建ReadHandler用單獨的線程去接收服務端的消息。連接關閉則exit() 關閉客戶端。

客戶端發送消息重構

    private static void write(Socket client) throws IOException {// 構建鍵盤輸入流InputStream in = System.in;BufferedReader input = new BufferedReader(new InputStreamReader(in));// 得到Socket輸出流,并轉換為打印流OutputStream outputStream = client.getOutputStream();PrintStream socketPrintStream = new PrintStream(outputStream);boolean flag = true;do {// 鍵盤讀取一行String str = input.readLine();// 發送到服務器socketPrintStream.println(str);// 從服務器讀取一行if("00bye00".equalsIgnoreCase(str)){break;}}while(flag);// 資源釋放socketPrintStream.close();}

在linkWith() 中調用write() 發送方法,由 do-while 循環讀取本地鍵盤輸入信息進行發送操作。當滿足 “00bye00” 時,關閉循環,關閉socket連接,結束該線程。

客戶端 linkWith 主方法重構

     public static void linkWith(ServerInfo info) throws IOException {Socket socket = new Socket();// 超時時間socket.setSoTimeout(3000);// 端口2000;超時時間300mssocket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//System.out.println("已發起服務器連接,并進入后續流程~");System.out.println("客戶端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());System.out.println("服務器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());try {ReadHandler readHandler = new ReadHandler(socket.getInputStream());readHandler.start();// 發送接收數據write(socket);}catch (Exception e){System.out.println("異常關閉");}// 釋放資源socket.close();System.out.println("客戶端已退出~");}

原有的邏輯里,是調用 todo() 方法,在todo() 方法里同時進行收發操作。現在是進行讀寫分離。

TCP 收發并行重構測試

服務端重構后執行日志

20240229-053719-hC.png

客戶端重構后執行日志

20240229-053740-Qt.png

源碼下載

下載地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/713785.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/713785.shtml
英文地址,請注明出處:http://en.pswp.cn/news/713785.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

前端封裝通用下載方法及下載后端返回的文件流

目錄 1.下載方法封裝 2.將后端返回的文件流轉換為文件 3.總結 1.下載方法封裝 ①說明 前端的請求大概分為三種類型 普通請求&#xff1a;常用的get&#xff0c;post&#xff0c;put&#xff0c;delete等請求 上傳請求&#xff1a;使用post請求&#xff0c;發送formdata對…

Zookeeper學習1:概述、安裝、應用場景、集群配置

文章目錄 概述安裝LinuxWindows 配置參數集群參考配置文件配置步驟流程啟動 概述 Zookeeper&#xff1a; 為分布式框架組件提供協調服務的中間件 【類似&#xff1a;文件系統通知機制】 負責存儲上下層應用關系的數據以及接收觀察者注冊監聽&#xff0c;一旦觀察查關心的數據發…

git操作基本指令

1.查看用戶名 git config user.name 2.查看密碼 git config user.password 3.查看郵箱 git config user.email 4.修改用戶名 git config --global user.name "xxx(新用戶名)" 5.修改密碼 git config --global user.password "xxx(新密碼)" 6.修改…

筆記73:ROS中的各種消息包

參考視頻&#xff1a; 33.ROS 的標準消息包 std_msgs_嗶哩嗶哩_bilibili 34. ROS 中的幾何包 geometry_msgs 和 傳感器包 sensor_msgs_嗶哩嗶哩_bilibili 標準消息包&#xff1a;std_msgs常用消息包&#xff1a;common_msgs導航消息包&#xff1a;nav_msgs幾何消息包&#xf…

實戰分享:Tomcat打破雙親委派模型,實現Web應用獨立與安全隔離的奧秘

目錄 一、JVM 類加載機制 二、Tomcat 類加載器 2.2 findClass 介紹 3.2 loadClass 介紹 三、web應用隔離 3.1 Spring 加載問題 在開始文章內容之前&#xff0c;先來看三個問題 假如在 Tomcat 上運行了兩個 Web 應用程序&#xff0c;兩個 web 應用中有同名的Servlet&#xf…

C++數據結構與算法——二叉樹的屬性

C第二階段——數據結構和算法&#xff0c;之前學過一點點數據結構&#xff0c;當時是基于Python來學習的&#xff0c;現在基于C查漏補缺&#xff0c;尤其是樹的部分。這一部分計劃一個月&#xff0c;主要利用代碼隨想錄來學習&#xff0c;刷題使用力扣網站&#xff0c;不定時更…

AGI概念與實現

AGI AGI&#xff08;Artificial General Intelligence&#xff09;&#xff0c;中文名為“通用人工智能”或“強人工智能”&#xff0c;是指通過機器學習和數據分析等技術&#xff0c;使計算機具有類似于人類的認知和學習能力的技術. 多模態的大模型 &#xff08;Multimodal…

詳細介紹如何用windows自帶Hyper-V安裝虛擬機(windows11和ubuntu22)

通過系統自帶的hyper-v安裝windows11&#xff0c;舒服又愜意&#xff0c;相比用第三方虛擬機軟件速度快很多。 硬件準備 準備 系統需要符合能安裝 Hyper-V 的最低要求windows版本含Hyper-V的功能 電腦空間 電腦要有足夠的空間來安裝你這個虛擬機。根據自己的磁盤容量情況來規…

2673. 使二叉樹所有路徑值相等的最小代價

給你一個整數 n 表示一棵 滿二叉樹 里面節點的數目&#xff0c;節點編號從 1 到 n 。根節點編號為 1 &#xff0c;樹中每個非葉子節點 i 都有兩個孩子&#xff0c;分別是左孩子 2 * i 和右孩子 2 * i 1 。 樹中每個節點都有一個值&#xff0c;用下標從 0 開始、長度為 n 的整…

CloudCanal x Hive 構建高效的實時數倉

簡述 CloudCanal 最近對于全周期數據流動進行了初步探索&#xff0c;打通了Hive 目標端的實時同步&#xff0c;為實時數倉的構建提供了支持&#xff0c;這篇文章簡要做下分享。 基于臨時表的增量合并方式基于 HDFS 文件寫入方式臨時表統一 Schema任務級的臨時表 基于臨時表的…

【Linux實踐室】Linux初體驗

&#x1f308;個人主頁&#xff1a;聆風吟 &#x1f525;系列專欄&#xff1a;Linux實踐室、網絡奇遇記 &#x1f516;少年有夢不應止于心動&#xff0c;更要付諸行動。 文章目錄 一. ??任務描述二. ??相關知識2.1 &#x1f514;Linux 目錄結構介紹2.2 &#x1f514;Linux …

WebFlux相關問題及答案(2024)

1、什么是Spring WebFlux&#xff1f; Spring WebFlux 是 Spring Framework 5.0 中引入的一個全新的反應式框架&#xff0c;用于構建異步、非阻塞且事件驅動的服務。它允許開發者使用響應式編程模型來處理并發性很高的操作&#xff0c;而無需擔心傳統的多線程環境中的復雜性。…

poi工具讀寫excel操作學習總結

寫在前面的話 POI作為比較早期的Excel處理工具&#xff0c;其使用較為成熟且廣泛。EasyExcel相較之下&#xff0c;則是相對較新的工具&#xff0c;其卻有著比POI更為優越的一些特性&#xff0c;如更加簡單的API接口和更加優秀的性能。 性能對比&#xff1a;在數據量較小的情況下…

mybatis mysql insert 主鍵id為空

錯誤示范 java代碼設置了param參數&#xff0c;但是sql 字段沒有帶上參數&#xff0c;例如 void insertV2(Param("historyDO") HistoryDO historyDO); <insert id"insertDuplicate" parameterType"com.test.entity.HistoryDO"keyProperty&…

MySQL:一行記錄如何

1、表空間文件結構 表空間由段「segment」、區「extent」、頁「page」、行「row」組成&#xff0c;InnoDB存儲引擎的邏輯存儲結構大致如下圖&#xff1a; 行 數據庫表中的記錄都是按「行」進行存放的&#xff0c;每行記錄根據不同的行格式&#xff0c;有不同的存儲結構。 頁…

hippy 調試demo運行聯調-mac環境準備篇

適用對于終端編譯環境不熟悉的人看&#xff0c;僅mac端 hippy 調試文檔官網地址 前提&#xff1a;請使用node16 聯調預覽效果圖&#xff1a; 編譯iOS Demo環境準備 未跑通&#xff0c;待補充 編譯Android Demo環境準備 1、正常安裝Android Studio 2、下載Android NDK&a…

Windows系統誤刪文件恢復

最近很多用戶反饋誤刪文件的場景比較多.下面華仔將講解數據恢復的原理和過程.以及一些注意事項。 建議的數據恢復軟件 1.EaseUS Data Recovery Wizard(易我數據恢復)需要斷網使用 2.Wondershare Recoverit(萬興數據恢復)&#xff0c; Windows系統刪除文件原理&#xff1a;如果是…

Android ShellUtils手機管理器

1. Android ShellUtils手機管理器 Android Shell工具類&#xff0c;可用于檢查系統root權限&#xff0c;并在shell或root用戶下執行shell命令。如&#xff1a; checkRootPermission() 檢查root權限 。execCommand(String[] commands, boolean isRoot, boolean isNeedResultMsg)…

HTTPS是什么,詳解它的加密過程

目錄 1.前言 2.兩種加密解密方式 2.1對稱加密 2.2非對稱加密 3.HTTPS的加密過程 3.1針對明文的對稱加密 3.2針對密鑰的非對稱加密 3.3證書的作用 1.前言 我們知道HTTP協議是超文本傳輸協議,它被廣泛的應用在客戶端服務器上,用來傳輸文字,圖片,視頻,js,html等.但是這種傳…

java數據結構與算法刷題-----LeetCode572. 另一棵樹的子樹(經典題,樹字符串化KMP)

java數據結構與算法刷題目錄&#xff08;劍指Offer、LeetCode、ACM&#xff09;-----主目錄-----持續更新(進不去說明我沒寫完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目錄 1. 暴力求解&#xff0c;深度優先2. KMP算法進行串匹配 1. 暴力求…