一、項目展示
下圖(模擬的數據可視化大屏)中數據是動態顯示的
二、項目簡介
描述:使用Client模擬硬件設備(例如可燃氣體濃度檢測器)。Client通過Socket與Server建立連接,Server將數據保存到txt文件,並使用WebSocket將數據推送到數據可視化大屏
工作:通過多線程+NIO優化了Server性能
原理圖:
三、代碼實現
Server
NioSocketServerService.java
package com.example.server;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;@Service
public class NioSocketServerService {private static final int PORT = 8081;private static final int TIMEOUT = 5000;private static final BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();@PostConstructpublic void startServer() {for (int i = 0; i < 4; i++) {new Thread(new FileWriterTask(writeQueue)).start();}new Thread(() -> {try {Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(PORT));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Server is listening on port " + PORT);while (true) {if (selector.select(TIMEOUT) == 0) {continue;}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();try {if (key.isAcceptable()) {handleAccept(key, selector);} else if (key.isReadable()) {handleRead(key);}} catch (IOException e) {key.cancel(); // 取消鍵的注冊,這意味著該通道不再被選擇器監視key.channel().close(); // 關閉通道,釋放資源}}}} catch (IOException e) {e.printStackTrace();}}).start();}private void handleAccept(SelectionKey key, Selector selector) throws IOException {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);SocketChannelHandler.addBuffer(socketChannel);System.out.println("New client connected: " + socketChannel.getRemoteAddress());}private void handleRead(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();SocketChannelHandler.readFromChannel(socketChannel, writeQueue);}
}
SocketChannelHandler.java
package com.example.server;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;public class SocketChannelHandler {private static final String DIRECTORY = "data/";private static final int BUFFER_SIZE = 2048;private static final Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static void addBuffer(SocketChannel socketChannel) {bufferMap.put(socketChannel, ByteBuffer.allocateDirect(BUFFER_SIZE));}public static void readFromChannel(SocketChannel socketChannel, BlockingQueue<String> writeQueue) throws IOException {ByteBuffer buffer = bufferMap.get(socketChannel);buffer.clear();int bytesRead;try {bytesRead = socketChannel.read(buffer);} catch (IOException e) {System.err.println("Error reading from socket: " + e.getMessage());socketChannel.close();bufferMap.remove(socketChannel);return;}if (bytesRead == -1) { // 讀取到-1表示客戶端已關閉連接,移除緩沖區socketChannel.close();bufferMap.remove(socketChannel);} else if (bytesRead > 0) {buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);String message = new String(data);String[] dataParts = message.split(" : ", 2);if (dataParts.length == 2) {String deviceId = dataParts[0].trim();String deviceData = dataParts[1].trim();String currentTime = LocalDateTime.now().format(dateTimeFormatter);String dataToWrite = DIRECTORY + deviceId + ".txt : " + currentTime + " : " + deviceData;writeQueue.add(dataToWrite);WebSocketServer.sendMessage(deviceId + " : " + currentTime + " : " + deviceData);}}}
}
FileWriterTask.java
package com.example.server;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Worker that drains records from a shared queue and appends each one to its target file.
 *
 * <p>A record has the form {@code fileName : timestamp : payload}; the whole record plus a
 * line separator is appended to {@code fileName}. Records without three parts are skipped.
 * The worker stops when its thread is interrupted, after writing what it already dequeued.
 */
public class FileWriterTask implements Runnable {

    /** Maximum number of records taken from the queue before writing. */
    private static final int BATCH_SIZE = 10;

    /** Thread-safe queue shared with the producers; {@code poll} with a timeout is used to drain it. */
    private final BlockingQueue<String> writeQueue;

    /**
     * @param writeQueue queue of {@code fileName : timestamp : payload} records to persist
     */
    public FileWriterTask(BlockingQueue<String> writeQueue) {
        this.writeQueue = writeQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            boolean interrupted = false;
            try {
                fillBatch(batch);
            } catch (InterruptedException e) {
                interrupted = true;
            }

            // Write what was dequeued even when interrupted, so no record is silently lost.
            for (String record : batch) {
                try {
                    append(record);
                } catch (IOException e) {
                    // One bad record must not discard the rest of the batch.
                    System.err.println("Failed to write record: " + e);
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Collects up to {@link #BATCH_SIZE} records, stopping early once a 100 ms poll times out. */
    private void fillBatch(List<String> batch) throws InterruptedException {
        while (batch.size() < BATCH_SIZE) {
            String data = writeQueue.poll(100, TimeUnit.MILLISECONDS);
            if (data == null) {
                return;
            }
            batch.add(data);
        }
    }

    /** Appends one record to the file named in its first field, creating parent directories. */
    private void append(String record) throws IOException {
        // Limit 3 keeps a payload that itself contains " : " intact instead of dropping it.
        String[] dataParts = record.split(" : ", 3);
        if (dataParts.length != 3) {
            return;
        }
        java.nio.file.Path target = Paths.get(dataParts[0].trim());
        java.nio.file.Path parent = target.getParent();
        if (parent != null) {
            // CREATE only creates the file itself, never missing directories.
            java.nio.file.Files.createDirectories(parent);
        }
        try (FileChannel fileChannel = FileChannel.open(target,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buffer = ByteBuffer.wrap((record + System.lineSeparator())
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // A single write call is allowed to write fewer bytes than requested.
            while (buffer.hasRemaining()) {
                fileChannel.write(buffer);
            }
        }
    }
}
Client
MultiThreadedSocketClient.java
package com.example.client;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Entry point that simulates many devices at once: one {@link DeviceClient} per device,
 * each running on its own pooled thread and connecting to the same server.
 */
public class MultiThreadedSocketClient {

    /**
     * Submits 1000 device clients ("Device1" .. "Device1000") targeting localhost:8081 and
     * then requests an orderly pool shutdown; already-submitted clients keep running.
     */
    public static void main(String[] args) {
        final String serverHost = "localhost";
        final int serverPort = 8081;
        final int deviceCount = 1000;

        // One thread per device, since every client holds its connection open.
        ExecutorService pool = Executors.newFixedThreadPool(deviceCount);
        int deviceNumber = 1;
        while (deviceNumber <= deviceCount) {
            pool.submit(new DeviceClient(serverHost, serverPort, "Device" + deviceNumber));
            deviceNumber++;
        }
        pool.shutdown();
    }
}
DeviceClient.java
package com.example.client;import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Simulated device: connects to the server and sends one {@code deviceId : value} line per
 * second, where the value is a random integer below 50000.
 *
 * <p>Connection attempts are spread out with random delays and retried up to
 * {@link #MAX_RETRIES} times in a row. A connection that breaks after it was established is
 * detected and re-established with a fresh retry budget. Interrupting the thread stops the
 * client.
 */
class DeviceClient implements Runnable {

    private static final int MAX_RETRIES = 15;
    private static final int RETRY_DELAY_MS = 1000;

    private final String hostname;
    private final int port;
    private final String deviceId;
    private final Random random = new Random();

    /**
     * @param hostname server host to connect to
     * @param port     server port
     * @param deviceId identifier sent as the first field of every message
     */
    public DeviceClient(String hostname, int port, String deviceId) {
        this.hostname = hostname;
        this.port = port;
        this.deviceId = deviceId;
    }

    @Override
    public void run() {
        int attempt = 0;
        boolean connected = false;

        while (attempt < MAX_RETRIES && !connected) {
            try {
                // Random start-up delay so the simulated devices do not all connect at once.
                Thread.sleep(random.nextInt(15000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            try (Socket socket = new Socket(hostname, port);
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                connected = true;
                attempt = 0;
                while (true) {
                    out.println(deviceId + " : " + random.nextInt(50000));
                    // PrintWriter never throws on write failure; checkError() is the only signal.
                    if (out.checkError()) {
                        connected = false;
                        throw new IOException("connection to server lost");
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } catch (UnknownHostException e) {
                System.err.println("Unknown host: " + hostname);
                break;
            } catch (IOException e) {
                attempt++;
                int randomDelay = random.nextInt(10000);
                System.err.println(deviceId + "\tAttempt " + attempt + " - Connection failed ("
                        + e.getMessage() + "). Retrying in " + (RETRY_DELAY_MS + randomDelay) + "ms...");
                try {
                    Thread.sleep(RETRY_DELAY_MS + randomDelay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        // Report exhaustion only when the retry budget was really used up.
        if (!connected && attempt >= MAX_RETRIES) {
            System.err.println("Failed to connect after " + MAX_RETRIES + " attempts.");
        }
    }
}