通過 NIO + 多線程 提升硬件設備與系統的數據傳輸性能

一、項目展示

下圖(模擬的數據可視化大屏)中數據是動態顯示的

在這里插入圖片描述

二、項目簡介

描述:使用Client模擬了硬件設備,比如可燃氣體濃度檢測器。Client通過Socket與Server建立連接,Server保存數據到txt文件,并使用WebSocket將數據推送到數據可視化大屏

工作:通過多線程+NIO優化了Server性能

原理圖:

在這里插入圖片描述

三、代碼實現

Server

NioSocketServerService.java

package com.example.server;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;@Service
public class NioSocketServerService {private static final int PORT = 8081;private static final int TIMEOUT = 5000;private static final BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();@PostConstructpublic void startServer() {for (int i = 0; i < 4; i++) {new Thread(new FileWriterTask(writeQueue)).start();}new Thread(() -> {try {Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(PORT));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Server is listening on port " + PORT);while (true) {if (selector.select(TIMEOUT) == 0) {continue;}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();try {if (key.isAcceptable()) {handleAccept(key, selector);} else if (key.isReadable()) {handleRead(key);}} catch (IOException e) {key.cancel();               // 取消鍵的注冊,這意味著該通道不再被選擇器監視key.channel().close();      // 關閉通道,釋放資源}}}} catch (IOException e) {e.printStackTrace();}}).start();}private void handleAccept(SelectionKey key, Selector selector) throws IOException {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);SocketChannelHandler.addBuffer(socketChannel);System.out.println("New client connected: " + socketChannel.getRemoteAddress());}private void handleRead(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();SocketChannelHandler.readFromChannel(socketChannel, writeQueue);}
}

SocketChannelHandler.java

package com.example.server;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;public class SocketChannelHandler {private static final String DIRECTORY = "data/";private static final int BUFFER_SIZE = 2048;private static final Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static void addBuffer(SocketChannel socketChannel) {bufferMap.put(socketChannel, ByteBuffer.allocateDirect(BUFFER_SIZE));}public static void readFromChannel(SocketChannel socketChannel, BlockingQueue<String> writeQueue) throws IOException {ByteBuffer buffer = bufferMap.get(socketChannel);buffer.clear();int bytesRead;try {bytesRead = socketChannel.read(buffer);} catch (IOException e) {System.err.println("Error reading from socket: " + e.getMessage());socketChannel.close();bufferMap.remove(socketChannel);return;}if (bytesRead == -1) {          // 讀取到-1表示客戶端已關閉連接,移除緩沖區socketChannel.close();bufferMap.remove(socketChannel);} else if (bytesRead > 0) {buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);String message = new String(data);String[] dataParts = message.split(" : ", 2);if (dataParts.length == 2) {String deviceId = dataParts[0].trim();String deviceData = dataParts[1].trim();String currentTime = LocalDateTime.now().format(dateTimeFormatter);String dataToWrite = DIRECTORY + deviceId + ".txt : " + currentTime + " : " + deviceData;writeQueue.add(dataToWrite);WebSocketServer.sendMessage(deviceId + " : " + currentTime + " : " + deviceData);}}}
}

FileWriterTask.java

package com.example.server;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class FileWriterTask implements Runnable {private static final int BATCH_SIZE = 10;/*** BlockingQueue是JUC包中的一個接口,提供了線程安全的隊列操作* 支持阻塞的put和take操作,當隊列滿時put會阻塞,直到隊列有空位;當隊列空時take會阻塞,直到隊列有元素* 其主要實現包括:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue*/private final BlockingQueue<String> writeQueue;public FileWriterTask(BlockingQueue<String> writeQueue) {this.writeQueue = writeQueue;}@Overridepublic void run() {while (true) {try {List<String> dataList = new ArrayList<>();// 讀取BATCH_SIZE條數據,或等待超時后退出循環while (dataList.size() < BATCH_SIZE) {String data = writeQueue.poll(100, TimeUnit.MILLISECONDS);if (data != null) {dataList.add(data);} else {break;}}// 如果讀取到數據,則將其寫入文件if (!dataList.isEmpty()) {for (String data : dataList) {String[] dataParts = data.split(" : ");if (dataParts.length == 3) {String fileName = dataParts[0].trim();try (FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {ByteBuffer buffer = ByteBuffer.wrap((data + System.lineSeparator()).getBytes());fileChannel.write(buffer);}}}}} catch (IOException | InterruptedException e) {e.printStackTrace();}}}
}

Client

MultiThreadedSocketClient.java

package com.example.client;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MultiThreadedSocketClient {public static void main(String[] args) {String hostname = "localhost";int port = 8081;int numberOfDevices = 1000;ExecutorService executor = Executors.newFixedThreadPool(numberOfDevices);for (int i = 1; i <= numberOfDevices; i++) {String deviceId = "Device" + i;executor.submit(new DeviceClient(hostname, port, deviceId));}executor.shutdown();}
}

DeviceClient.java

package com.example.client;import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.TimeUnit;class DeviceClient implements Runnable {private String hostname;private int port;private String deviceId;private Random random = new Random();private static final int MAX_RETRIES = 15;private static final int RETRY_DELAY_MS = 1000;public DeviceClient(String hostname, int port, String deviceId) {this.hostname = hostname;this.port = port;this.deviceId = deviceId;}@Overridepublic void run() {int attempt = 0;boolean connected = false;while (attempt < MAX_RETRIES && !connected) {try {Thread.sleep(random.nextInt(15000));} catch (InterruptedException e) {throw new RuntimeException(e);}try (Socket socket = new Socket(hostname, port);PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {connected = true;while (true) {try {String data = deviceId + " : " + random.nextInt(50000);out.println(data);TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}} catch (UnknownHostException e) {System.err.println("Unknown host: " + hostname);break;} catch (IOException e) {attempt++;int randomDelay = random.nextInt(10000);System.err.println(deviceId + "\tAttempt " + attempt + " - Connection refused. Retrying in " + (RETRY_DELAY_MS + randomDelay) + "ms...");try {Thread.sleep(RETRY_DELAY_MS + randomDelay);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}if (!connected) {System.err.println("Failed to connect after " + MAX_RETRIES + " attempts.");}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/15661.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/15661.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/15661.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

結構體(位段)內存分配

結構體由多個數據類型的成員組成。那編譯器分配的內存是不是所有成員的字節數總和呢&#xff1f; 首先&#xff0c;stu的內存大小并不為29個字節&#xff0c;即證明結構體內存不是所有成員的字節數和。 ??其次&#xff0c;stu成員中sex的內存位置不在21&#xff0c;即可推測…

Swift 請求用戶授權以跟蹤其跨應用或網站的活動

步驟1:導入框架 首先,需要在Swift文件中導入AppTrackingTransparency框架。 import AppTrackingTransparency import AdSupport步驟2:請求跟蹤許可 在適當的地方請求用戶的跟蹤許可。通常,這個請求會在應用啟動時或者在用戶執行某些操作(例如,訪問應用中的廣告相關功能…

Linux服務器安裝docker,基于Linux(openEuler、CentOS8)

本實驗環境為openEuler系統(以server方式安裝)&#xff08;CentOS8基本一致&#xff0c;可參考本文) 目錄 知識點實驗 知識點 Docker 是一個開源的應用容器引擎。它允許開發者將應用及其所有依賴項打包到一個可移植的容器中&#xff0c;并發布到任何支持Docker的流行Linux或Wi…

基于python flask的web服務

基本例子 from flask import Flask app Flask(__name__) app.route(/)#檢查訪問的網址&#xff0c;根路徑走這里 def hello_world():return hello world#返回hello worldif __name__ __main__:# 綁定到指定的IP地址和端口app.run(host0.0.0.0, port1000, debugTrue)##綁定端…

設計一個完美的用戶角色權限表

設計一個完美的用戶角色權限表需要考慮系統的安全性、靈活性和可擴展性。以下是一個詳細的用戶角色權限管理表設計方案&#xff0c;包含多個表結構和字段描述。 目錄 1. 用戶表&#xff08;Users Table&#xff09;2. 角色表&#xff08;Roles Table&#xff09;3. 權限表&…

【數據結構與算法 | 基礎篇】環形數組模擬隊列

1. 前言 上文我們用環形單向鏈表實現了隊列.接下來我們用環形數組來模擬隊列.并實現了isFull()&#xff0c;isEmpty()等方法. 2. 環形數組模擬隊列 (1). Queue接口 : public interface Queue<E> {//向隊伍插入值, 插入成功返回true, 否則返回falseboolean offer(E v…

【Linux】TCP協議【上】{協議段屬性:源端口號/目的端口號/序號/確認序號/窗口大小/緊急指針/標記位}

文章目錄 1.引入2.協議段格式4位首部長度16位窗口大小32位序號思考三個問題【demo】標記位URG: 緊急指針是否有效提升某報文被處理優先級【0表示不設置1表示設置】ACK: 確認號是否有效PSH: 提示接收端應用程序立刻從TCP緩沖區把數據讀走RST: 對方要求重新建立連接; 我們把攜帶R…

windows 設置系統字體 (win11 win10)

由于微軟的字體是有版權的&#xff0c;所以我打算替換掉 1.下載替換工具 github的項目&#xff0c;看起來很多人對微軟默認字體帶版權深惡痛絕。 項目地址&#xff1a;nomeiryoUi地址 這里選取最新的版本即可 2.打開軟件 這里顯示標題欄不能改&#xff0c;確認&#xff0c;其…

蓋雅技能發展云,助力制造企業人效合一

制造行業盡管經歷多次變革&#xff0c;但企業對人的管理始終是一項高度依賴經驗和耗費人力的工作。隨著供應鏈管理和生產設備的自動化、數字化升級&#xff0c;如何將第一生產要素——人&#xff0c;通過數字化的工具融入制造過程的閉環&#xff0c;對企業實現自動化工廠和智能…

力扣 滑動窗口題目總結

Leetcode3.無重復字符的最長子串 思路&#xff1a; 這道題主要用到思路是&#xff1a;滑動窗口 什么是滑動窗口&#xff1f; 其實就是一個隊列,比如例題中的 abcabcbb&#xff0c;進入這個隊列&#xff08;窗口&#xff09;為 abc 滿足題目要求&#xff0c;當再進入 a&#x…

牛客NC334 字典序第K小【困難 10叉樹 Java/Go/PHP/C++】,力扣 440. 字典序的第K小數字

題目 題目鏈接&#xff1a; https://www.nowcoder.com/practice/670c2bda374241d7ae06ade60de33e8b https://leetcode.cn/problems/k-th-smallest-in-lexicographical-order/description/ 本答案核心 10叉樹, 數學規律Java代碼 import java.util.*;public class Solution {…

大模型的靈魂解讀:Anthropic AI的Claude3 Sonnet可解釋性研究

大模型技術論文不斷&#xff0c;每個月總會新增上千篇。本專欄精選論文重點解讀&#xff0c;主題還是圍繞著行業實踐和工程量產。若在某個環節出現卡點&#xff0c;可以回到大模型必備腔調重新閱讀。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;則提供了大模型領域最新技…

Vue集成Iframe

一、應用場景&#xff0c;為什么要集成Iframe&#xff1f; 1、龐大項目拆分后&#xff0c;便于管理和部署&#xff0c;用集成Iframe的方法合并 2、避免功能重復開發&#xff0c;共用模塊可單獨開發為一個項目&#xff0c;既可獨立部署&#xff0c;也可集成到中臺系統 二、集成…

[算法][前綴和] [leetcode]724. 尋找數組的中心下標

題目地址 https://leetcode.cn/problems/find-pivot-index/description/ 題目描述 代碼 class Solution {public int pivotIndex(int[] nums) {int total Arrays.stream(nums).sum();//前綴和int prefixSum 0;int len nums.length;for(int i 0;i<len;i){if (i-1>0){p…

小豬APP分發:一站式托管服務,輕松玩轉應用市場

在當今移動應用爆炸式增長的時代&#xff0c;開發者們面臨的挑戰不再僅限于創意的火花和代碼的實現&#xff0c;更在于如何讓精心打造的應用快速觸達廣大用戶。這正是小豬APP分發www.appzhu.net應運而生的背景——作為一個全面、高效的APP托管服務分發平臺&#xff0c;它為開發…

基于PHP的物業管理的設計與實現

第1章 緒論... 1 1.1 研究背景與意義... 1 1.2 國內外發展現狀... 2 第2章 關鍵技術介紹... 3 2.1 PHP語言... 3 2.2 MySQL數據庫... 3 2.3 Zend框架... 4 2.4 B/S架構... 4 第3章 系統需求分析... 5 3.1 可行性分析... 5 3.1.1 技術可行性分析... 5 3.1.2 經濟可行…

解決Java中的IllegalArgumentException異常的正確方法

解決Java中的IllegalArgumentException異常的正確方法 引言 在Java編程中&#xff0c;IllegalArgumentException是一個常見的運行時異常&#xff0c;它通常在方法接收到不合法或不適當的參數時拋出。這篇文章將詳細介紹IllegalArgumentException異常的原因、如何診斷以及解決…

金職優學:分析央國企面試如何通關?

在當今競爭激烈的就業市場中&#xff0c;中央和國有企業&#xff08;以下簡稱“央國企”&#xff09;的面試機會對求職者來說是非常有吸引力的。這些企業通常擁有穩定的發展前景、良好的薪酬福利和廣闊的職業發展空間。但是&#xff0c;要想成功通過央國企的面試&#xff0c;求…

探索Python編程世界:從基礎到實戰

新書上架~&#x1f447;全國包郵奧~ python實用小工具開發教程http://pythontoolsteach.com/3 歡迎關注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目錄 一、Python語言簡介與動態特性 代碼示例&#xff1a;動態類型與變量命名 二、Python應用領…

vue 表格表頭展示不下,顯示。。。;鼠標懸浮展示全部

vue 表格表頭展示不下&#xff0c;顯示。。。&#xff1b;鼠標懸浮展示全部 <templateslot-scope"scope"slot"header"><span:title"臨時證券類型"style"white-space:nowrap">{{ 臨時證券類型 }}</span></templa…