java socket發送定長報文_一個基于TCP協議的Socket通信實例

原標題:一個基于TCP協議的Socket通信實例

1. 前言

一般接口對接多以http/https或webservice的方式,socket方式的對接比較少并且會有一些難度。正好前段時間完成了一個socket的接口的對接需求,現將實現的思路做一個整理。

2. 需求概述

2.1 需要提供一個socket服務端,實時接收三方傳遞過來的數據

2.2 實時報文規范說明

2.2.1 通訊及接口格式說明

通訊方式:

通訊采用 TCP 協議, SOCKET 同步短連接方式。

報文結構:

報文為不定長報文,以定長報文頭+不定長報文體的方式

報文基本結構如下圖所示:

報文長度

報文體

6位交易報文長度+交易報文。其中 6 位交易報文長度以 ASCII 碼字符串方式表示(6 個字節),右對齊,左補 0,不包括自身的長度,表示的是報文體的長度。如“000036fbced3fe-7025-4b5c-9cef-2421cd981f39”, 000036 為長度,“fbced3fe-7025-4b5c-9cef-2421cd981f39”為報文內容。

報文結構符合 XML 標準的報文格式,報文以無 BOM 格式的 GBK 編碼。報文根節點為 Transaction節點。除非報文里有特殊說明,報文定義的字段都是 Transaction 節點的子節點。報文格式參考下節示例。

2.2.2 報文示例

請求:

000410<?xml version="1.0" encoding="GBK"?>29greerg+4741414141test02018-06-1516:15:00

響應:

000683<?xml version="1.0" encoding="GBK"?>1OK0c2c002f-ccc6-4c7b-86e1-c7871b1c98b31Message enqueued for sendingSMS-AFFS-000000100+47419155906y06b02hdo001

3 代碼實現

3.1 BIO 阻塞模式

簡單的描述一下BIO的服務端通信模型:采用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端分配一個線程進行業務邏輯處理,通過輸出流返回應答給客戶端,線程銷毀。即典型的請求應答模型。

傳統BIO通信模型圖(此圖來源于網絡)

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端并發訪問量增加后,服務端的線程個數和客戶端并發訪問數呈1:1的正比關系, Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹后,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終崩潰。

但是這種模式在一些特定的應用場景下效果是最好的,比如只有少量的TCP連接通信,且雙方都非常快速的傳輸數據,此時這種模式的性能最好,實現比較簡單。

實現代碼如下:

3.1.1 服務端同步阻塞模式的:

import java.io.*;

import java.net.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import javax.annotation.PostConstruct;

public class TCPBlockServer {

// 服務IP

private final String SERVER_IP = "127.0.0.1";

// 服務端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final String CHARSET_NAME = "GBK";

@PostConstruct

public void start() throws Exception {

System.out.println("server Socket 啟動 。。。。。。。");

// 這里使用了Java的自動關閉的語法

try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {

while (true) {

Socket socket = serverSocket.accept() ;

new Thread(()->handler(socket)).start();

}

}

}

private void handler(Socket socket2) {

String msg = null;

try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {

msg = receiveMsg(input, socket);

System.out.println("msg:" + msg);

doBusinessLogic(msg,out);

} catch (Exception e) {

e.printStackTrace();

}

}

// 處理業務邏輯

private void doBusinessLogic(String msg,OutputStream out) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

out.write(msg.getBytes(CHARSET_NAME));

out.flush();

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

private String receiveMsg(InputStream input, Socket socket) throws Exception {

byte[] lengthBytes = new byte[6];

int count = input.read(lengthBytes);

int length = Integer.valueOf(new String(lengthBytes));

byte[] buffer = new byte[length + 2];

int readBytes = 0;

while (readBytes < length) {

count = input.read(buffer, readBytes, length - readBytes);

if (count == -1) {

break;

}

readBytes += count;

}

return new String(buffer, Charset.forName("GBK"));

}

public static void main(String[] args) throws Exception {

TCPBlockServer server = new TCPBlockServer();

server.start();

}

}

3.1.2 服務端偽異步I/O模型:

上面實現方面存在的一些不足之處:

1:服務器創建和銷毀工作線程的開銷很大。如果服務器需要和許多客戶通信,并且與每個客戶的通信時間都很短,那么有可能服務器為客戶創建新線程的開銷比實際與客戶通信的開銷還大。

2:除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源。并且每個線程本身也會占用一定的內存(每個線程大約需要1MB內存),如果同時有大量客戶連接到服務器,就必須創建大量的工作線程,他們會消耗大量內存,可能會導致系統內存不足,應用產生OOM的錯誤。

3:如果線程數目固定,并且每個線程都有很長的生命周期,那么線程切換也是相對固定的。不同的操作系統有不同的切換周期,一般在20毫秒左右。這里所說的線程切換是指Java虛擬機,以及底層操作系統的調度下,線程之間轉讓CPU的使用權。如果頻繁創建和銷毀線程,那么將導致頻繁的切換線程,因為一個線程被銷毀后,必然要把CPU轉移給另外一個已經就緒的線程,是該線程獲得運行機會。這種情況下,線程間的切換不再遵循系統的固定切換周期,切換線程的開銷甚至比創建及銷毀的開銷還大。

為了改進客戶端訪問就會創建線程的場景,改為由一個線程池去管理固定數量的線程來執行客戶所需業務邏輯。實現線程池線程和客戶端 N(N>= 1): M的關系。如下圖所示:

相關實現代碼如下,根據實際場景需要設置線程池中合適的線程數量:

import java.io.*;

import java.net.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.concurrent.*;

import javax.annotation.PostConstruct;

public class TCPBlockThreadPoolServer {

// 服務IP

private final String SERVER_IP = "127.0.0.1";

// 服務端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final int THREADS = 150 ;

private final String CHARSET_NAME = "GBK";

private ExecutorService executorService ;

@PostConstruct

public void start() throws Exception {

System.out.println("server Socket 啟動 。。。。。。。");

executorService = Executors.newFixedThreadPool(THREADS) ;

// 這里使用了Java的自動關閉的語法

try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {

while (true) {

Socket socket = serverSocket.accept() ;

executorService.execute(()->handler(socket));

}

}

}

private void handler(Socket socket2) {

String msg = null;

try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {

msg = receiveMsg(input, socket);

System.out.println("msg:" + msg);

doBusinessLogic(msg,out);

} catch (Exception e) {

e.printStackTrace();

}

}

// 處理業務邏輯

private void doBusinessLogic(String msg,OutputStream out) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

out.write(msg.getBytes(CHARSET_NAME));

out.flush();

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

private String receiveMsg(InputStream input, Socket socket) throws Exception {

byte[] lengthBytes = new byte[6];

int count = input.read(lengthBytes);

int length = Integer.valueOf(new String(lengthBytes));

byte[] buffer = new byte[length + 2];

int readBytes = 0;

while (readBytes < length) {

count = input.read(buffer, readBytes, length - readBytes);

if (count == -1) {

break;

}

readBytes += count;

}

return new String(buffer, Charset.forName("GBK"));

}

public static void main(String[] args) throws Exception {

TCPBlockServer server = new TCPBlockServer();

server.start();

}

}

3.1.3 客戶端

簡單的客戶端實現如下:

import java.io.*;

import java.net.Socket;

import java.nio.charset.Charset;

import org.apache.commons.lang3.StringUtils;

public class Client {

public String sendAndRecv(String content, String charsetName,String ip,int port) throws Exception {

try(Socket socket = new Socket(ip,port)){

socket.setTcpNoDelay(true);

socket.setKeepAlive(true);

socket.setSoTimeout(60000);

try(OutputStream output = socket.getOutputStream();InputStream input = socket.getInputStream()){

output.write(content.getBytes(charsetName));

output.flush();

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, Charset.forName("GBK")));

StringBuffer buffer = new StringBuffer();

String message = null ;

while((message = bufferedReader.readLine()) != null){

buffer.append(message);

}

return StringUtils.substring(buffer.toString(), 6);

}

}

}

}

3.2 NIO 模式

相對于BIO(阻塞通信)模型來說,NIO模型非常復雜,以至于花費很大的精力去學習也不太容易能夠精通,難以編寫出一個沒有缺陷,高效且適應各種意外情況的穩定的NIO通信模塊。之所以有這樣的問題,是因為NIO編程不是單純的一個技術點,而是涵蓋了一系列的相關技術、專業知識、編程經驗和編程技巧的復雜工程,所以精通這些技術相當有難度。

和BIO相比NIO有如下幾個新的概念:

1. 通道(Channel)

Channel對應BIO中Stream的模型,到任何目的地(或來自任何地方)的所有數據都必須通過一個Channel對象。但是Channel和Stream不同的地方在于,Channel是雙向的而Stream是單向的(分為InputStream和OutputStream),所以Channel可以用于讀/寫,或同時用于讀寫。

2. 緩沖區(Buffer)

雖然Channel用于讀寫數據,但是我們不能直接操作Channel進行讀寫,必須通過緩沖區來完成(Buffer)。NIO設計了一個全新的數據結構Buffer,具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer等。

Buffer中有3個重要的參數:位置(position)、容量(capactiy)和上限(limit)

參數

寫模式

讀模式

位置(position)

當前緩沖區的位置,將從position的下一個位置寫數據

當前緩存區讀取的位置,將從此位置后讀取數據。

容量(capacity)

緩存區總容量的上限

緩存區總容量的上限

上限(limit)

緩存區實際上限,它總是小于等于容量。通常情況下和容量相等

代表可讀取的總容量,和上次寫入的容量相等。

3. 選擇器(Selector)

Selector 可以同時檢測多個Channel的事件以實現異步I/O,我們可以將感興趣的事件注冊到Selector上面,當事件發生時可以通過Selector獲取事件發生的Channel,并進行相關的事件處理操作。一個Selector可以同時輪詢多個Channel。

3.2.1 服務端

import java.io.IOException;

import java.net.*;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.Iterator;

import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class TCPNioServer {

// 服務IP

private final String SERVER_IP = "127.0.0.1";

// 服務端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final String CHARSET_NAME = "GBK";

private Selector selector;

public TCPNioServer() throws Exception {

ServerSocketChannel serverChannel = ServerSocketChannel.open();

// 設置通道為非阻塞

serverChannel.configureBlocking(false);

// 將該通道所對應的serverSocket綁定到指定的ip和port端口

InetAddress inetAddress = InetAddress.getByName(SERVER_IP);

serverChannel.socket().bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);

// 獲得一個通道管理器(選擇器)

selector = Selector.open();

/*

* 將通道管理器和該通道綁定,并為該通道注冊selectionKey.OP_ACCEPT事件

* 注冊該事件后,當事件到達的時候,selector.select()會返回, 如果事件沒有到達selector.select()會一直阻塞

*/

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

}

/**

* 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,進行處理

*/

@PostConstruct

public void start() throws Exception {

log.info("==start server ip {} , port {}. ==", SERVER_IP, SERVER_PORT);

while (true) {

selector.select();//此方法會阻塞,直到至少有一個以注冊的事件被觸發

//獲取發生事件的SelectionKey集合

Iterator iterator = this.selector.selectedKeys().iterator();

while (iterator.hasNext()) {

try {

SelectionKey selectedKey = iterator.next();

if (selectedKey.isValid()) { // 如果key的狀態是有效的

if (selectedKey.isAcceptable()) { //如key是阻塞狀態,調用accept()方法

accept(selectedKey);

}

if (selectedKey.isReadable()) { //如key是可讀狀態,調用handle()方法

handle(selectedKey);

}

}

} catch (Exception e) {

iterator.remove();

} finally {

iterator.remove();//從集合中移除,避免重復處理

}

}

}

}

private void accept(SelectionKey key) throws IOException {

// 1 獲取服務器通道

ServerSocketChannel server = (ServerSocketChannel) key.channel();

// 2 執行阻塞方法

SocketChannel chennel = server.accept();

// 3 設置阻塞模式為非阻塞

chennel.configureBlocking(false);

// 4 注冊到多路復用選擇器上,并設置讀取標識

chennel.register(selector, SelectionKey.OP_READ);

}

private void handle(SelectionKey key) throws Exception {

// 獲取之前注冊的SocketChannel通道

try (SocketChannel channel = (SocketChannel) key.channel()) {

int length = getMsgLength(key, channel);

String msg = recvMsg(key, channel, length);

System.out.println("Server:" + msg);

doBusinessLogic(msg, channel);

}

}

private byte[] read(SelectionKey key, SocketChannel channel,int capacity) throws Exception {

ByteBuffer buffer = ByteBuffer.allocate(capacity);

channel.read(buffer);

// 將channel中的數據放入buffer中

int count = channel.read(buffer);

if (count == -1) { // == -1表示通道中沒有數據

key.channel().close();

key.cancel();

return null;

}

// 讀取到了數據,將buffer的position復位到0

buffer.flip();

byte[] bytes = new byte[buffer.remaining()];

// 將buffer中的數據寫入byte[]中

buffer.get(bytes);

return bytes ;

}

private int getMsgLength(SelectionKey key, SocketChannel channel) throws Exception {

byte[] bytes = this.read(key, channel, 6) ;

String length = new String(bytes, CHARSET_NAME);

return new Integer(length);

}

private String recvMsg(SelectionKey key, SocketChannel channel,int msgLength) throws Exception{

byte[] bytes = this.read(key, channel, msgLength) ;

return new String(bytes, CHARSET_NAME);

}

// 處理業務邏輯

private void doBusinessLogic(String msg, SocketChannel channel) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes(CHARSET_NAME));

channel.write(outBuffer);

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

public static void main(String[] args) throws Exception {

TCPNioServer server = new TCPNioServer();

server.start();

}

}

3.3 AIO模式

與NIO不同,當進行讀寫操作時,只須直接調用API的read或write方法即可。這兩種方法均為異步的,對于讀操作而言,當有流可讀取時,操作系統會將可讀的流傳入read方法的緩沖區,并通知應用程序;對于寫操作而言,當操作系統將write方法傳遞的流寫入完畢時,操作系統主動通知應用程序。 即可以理解為,read/write方法都是異步的,完成后會主動調用回調函數。 在JDK1.7中,這部分內容被稱作NIO2,主要在java.nio.channels包下增加了下面四個異步通道:

AsynchronousSocketChannel

對應BIO中的ServerSocket和NIO中的ServerSocketChannel,用于server端網絡程序

AsynchronousServerSocketChannel

對應BIO中的Socket和NIO中的SocketChannel,用于client端網絡應用

AsynchronousFileChannel

AsynchronousDatagramChannel

異步channel API提供了兩種方式監控/控制異步操作(connect,accept, read,write等)。

第一種方式是返回java.util.concurrent.Future對象, 檢查Future的狀態可以得到操作是完成還是失敗,還是進行中, future.get會阻塞當前進程。

第二種方式為操作提供一個回調參數java.nio.channels.CompletionHandler,這個回調類包含completed,failed兩個方法。channel的每個I/O操作都為這兩種方式提供了相應的方法, 你可以根據自己的需要選擇合適的方式編程。

下面的例子中在accept和read方法中使用了回調CompletionHandler的方式,而發送數據(write)使用了future的方式,當然write也可以采用回調CompletionHandler的方式。因為CompletionHandler是完全異步的,所以需要在mian方法中使用一個 while循環確保程序不退出,或者也可以在start方法的最后使用channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

3.3.1 服務端

import java.io.*;

import java.net.*;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.concurrent.*;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class TCPAioServer {

// 服務IP

private final String SERVER_IP = "127.0.0.1";

// 服務端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final String CHARSET_NAME = "GBK";

private ExecutorService executorService;

private AsynchronousChannelGroup channelGroup;

private AsynchronousServerSocketChannel serverSocketChannel;

public void start() throws IOException, Exception {

// 創建線程池

executorService = Executors.newCachedThreadPool();

// 創建線程組

channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);

// 創建服務器通道

serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);

// 綁定地址

InetAddress inetAddress = InetAddress.getByName(SERVER_IP);

serverSocketChannel.bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);

log.info("server start, ip: {} , port:{}", SERVER_IP, SERVER_PORT);

serverSocketChannel.accept(this, new ServerCompletionHandler());

//channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

}

class ServerCompletionHandler implements CompletionHandler {

@Override

public void completed(AsynchronousSocketChannel channel, TCPAioServer attachment) {

try {

handle(channel);

} finally {

// 當有下一個客戶端接入的時候,直接調用Server的accept方法,這樣反復執行下去,保證多個客戶端都可以阻塞

serverSocketChannel.accept(attachment, this);

}

}

private void handle(AsynchronousSocketChannel channel) {

ByteBuffer buffer = allocateByteBuffer(channel);

channel.read(buffer, buffer, new CompletionHandler() {

@Override

public void completed(Integer result, ByteBuffer attachment) {

attachment.flip();

String msg = null;

try {

msg = new String(attachment.array(), CHARSET_NAME);

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

log.info("Server 收到客戶端發送的數據為:{}", msg);

doBusinessLogic(msg, channel);

}

@Override

public void failed(Throwable exc, ByteBuffer attachment) {

exc.printStackTrace();

}

});

}

private ByteBuffer allocateByteBuffer(AsynchronousSocketChannel channel) {

ByteBuffer buffer = ByteBuffer.allocate(6);

try {

channel.read(buffer).get(1000, TimeUnit.SECONDS);

// 讀取到了數據,將buffer的position復位到0

buffer.flip();

byte[] bytes = new byte[buffer.remaining()];

// 將buffer中的數據寫入byte[]中

buffer.get(bytes);

String length = new String(bytes, CHARSET_NAME);

buffer = ByteBuffer.allocate(new Integer(length));

} catch (InterruptedException | ExecutionException | TimeoutException | UnsupportedEncodingException e1) {

e1.printStackTrace();

}

return buffer;

}

// 處理業務邏輯

private void doBusinessLogic(String msg, AsynchronousSocketChannel result) {

try (AsynchronousSocketChannel channel = result) {

msg = formatMsg(msg);

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

ByteBuffer buffer = ByteBuffer.allocate(bodyBytes.length);

buffer.put(bodyBytes);

buffer.flip();

channel.write(buffer).get();

} catch (Exception e) {

e.printStackTrace();

}

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

@Override

public void failed(Throwable exc, TCPAioServer attachment) {

exc.printStackTrace();

}

}

public static void main(String[] args) throws Exception {

TCPAioServer server = new TCPAioServer();

server.start();

while (true) {

Thread.sleep(1000);

}

}

}

目前Linux上的AIO實現主要有兩種:Posix AIO 與Kernel Native AIO,前者是用戶態實現的,而后者是內核態實現的。所以Kernel Native AIO的性能和前景要好于他的前輩Posix AIO,比較有名的的軟件如Nginx,MySQL等在高版本中都有支持Kernel Native AIO,但是只應用在少部分功能中。因為當下Linux的AIO實現還不是很完美,充斥著各種Bug,并且AIO Socket 還并非真正的異步I/O機制,使用AIO所帶來的性能提升也不太明顯,穩定性并非十分可靠,如是Kernel Native AIO引起的問題,解決的難度會非常大。但是AIO是未來的發展方向,需要我們持續的關注。

3.4 開源框架Netty實現的Socket服務

Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持,作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結果。作為當前最流行的NIO框架,Netty在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源軟件也基于Netty的NIO框架構建,如Spark、RocketMQ、Dubbo、Elasticsearch等等。

Netty的優點

1、API使用簡單,有豐富的例子,開發門檻低。

2、功能強大,預置了多種編解碼功能,支持多種主流協議。

3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展。

4、性能高,通過與其他業界主流的NIO框架對比,Netty綜合性能最優。

5、成熟、穩定,Netty修復了已經發現的NIO所有BUG。

6、社區活躍。

7、經歷了很多商用項目的考驗。

3.4.1 服務端(Netty4.X)

import java.nio.ByteOrder;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.*;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.string.*;

import io.netty.handler.logging.*;

import io.netty.util.ReferenceCountUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class NettySocketServer {

private final String CHARSET_NAME = "GBK";

private final int bosscount = 2;

private final int workerCount = 8;

private final int tcpPort = 8888;

private final int backlog = 100;

private final int receiveBufferSize = 1048576;

private ServerBootstrap serverBootstrap;

private ChannelFuture serverChannelFuture;

public NamedThreadFactory bossThreadFactory() {

return new NamedThreadFactory("Server-Worker");

}

public NioEventLoopGroup bossGroup() {

return new NioEventLoopGroup(bosscount, bossThreadFactory());

}

public NamedThreadFactory workerThreadFactory() {

return new NamedThreadFactory("Server-Worker");

}

public NioEventLoopGroup workerGroup() {

return new NioEventLoopGroup(workerCount, workerThreadFactory());

}

public ServerBootstrap bootstrap() {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, backlog)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("logging", new LoggingHandler(LogLevel.ERROR))

.addLast("stringEncoder", new StringEncoder(Charset.forName("GBK")))

.addLast("frameDecoder", new MsgLengthFieldBasedFrameDecoder(receiveBufferSize, 0, 6, 0, 6))

.addLast("stringDecoder", new StringDecoder(Charset.forName("GBK")))

.addLast("messageHandler", new ServerMessageHandler());

}

});

return bootstrap;

}

@PostConstruct

public void start() throws Exception {

serverBootstrap = bootstrap();

serverChannelFuture = serverBootstrap.bind(tcpPort).sync();

log.info("Starting server at tcpPort {}" , tcpPort);

}

@PreDestroy

public void stop() throws Exception {

serverChannelFuture.channel().closeFuture().sync();

}

static class NamedThreadFactory implements ThreadFactory {

public static AtomicInteger counter = new AtomicInteger(1);

private String name = this.getClass().getName();

private boolean deamon ;//守護線程

private int priority ; //線程優先級

public NamedThreadFactory(String name){

this(name, false);

}

public NamedThreadFactory(String name,boolean deamon){

this(name, deamon, -1);

}

public NamedThreadFactory(String name,boolean deamon,int priority){

this.name = name ;

this.deamon = deamon ;

this.priority = priority ;

}

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r,name+"["+counter.getAndIncrement()+"]");

thread.setDaemon(deamon);

if(priority != -1){

thread.setPriority(priority);

}

return thread;

}

}

//拆包

class MsgLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {

/**

* @param maxFrameLength 解碼時,處理每個幀數據的最大長度

* @param lengthFieldOffset 該幀數據中,存放該幀數據的長度的數據的起始位置

* @param lengthFieldLength 記錄該幀數據長度的字段本身的長度

* @param lengthAdjustment 修改幀數據長度字段中定義的值,可以為負數

* @param initialBytesToStrip解析的時候需要跳過的字節數

*/

public MsgLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,

int lengthAdjustment, int initialBytesToStrip) {

super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);

}

@Override

protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {

if(length == 6){

buf = buf.order(order);

byte[] lengthBytes = new byte[6];

buf.readBytes(lengthBytes);

buf.resetReaderIndex();

return Integer.valueOf(new String(lengthBytes));

} else {

return super.getUnadjustedFrameLength(buf, offset, length, order);

}

}

}

class ServerMessageHandler extends ChannelInboundHandlerAdapter {

/**

* 功能:讀取服務器發送過來的信息

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

if (msg instanceof String) {

try {

doBusinessLogic(ctx,(String)msg);

} finally {

ReferenceCountUtil.release(msg);

}

}

}

// 處理業務邏輯

private void doBusinessLogic(ChannelHandlerContext ctx,String msg) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

public static void main(String[] args) throws Exception{

NettySocketServer server = new NettySocketServer();

server.start();

}

}

總結

同步阻塞IO

偽異步IO

非阻塞IO

異步IO

Netty的非阻塞IO

客戶端:服務端

1:1

N:M(M>=1)

N:M(M>=1,單線程非阻塞,多線程非阻塞)

N:0(不需要啟動額外的IO線程,被動回調)

N:M(M>=1)

IO類型

BIO

BIO

NIO

AIO

NIO

API使用難度

簡單

簡單

非常復雜

復雜

簡單

可靠性

相當差

高+

吞吐量

高+

并發

高+

參考文獻

▲http://www.ibm.com/developerworks/cn/linux/l-async/

▲http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf

▲http://blog.csdn.net/anxpp/article/details/51512200

▲Netty權威指南

▲Asynchronous I/O Tricks and Tips

責任編輯:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/529424.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/529424.shtml
英文地址,請注明出處:http://en.pswp.cn/news/529424.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

7系統軟raid_使用圖形界面來配置RAID

RAID 配置起來要比 LVM 方便&#xff0c;因為它不像 LVM 那樣分了物理卷、卷組和邏輯卷三層&#xff0c;而且每層都需要配置。我們在圖形安裝界面中配置 RAID 1和 RAID 5&#xff0c;先來看看 RAID 1 的配置方法。RAID 1 配置配置 RAID 1 時同樣需要啟動圖形安裝程序&#xff0…

python字典的內置函數_python – 用于字典轉換的特殊方法名稱的內置函數

我一直在深入研究Python類中的運算符重載和特殊方法,并且我注意到許多內置函數具有等效的特殊方法名稱&#xff1a;> int(x)調用x .__ int __()> next(x)在Python 2中調用x .__ next __()或x.next()但是,一些函數,即tuple()和dict(),沒有任何等價物.我知道對于這種特殊方…

合并相同數據的行_R語言筆記(六):數據框重塑(reshape2)

數據處理主要內容包括&#xff1a;1. 特殊值處理1.1 缺失值1.2 離群值1.3 日期2. 數據轉換&#xff08;base vs. dplyr&#xff09;2.1 篩選&#xff08;subset vs. filter/select/rename&#xff09;2.2 排序&#xff08;order vs. arrange&#xff09;2.3 轉換&#xff08;tr…

華為交換機s2700怎么重置_華為交換機忘記console的密碼,怎么恢復出廠設置

展開全部1、啟動時&#xff0c;32313133353236313431303231363533e58685e5aeb931333366303064按CtrlB進入BOOTROM目錄2、輸入BOOTROM的密碼盒式交換機的某些款型支持使用快捷鍵“CtrlE”進入BootROM主菜單&#xff0c;請根據設備的界面提示操作。盒式交換機在V100R006C03之前的…

啟動linux_使用 UEFI 雙啟動 Windows 和 Linux | Linux 中國

這是一份在同一臺機器上設置 Linux 和 Windows 雙重啟動的速成解釋&#xff0c;使用統一可擴展固件接口&#xff08;UEFI&#xff09;。來源&#xff1a;https://linux.cn/article-12891-1.html作者&#xff1a;Alan Formy-duval譯者&#xff1a;鄭&#xff08;本文字數&#x…

域控下發腳本_域用戶登陸腳本

如何為一個域用戶設置登陸腳本&#xff1f;- BAT可否作為登陸腳本&#xff1f;- 在域用戶“屬性”中&#xff0c;應如何指定登陸腳本名&#xff1f;"D:\x.bat"還是"\\srv\x.bat"&#xff1f;還是其它&#xff1f;- 腳本應該放在何處&#xff1f;- 還有沒有…

mysql增量腳本_mysql全量和增量備份腳本

全量&#xff1a;[rootmaster leo]# cat DBfullBak.sh#!/bin/bash#use mysqldump to fully backup mysql dataBakDir/root/leo/fullLogFile/root/leo/full/bak.logDatedate %Y%m%dBegindate "%Y年%m月%d日 %H:%M:%S"cd $BakDirDumpFile$Date.sqlGZDumpFile$Date.sql.…

mysql 事務 引擎_mysql引擎和事務

對于應用程序和用戶來說&#xff0c;同樣一張表的數據無論用什么引擎來存儲&#xff0c;看到的數據都是一樣的&#xff0c;只是不同的引擎在功能、占用空間大小、讀取性能等方面可能有所差別。mysql最常用的存儲引擎為Innodb、MyISAM和全文索引5.5.5以前默認存儲引擎為MyISAM&a…

shell mysql eof_shell EOF

1、考慮下面的需求&#xff0c;在主shell執行命令&#xff0c;進入其他的命令&#xff0c;后面的輸入&#xff0c;想作為命令的輸入&#xff0c;而不是主shell的輸入&#xff0c;怎么辦&#xff1f;2、使用<3、這里的EOF只是分界符&#xff0c;使用其他的字符也可以。4、比如…

MySQL查詢實驗報告_實驗報告數據庫的基本查詢'

《實驗報告數據庫的基本查詢》由會員分享&#xff0c;可在線閱讀&#xff0c;更多相關《實驗報告數據庫的基本查詢(5頁珍藏版)》請在人人文庫網上搜索。1、一、實驗目的&#xff1a;通過該實驗掌握應用SQL 查詢數據庫的基本方法&#xff0c;包括單表、多表查詢。二、實驗原理數…

mysql+odbc+ado_MFC ado+mysql+odbc技術分享

第一步&#xff1a;建立數據庫假設有一個sql文件mysql>use dbname; //創建一個數據庫名為dbname的數據庫(空數據庫)mysql>set names utf8; //編碼&#xff0c;mysql>source D:/dbname.sql; //導入一個數據庫源文件創建數據庫內容我做…

mysql 5.7 window x64_window環境配置Mysql 5.7.21 windowx64.zip免安裝版教程詳解

1.從官網下載mysql-5.7.21-windowx64.zip mysql下載頁面2.解壓到合適的位置(E:mysql) 這名字是我改過的3.配置環境變量&#xff0c;將E:mysqlbin 添加到PATH中4.在mysql目錄下(E:mysql) 創建 my.ini文件&#xff0c;內容如下&#xff1a;[mysql]# 設置mysql客戶端默認字符集def…

mysql設置查詢結果最大值_查找MySQL查詢結果字段的最大值

將它連接到僅有最大計數的第二個查詢。每天最內部查詢(對于給定用戶)每天計數的一組行數。從那以后&#xff0c;下一個外部執行從該集合中選擇MAX()來查找并獲得一個代表最高日數的記錄...因為它總是返回一行&#xff0c;并且加入到原始的numRequest表中它將是一個笛卡爾&#…

MySQL建表兩個單引號報錯_極客起源 - geekori.com - 問題詳情 - mysql建表報錯,查手冊看不懂,求解?...

創建帶索引的數據庫表需要為表名和屬性添加反單引號&#xff0c;并且你當前的primary key的位置需要調整一下&#xff1a;create table abc(id int unsigned auto_increment,usename char(20) not null default ,gender char(1) not null default ,weight tinyint unsigned not…

js 用下標獲取map值_javascript怎么獲取map的值?

Map對象保存鍵/值對&#xff0c;是鍵/值對的集合。任何值(對象或者原始值) 都可以作為一個鍵或一個值。Object結構提供了“字符串—值”的對應&#xff0c;Map結構提供了“值—值”的對應。JavaScript獲取map值示例&#xff1a;map對象如下&#xff1a;var mapObject {id1001:…

python attention機制_從零開始學Python自然語言處理(26)—— 強大的Attention機制...

前文傳送門&#xff1a;在上一次面試失利后&#xff0c;我回來仔細研究了一下Attention機制&#xff0c;研究完我不禁感悟&#xff0c;這機制真的厲害啊&#xff01;因為我之前面試被問到的Encoder - Decoder框架中有個瓶頸是編碼的結果以固定長度的中間向量表示&#xff0c;這…

[機器人-2]:開源MIT Min cheetah機械狗設計(二):機械結構設計

目錄 1、四肢朝向的選擇 2、電機布局形式的選擇 3、電機的選型及測試&#xff08;非常重要&#xff09; 4、結構優化 5、尺寸效應 6、其他 1、四肢朝向的選擇 機械狗的結構設計&#xff0c;第一個擺在我們面前的就說四肢的朝向問題&#xff0c;如下圖&#xff0c;我們是…

python傳文件給java_用java pyhont通過HTTP協議傳輸文件流

// 代碼網上抄的 忘記鏈接了 抱歉哈packageupload;importjava.io.BufferedReader;importjava.io.DataOutputStream;importjava.io.FileInputStream;importjava.io.IOException;importjava.io.InputStream;importjava.io.InputStreamReader;importjava.net.HttpURLConnection;im…

mysql挪到小數點位置_mysql數據庫遷移到另一個硬盤上

archliun系統mysql數據庫1、對新硬盤分區與格式化1)# fdisk /dev/sdb2) # mkfs.ext4 /dev/sdb12、停止MYSQL服務systemctl stop mysqld3、對數據庫文件拷貝# cp -Rp data /mnt/data/4、刪除原data文件# rm -rf /data5、禁止開機自啟MYSQL服務# systemctl disable mysqld6、對自…

mysql用戶權限表join_MyBatis映射利用mysql left join 解決N+1查詢問題

1.權限是幾乎每個系統都需要的2.一般在用戶請求某個url的時候&#xff0c;都需要驗證用戶是否擁有該url的訪問權限3.最簡單的權限系統需要 用戶表&#xff0c;角色表&#xff0c;用戶角色表&#xff0c;權限表&#xff0c;角色權限表# Host: 127.0.0.1 (Version: 5.6.22)# Date…