原標題:一個基于TCP協議的Socket通信實例
1. 前言
一般接口對接多以http/https或webservice的方式,socket方式的對接比較少并且會有一些難度。正好前段時間完成了一個socket的接口的對接需求,現將實現的思路做一個整理。
2. 需求概述
2.1 需要提供一個socket服務端,實時接收三方傳遞過來的數據
2.2 實時報文規范說明
2.2.1 通訊及接口格式說明
通訊方式:
通訊采用 TCP 協議, SOCKET 同步短連接方式。
報文結構:
報文為不定長報文,以定長報文頭+不定長報文體的方式
報文基本結構如下圖所示:
報文長度
報文體
6位交易報文長度+交易報文。其中 6 位交易報文長度以 ASCII 碼字符串方式表示(6 個字節),右對齊,左補 0,不包括自身的長度,表示的是報文體的長度。如“000036fbced3fe-7025-4b5c-9cef-2421cd981f39”, 000036 為長度,“fbced3fe-7025-4b5c-9cef-2421cd981f39”為報文內容。
報文結構符合 XML 標準的報文格式,報文以無 BOM 格式的 GBK 編碼。報文根節點為 Transaction節點。除非報文里有特殊說明,報文定義的字段都是 Transaction 節點的子節點。報文格式參考下節示例。
2.2.2 報文示例
請求:
000410<?xml version="1.0" encoding="GBK"?>29greerg+4741414141test02018-06-1516:15:00
響應:
000683<?xml version="1.0" encoding="GBK"?>1
OK0c2c002f-ccc6-4c7b-86e1-c7871b1c98b31Message enqueued for sendingSMS-AFFS-000000100+47419155906y06b02hdo001
3 代碼實現
3.1 BIO 阻塞模式
簡單的描述一下BIO的服務端通信模型:采用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端分配一個線程進行業務邏輯處理,通過輸出流返回應答給客戶端,線程銷毀。即典型的請求應答模型。
傳統BIO通信模型圖(此圖來源于網絡)
該模型最大的問題就是缺乏彈性伸縮能力,當客戶端并發訪問量增加后,服務端的線程個數和客戶端并發訪問數呈1:1的正比關系, Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹后,系統的性能將急劇下降,隨著訪問量的繼續增大,系統最終崩潰。
但是這種模式在一些特定的應用場景下效果是最好的,比如只有少量的TCP連接通信,且雙方都非常快速的傳輸數據,此時這種模式的性能最好,實現比較簡單。
實現代碼如下:
3.1.1 服務端同步阻塞模式的:
import java.io.*;
import java.net.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import javax.annotation.PostConstruct;
public class TCPBlockServer {
// 服務IP
private final String SERVER_IP = "127.0.0.1";
// 服務端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
@PostConstruct
public void start() throws Exception {
System.out.println("server Socket 啟動 。。。。。。。");
// 這里使用了Java的自動關閉的語法
try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {
while (true) {
Socket socket = serverSocket.accept() ;
new Thread(()->handler(socket)).start();
}
}
}
private void handler(Socket socket2) {
String msg = null;
try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {
msg = receiveMsg(input, socket);
System.out.println("msg:" + msg);
doBusinessLogic(msg,out);
} catch (Exception e) {
e.printStackTrace();
}
}
// 處理業務邏輯
private void doBusinessLogic(String msg,OutputStream out) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
out.write(msg.getBytes(CHARSET_NAME));
out.flush();
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
private String receiveMsg(InputStream input, Socket socket) throws Exception {
byte[] lengthBytes = new byte[6];
int count = input.read(lengthBytes);
int length = Integer.valueOf(new String(lengthBytes));
byte[] buffer = new byte[length + 2];
int readBytes = 0;
while (readBytes < length) {
count = input.read(buffer, readBytes, length - readBytes);
if (count == -1) {
break;
}
readBytes += count;
}
return new String(buffer, Charset.forName("GBK"));
}
public static void main(String[] args) throws Exception {
TCPBlockServer server = new TCPBlockServer();
server.start();
}
}
3.1.2 服務端偽異步I/O模型:
上面實現方面存在的一些不足之處:
1:服務器創建和銷毀工作線程的開銷很大。如果服務器需要和許多客戶通信,并且與每個客戶的通信時間都很短,那么有可能服務器為客戶創建新線程的開銷比實際與客戶通信的開銷還大。
2:除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源。并且每個線程本身也會占用一定的內存(每個線程大約需要1MB內存),如果同時有大量客戶連接到服務器,就必須創建大量的工作線程,他們會消耗大量內存,可能會導致系統內存不足,應用產生OOM的錯誤。
3:如果線程數目固定,并且每個線程都有很長的生命周期,那么線程切換也是相對固定的。不同的操作系統有不同的切換周期,一般在20毫秒左右。這里所說的線程切換是指Java虛擬機,以及底層操作系統的調度下,線程之間轉讓CPU的使用權。如果頻繁創建和銷毀線程,那么將導致頻繁的切換線程,因為一個線程被銷毀后,必然要把CPU轉移給另外一個已經就緒的線程,是該線程獲得運行機會。這種情況下,線程間的切換不再遵循系統的固定切換周期,切換線程的開銷甚至比創建及銷毀的開銷還大。
為了改進客戶端訪問就會創建線程的場景,改為由一個線程池去管理固定數量的線程來執行客戶所需業務邏輯。實現線程池線程和客戶端 N(N>= 1): M的關系。如下圖所示:
相關實現代碼如下,根據實際場景需要設置線程池中合適的線程數量:
import java.io.*;
import java.net.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.*;
import javax.annotation.PostConstruct;
public class TCPBlockThreadPoolServer {
// 服務IP
private final String SERVER_IP = "127.0.0.1";
// 服務端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final int THREADS = 150 ;
private final String CHARSET_NAME = "GBK";
private ExecutorService executorService ;
@PostConstruct
public void start() throws Exception {
System.out.println("server Socket 啟動 。。。。。。。");
executorService = Executors.newFixedThreadPool(THREADS) ;
// 這里使用了Java的自動關閉的語法
try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {
while (true) {
Socket socket = serverSocket.accept() ;
executorService.execute(()->handler(socket));
}
}
}
private void handler(Socket socket2) {
String msg = null;
try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {
msg = receiveMsg(input, socket);
System.out.println("msg:" + msg);
doBusinessLogic(msg,out);
} catch (Exception e) {
e.printStackTrace();
}
}
// 處理業務邏輯
private void doBusinessLogic(String msg,OutputStream out) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
out.write(msg.getBytes(CHARSET_NAME));
out.flush();
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
private String receiveMsg(InputStream input, Socket socket) throws Exception {
byte[] lengthBytes = new byte[6];
int count = input.read(lengthBytes);
int length = Integer.valueOf(new String(lengthBytes));
byte[] buffer = new byte[length + 2];
int readBytes = 0;
while (readBytes < length) {
count = input.read(buffer, readBytes, length - readBytes);
if (count == -1) {
break;
}
readBytes += count;
}
return new String(buffer, Charset.forName("GBK"));
}
public static void main(String[] args) throws Exception {
TCPBlockServer server = new TCPBlockServer();
server.start();
}
}
3.1.3 客戶端
簡單的客戶端實現如下:
import java.io.*;
import java.net.Socket;
import java.nio.charset.Charset;
import org.apache.commons.lang3.StringUtils;
public class Client {
public String sendAndRecv(String content, String charsetName,String ip,int port) throws Exception {
try(Socket socket = new Socket(ip,port)){
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
socket.setSoTimeout(60000);
try(OutputStream output = socket.getOutputStream();InputStream input = socket.getInputStream()){
output.write(content.getBytes(charsetName));
output.flush();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, Charset.forName("GBK")));
StringBuffer buffer = new StringBuffer();
String message = null ;
while((message = bufferedReader.readLine()) != null){
buffer.append(message);
}
return StringUtils.substring(buffer.toString(), 6);
}
}
}
}
3.2 NIO 模式
相對于BIO(阻塞通信)模型來說,NIO模型非常復雜,以至于花費很大的精力去學習也不太容易能夠精通,難以編寫出一個沒有缺陷,高效且適應各種意外情況的穩定的NIO通信模塊。之所以有這樣的問題,是因為NIO編程不是單純的一個技術點,而是涵蓋了一系列的相關技術、專業知識、編程經驗和編程技巧的復雜工程,所以精通這些技術相當有難度。
和BIO相比NIO有如下幾個新的概念:
1. 通道(Channel)
Channel對應BIO中Stream的模型,到任何目的地(或來自任何地方)的所有數據都必須通過一個Channel對象。但是Channel和Stream不同的地方在于,Channel是雙向的而Stream是單向的(分為InputStream和OutputStream),所以Channel可以用于讀/寫,或同時用于讀寫。
2. 緩沖區(Buffer)
雖然Channel用于讀寫數據,但是我們不能直接操作Channel進行讀寫,必須通過緩沖區來完成(Buffer)。NIO設計了一個全新的數據結構Buffer,具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer等。
Buffer中有3個重要的參數:位置(position)、容量(capactiy)和上限(limit)
參數
寫模式
讀模式
位置(position)
當前緩沖區的位置,將從position的下一個位置寫數據
當前緩存區讀取的位置,將從此位置后讀取數據。
容量(capacity)
緩存區總容量的上限
緩存區總容量的上限
上限(limit)
緩存區實際上限,它總是小于等于容量。通常情況下和容量相等
代表可讀取的總容量,和上次寫入的容量相等。
3. 選擇器(Selector)
Selector 可以同時檢測多個Channel的事件以實現異步I/O,我們可以將感興趣的事件注冊到Selector上面,當事件發生時可以通過Selector獲取事件發生的Channel,并進行相關的事件處理操作。一個Selector可以同時輪詢多個Channel。
3.2.1 服務端
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Iterator;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TCPNioServer {
// 服務IP
private final String SERVER_IP = "127.0.0.1";
// 服務端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
private Selector selector;
public TCPNioServer() throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設置通道為非阻塞
serverChannel.configureBlocking(false);
// 將該通道所對應的serverSocket綁定到指定的ip和port端口
InetAddress inetAddress = InetAddress.getByName(SERVER_IP);
serverChannel.socket().bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);
// 獲得一個通道管理器(選擇器)
selector = Selector.open();
/*
* 將通道管理器和該通道綁定,并為該通道注冊selectionKey.OP_ACCEPT事件
* 注冊該事件后,當事件到達的時候,selector.select()會返回, 如果事件沒有到達selector.select()會一直阻塞
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,進行處理
*/
@PostConstruct
public void start() throws Exception {
log.info("==start server ip {} , port {}. ==", SERVER_IP, SERVER_PORT);
while (true) {
selector.select();//此方法會阻塞,直到至少有一個以注冊的事件被觸發
//獲取發生事件的SelectionKey集合
Iterator iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
try {
SelectionKey selectedKey = iterator.next();
if (selectedKey.isValid()) { // 如果key的狀態是有效的
if (selectedKey.isAcceptable()) { //如key是阻塞狀態,調用accept()方法
accept(selectedKey);
}
if (selectedKey.isReadable()) { //如key是可讀狀態,調用handle()方法
handle(selectedKey);
}
}
} catch (Exception e) {
iterator.remove();
} finally {
iterator.remove();//從集合中移除,避免重復處理
}
}
}
}
private void accept(SelectionKey key) throws IOException {
// 1 獲取服務器通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 2 執行阻塞方法
SocketChannel chennel = server.accept();
// 3 設置阻塞模式為非阻塞
chennel.configureBlocking(false);
// 4 注冊到多路復用選擇器上,并設置讀取標識
chennel.register(selector, SelectionKey.OP_READ);
}
private void handle(SelectionKey key) throws Exception {
// 獲取之前注冊的SocketChannel通道
try (SocketChannel channel = (SocketChannel) key.channel()) {
int length = getMsgLength(key, channel);
String msg = recvMsg(key, channel, length);
System.out.println("Server:" + msg);
doBusinessLogic(msg, channel);
}
}
private byte[] read(SelectionKey key, SocketChannel channel,int capacity) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(capacity);
channel.read(buffer);
// 將channel中的數據放入buffer中
int count = channel.read(buffer);
if (count == -1) { // == -1表示通道中沒有數據
key.channel().close();
key.cancel();
return null;
}
// 讀取到了數據,將buffer的position復位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 將buffer中的數據寫入byte[]中
buffer.get(bytes);
return bytes ;
}
private int getMsgLength(SelectionKey key, SocketChannel channel) throws Exception {
byte[] bytes = this.read(key, channel, 6) ;
String length = new String(bytes, CHARSET_NAME);
return new Integer(length);
}
private String recvMsg(SelectionKey key, SocketChannel channel,int msgLength) throws Exception{
byte[] bytes = this.read(key, channel, msgLength) ;
return new String(bytes, CHARSET_NAME);
}
// 處理業務邏輯
private void doBusinessLogic(String msg, SocketChannel channel) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes(CHARSET_NAME));
channel.write(outBuffer);
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
public static void main(String[] args) throws Exception {
TCPNioServer server = new TCPNioServer();
server.start();
}
}
3.3 AIO模式
與NIO不同,當進行讀寫操作時,只須直接調用API的read或write方法即可。這兩種方法均為異步的,對于讀操作而言,當有流可讀取時,操作系統會將可讀的流傳入read方法的緩沖區,并通知應用程序;對于寫操作而言,當操作系統將write方法傳遞的流寫入完畢時,操作系統主動通知應用程序。 即可以理解為,read/write方法都是異步的,完成后會主動調用回調函數。 在JDK1.7中,這部分內容被稱作NIO2,主要在java.nio.channels包下增加了下面四個異步通道:
AsynchronousSocketChannel
對應BIO中的ServerSocket和NIO中的ServerSocketChannel,用于server端網絡程序
AsynchronousServerSocketChannel
對應BIO中的Socket和NIO中的SocketChannel,用于client端網絡應用
AsynchronousFileChannel
AsynchronousDatagramChannel
異步channel API提供了兩種方式監控/控制異步操作(connect,accept, read,write等)。
第一種方式是返回java.util.concurrent.Future對象, 檢查Future的狀態可以得到操作是完成還是失敗,還是進行中, future.get會阻塞當前進程。
第二種方式為操作提供一個回調參數java.nio.channels.CompletionHandler,這個回調類包含completed,failed兩個方法。channel的每個I/O操作都為這兩種方式提供了相應的方法, 你可以根據自己的需要選擇合適的方式編程。
下面的例子中在accept和read方法中使用了回調CompletionHandler的方式,而發送數據(write)使用了future的方式,當然write也可以采用回調CompletionHandler的方式。因為CompletionHandler是完全異步的,所以需要在mian方法中使用一個 while循環確保程序不退出,或者也可以在start方法的最后使用channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
3.3.1 服務端
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TCPAioServer {
// 服務IP
private final String SERVER_IP = "127.0.0.1";
// 服務端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
private ExecutorService executorService;
private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverSocketChannel;
public void start() throws IOException, Exception {
// 創建線程池
executorService = Executors.newCachedThreadPool();
// 創建線程組
channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// 創建服務器通道
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
// 綁定地址
InetAddress inetAddress = InetAddress.getByName(SERVER_IP);
serverSocketChannel.bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);
log.info("server start, ip: {} , port:{}", SERVER_IP, SERVER_PORT);
serverSocketChannel.accept(this, new ServerCompletionHandler());
//channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
class ServerCompletionHandler implements CompletionHandler {
@Override
public void completed(AsynchronousSocketChannel channel, TCPAioServer attachment) {
try {
handle(channel);
} finally {
// 當有下一個客戶端接入的時候,直接調用Server的accept方法,這樣反復執行下去,保證多個客戶端都可以阻塞
serverSocketChannel.accept(attachment, this);
}
}
private void handle(AsynchronousSocketChannel channel) {
ByteBuffer buffer = allocateByteBuffer(channel);
channel.read(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String msg = null;
try {
msg = new String(attachment.array(), CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
log.info("Server 收到客戶端發送的數據為:{}", msg);
doBusinessLogic(msg, channel);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private ByteBuffer allocateByteBuffer(AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(6);
try {
channel.read(buffer).get(1000, TimeUnit.SECONDS);
// 讀取到了數據,將buffer的position復位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 將buffer中的數據寫入byte[]中
buffer.get(bytes);
String length = new String(bytes, CHARSET_NAME);
buffer = ByteBuffer.allocate(new Integer(length));
} catch (InterruptedException | ExecutionException | TimeoutException | UnsupportedEncodingException e1) {
e1.printStackTrace();
}
return buffer;
}
// 處理業務邏輯
private void doBusinessLogic(String msg, AsynchronousSocketChannel result) {
try (AsynchronousSocketChannel channel = result) {
msg = formatMsg(msg);
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
ByteBuffer buffer = ByteBuffer.allocate(bodyBytes.length);
buffer.put(bodyBytes);
buffer.flip();
channel.write(buffer).get();
} catch (Exception e) {
e.printStackTrace();
}
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
@Override
public void failed(Throwable exc, TCPAioServer attachment) {
exc.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
TCPAioServer server = new TCPAioServer();
server.start();
while (true) {
Thread.sleep(1000);
}
}
}
目前Linux上的AIO實現主要有兩種:Posix AIO 與Kernel Native AIO,前者是用戶態實現的,而后者是內核態實現的。所以Kernel Native AIO的性能和前景要好于他的前輩Posix AIO,比較有名的的軟件如Nginx,MySQL等在高版本中都有支持Kernel Native AIO,但是只應用在少部分功能中。因為當下Linux的AIO實現還不是很完美,充斥著各種Bug,并且AIO Socket 還并非真正的異步I/O機制,使用AIO所帶來的性能提升也不太明顯,穩定性并非十分可靠,如是Kernel Native AIO引起的問題,解決的難度會非常大。但是AIO是未來的發展方向,需要我們持續的關注。
3.4 開源框架Netty實現的Socket服務
Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持,作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結果。作為當前最流行的NIO框架,Netty在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源軟件也基于Netty的NIO框架構建,如Spark、RocketMQ、Dubbo、Elasticsearch等等。
Netty的優點
1、API使用簡單,有豐富的例子,開發門檻低。
2、功能強大,預置了多種編解碼功能,支持多種主流協議。
3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展。
4、性能高,通過與其他業界主流的NIO框架對比,Netty綜合性能最優。
5、成熟、穩定,Netty修復了已經發現的NIO所有BUG。
6、社區活躍。
7、經歷了很多商用項目的考驗。
3.4.1 服務端(Netty4.X)
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.*;
import io.netty.handler.logging.*;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettySocketServer {
private final String CHARSET_NAME = "GBK";
private final int bosscount = 2;
private final int workerCount = 8;
private final int tcpPort = 8888;
private final int backlog = 100;
private final int receiveBufferSize = 1048576;
private ServerBootstrap serverBootstrap;
private ChannelFuture serverChannelFuture;
public NamedThreadFactory bossThreadFactory() {
return new NamedThreadFactory("Server-Worker");
}
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bosscount, bossThreadFactory());
}
public NamedThreadFactory workerThreadFactory() {
return new NamedThreadFactory("Server-Worker");
}
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerCount, workerThreadFactory());
}
public ServerBootstrap bootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, backlog)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("logging", new LoggingHandler(LogLevel.ERROR))
.addLast("stringEncoder", new StringEncoder(Charset.forName("GBK")))
.addLast("frameDecoder", new MsgLengthFieldBasedFrameDecoder(receiveBufferSize, 0, 6, 0, 6))
.addLast("stringDecoder", new StringDecoder(Charset.forName("GBK")))
.addLast("messageHandler", new ServerMessageHandler());
}
});
return bootstrap;
}
@PostConstruct
public void start() throws Exception {
serverBootstrap = bootstrap();
serverChannelFuture = serverBootstrap.bind(tcpPort).sync();
log.info("Starting server at tcpPort {}" , tcpPort);
}
@PreDestroy
public void stop() throws Exception {
serverChannelFuture.channel().closeFuture().sync();
}
static class NamedThreadFactory implements ThreadFactory {
public static AtomicInteger counter = new AtomicInteger(1);
private String name = this.getClass().getName();
private boolean deamon ;//守護線程
private int priority ; //線程優先級
public NamedThreadFactory(String name){
this(name, false);
}
public NamedThreadFactory(String name,boolean deamon){
this(name, deamon, -1);
}
public NamedThreadFactory(String name,boolean deamon,int priority){
this.name = name ;
this.deamon = deamon ;
this.priority = priority ;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,name+"["+counter.getAndIncrement()+"]");
thread.setDaemon(deamon);
if(priority != -1){
thread.setPriority(priority);
}
return thread;
}
}
//拆包
class MsgLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {
/**
* @param maxFrameLength 解碼時,處理每個幀數據的最大長度
* @param lengthFieldOffset 該幀數據中,存放該幀數據的長度的數據的起始位置
* @param lengthFieldLength 記錄該幀數據長度的字段本身的長度
* @param lengthAdjustment 修改幀數據長度字段中定義的值,可以為負數
* @param initialBytesToStrip解析的時候需要跳過的字節數
*/
public MsgLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
@Override
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
if(length == 6){
buf = buf.order(order);
byte[] lengthBytes = new byte[6];
buf.readBytes(lengthBytes);
buf.resetReaderIndex();
return Integer.valueOf(new String(lengthBytes));
} else {
return super.getUnadjustedFrameLength(buf, offset, length, order);
}
}
}
class ServerMessageHandler extends ChannelInboundHandlerAdapter {
/**
* 功能:讀取服務器發送過來的信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String) {
try {
doBusinessLogic(ctx,(String)msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
// 處理業務邏輯
private void doBusinessLogic(ChannelHandlerContext ctx,String msg) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public static void main(String[] args) throws Exception{
NettySocketServer server = new NettySocketServer();
server.start();
}
}
總結
同步阻塞IO
偽異步IO
非阻塞IO
異步IO
Netty的非阻塞IO
客戶端:服務端
1:1
N:M(M>=1)
N:M(M>=1,單線程非阻塞,多線程非阻塞)
N:0(不需要啟動額外的IO線程,被動回調)
N:M(M>=1)
IO類型
BIO
BIO
NIO
AIO
NIO
API使用難度
簡單
簡單
非常復雜
復雜
簡單
可靠性
相當差
差
高
高
高+
吞吐量
低
中
高
高
高+
并發
低
中
高
高
高+
參考文獻
▲http://www.ibm.com/developerworks/cn/linux/l-async/
▲http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf
▲http://blog.csdn.net/anxpp/article/details/51512200
▲Netty權威指南
▲Asynchronous I/O Tricks and Tips
責任編輯: