大綱
1.網絡編程簡介
2.BIO網絡編程
3.AIO網絡編程
4.NIO網絡編程之Buffer
5.NIO網絡編程之實戰
6.NIO網絡編程之Reactor模式
5.NIO網絡編程之Buffer
(1)Buffer的作用
Buffer的作用是方便讀寫通道(Channel)中的數據。首先數據是從通道(Channel)讀入緩沖區,從緩沖區寫入通道(Channel)的。應用程序發送數據時,會先將數據寫入緩沖區,然后再通過通道發送緩沖區的數據。應用數據讀取數據時,會先將數從通道中讀到緩沖區,然后再讀取緩沖區的數據。
緩沖區本質上是一塊可以寫入數據、可以讀取數據的內存。這塊內存被包裝成NIO的Buffer對象,并提供了一組方法用來方便訪問該塊內存。所以Buffer的本質是一塊可以寫入數據、可以讀取數據的內存。
(2)Buffer的重要屬性
一.capacity
Buffer作為一個內存塊有一個固定的大小值,叫capacity。我們只能往Buffer中寫capacity個byte、long,char等類型。一旦Buffer滿了,需要將其清空(讀取數據或清除數據)才能繼續寫數據。
二.position
當往Buffer中寫數據時,position表示當前的位置,position的初始值為0。當一個數據寫到Buffer后,position會移動到下一個可插入數據的位置。所以position的最大值為capacity – 1。
當從Buffer中讀取數據時,需要從某個特定的position位置讀數據。如果將Buffer從寫模式切換到讀模式,那么position會被重置為0。當從Buffer的position處讀到一個數據時,position會移動到下一個可讀位置。
三.limit
在寫模式下,Buffer的limit表示最多能往Buffer里寫多少數據。在寫模式下,Buffer的limit等于Buffer的capacity。
當Buffer從寫模式切換到讀模式時, limit表示最多能讀到多少數據。因此,當Buffer切換到讀模式時,limit會被設置成寫模式下的position值。
(3)Buffer的分配
要想獲得一個Buffer對象首先要進行分配,每一個Buffer類都有allocate()方法。可以在堆上分配,也可以在直接內存上分配。
//分配一個capacity為48字節的ByteBuffer的例子
ByteBuffer buf = ByteBuffer.allocate(48);//分配一個capacity為1024個字符的CharBuffer的例子
CharBuffer buf = CharBuffer.allocate(1024);
一般建議使用在堆上分配。如果應用偏計算,就用堆上分配。如果應用偏網絡通訊頻繁,就用直接內存。
wrap()方法可以把一個byte數組或byte數組的一部分包裝成ByteBuffer對象。
ByteBuffer wrap(byte [] array);
ByteBuffer wrap(byte [] array, int offset, int length);
Buffer分配的例子:
//類說明: Buffer的分配
public class AllocateBuffer {//輸出結果如下://----------Test allocate--------//before allocate, 虛擬機可用的內存大小: 253386384//buffer = java.nio.HeapByteBuffer[pos=0 lim=102400000 cap=102400000]//after allocate, 虛擬機可用的內存大小: 150986368//directBuffer = java.nio.DirectByteBuffer[pos=0 lim=102400000 cap=102400000]//after direct allocate, 虛擬機可用的內存大小: 150986368//----------Test wrap--------//java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]//java.nio.HeapByteBuffer[pos=10 lim=20 cap=32]public static void main(String[] args) {System.out.println("----------Test allocate--------");System.out.println("before allocate, 虛擬機可用的內存大小: " + Runtime.getRuntime().freeMemory());//堆上分配ByteBuffer buffer = ByteBuffer.allocate(102400000);System.out.println("buffer = " + buffer);System.out.println("after allocate, 虛擬機可用的內存大小: " + Runtime.getRuntime().freeMemory());//這部分用的直接內存ByteBuffer directBuffer = ByteBuffer.allocateDirect(102400000);System.out.println("directBuffer = " + directBuffer);System.out.println("after direct allocate, 虛擬機可用的內存大小: " + Runtime.getRuntime().freeMemory());System.out.println("----------Test wrap--------");byte[] bytes = new byte[32];buffer = ByteBuffer.wrap(bytes);System.out.println(buffer);buffer = ByteBuffer.wrap(bytes, 10, 10);System.out.println(buffer);}
}
(4)Buffer的讀寫
一.向Buffer中寫數據
將數據寫到Buffer有兩種方式:
方式一:從Channel讀出數據寫到Buffer
方式二:通過Buffer的put()方法將數據寫到Buffer
//從Channel寫到Buffer的例子
int bytesRead = inChannel.read(buf);//從channel讀出數據寫到buffer
//通過put()方法將數據寫到Buffer的例子
buf.put(127);
二.從Buffer中讀取數據
從Buffer中讀取數據有兩種方式:
方式一:從Buffer中讀取數據寫入到Channel
方式二:使用get()方法從Buffer中讀取數據
//從Buffer讀取數據到Channel的例子
int bytesWritten = inChannel.write(buf);
//使用get()方法從Buffer中讀取數據的例子
byte aByte = buf.get();
三.使用Buffer讀寫數據常見步驟
步驟一:寫入數據到Buffer
步驟二:調用flip()方法
步驟三:從Buffer中讀取數據
步驟四:調用clear()方法或compact()方法
flip()方法會將Buffer從寫模式切換到讀模式,調用flip()方法會將position設回0,并將limit設置成之前的position值。
當向buffer寫入數據時,buffer會記錄下寫了多少數據。一旦要讀取數據,需要通過flip()方法將Buffer從寫模式切換到讀模式。在讀模式下,可以讀取之前寫入到buffer的所有數據。一旦讀完了所有的數據,就需要清空緩沖區,讓它可以再次被寫入。
有兩種方式能清空緩沖區:調用clear()方法或compact()方法。clear()方法會清空整個緩沖區,compact()方法只會清除已經讀過的數據。任何未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面。
四.其他常用操作
操作一:rewind()方法
Buffer.rewind()將position設回0,所以可以重讀Buffer中的所有數據。limit保持不變,仍然表示能從Buffer中讀取多少個元素(byte、char等)。
操作二:clear()與compact()方法
一旦讀取完Buffer中的數據,需要讓Buffer準備好再次被寫入,這時候可以通過clear()方法或compact()方法來完成。
如果調用的是clear()方法,position將被設為0,limit被設為capacity的值。此時Buffer被認為是清空了,但是Buffer中的數據并未清除,只是這些標記能告訴我們可以從哪里開始往Buffer里寫數據。
如果Buffer中有一些未讀的數據,調用clear()方法,數據將被遺忘。意味著不再有任何標記會告訴你哪些數據被讀過,哪些還沒有。
如果Buffer中仍有未讀的數據,且后續還需要這些數據,但是此時想要先寫些數據,那么可以使用compact()方法。compact()方法會將所有未讀的數據拷貝到Buffer起始處,然后將position設到最后一個未讀元素正后面,limit屬性依然像clear()方法一樣設置成capacity。現在Buffer準備好寫數據了,但是不會覆蓋未讀的數據。
操作三:mark()與reset()方法
通過調用Buffer.mark()方法,可以標記Buffer中的一個特定position,之后可以通過調用Buffer.reset()方法恢復到這個position。
//類說明: Buffer方法演示
public class BufferMethod {public static void main(String[] args) {System.out.println("------Test get-------------");ByteBuffer buffer = ByteBuffer.allocate(32);buffer.put((byte) 'a')//0.put((byte) 'b')//1.put((byte) 'c')//2.put((byte) 'd')//3.put((byte) 'e')//4.put((byte) 'f');//5//before flip()java.nio.HeapByteBuffer[pos=6 lim=32 cap=32]System.out.println("before flip()" + buffer);//轉換為讀取模式: pos置為0, lim置為轉換前pos的值buffer.flip();//before get():java.nio.HeapByteBuffer[pos=0 lim=6 cap=32]System.out.println("before get():" + buffer);//get()會影響position的位置, 這是相對取;System.out.println((char) buffer.get());//after get():java.nio.HeapByteBuffer[pos=1 lim=6 cap=32]System.out.println("after get():" + buffer);//get(index)不影響position的值, 這是絕對取;System.out.println((char) buffer.get(2));//after get(index):java.nio.HeapByteBuffer[pos=1 lim=6 cap=32]System.out.println("after get(index):" + buffer);byte[] dst = new byte[10];//position移動兩位buffer.get(dst, 0, 2);//after get(dst, 0, 2):java.nio.HeapByteBuffer[pos=3 lim=6 cap=32]System.out.println("after get(dst, 0, 2):" + buffer);System.out.println("dst:" + new String(dst));//dst:bcSystem.out.println("--------Test put-------");ByteBuffer bb = ByteBuffer.allocate(32);//before put(byte):java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]System.out.println("before put(byte):" + bb);//put()不帶索引會改變pos, after put(byte):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32]System.out.println("after put(byte):" + bb.put((byte) 'z'));//put(2,(byte) 'c')不改變position的位置bb.put(2, (byte) 'c');//after put(2,(byte) 'c'):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32]System.out.println("after put(2,(byte) 'c'):" + bb);System.out.println(new String(bb.array()));//這里的buffer是abcdef[pos=3 lim=6 cap=32]bb.put(buffer);//after put(buffer):java.nio.HeapByteBuffer[pos=4 lim=32 cap=32]System.out.println("after put(buffer):" + bb);System.out.println(new String(bb.array()));System.out.println("--------Test reset----------");buffer = ByteBuffer.allocate(20);System.out.println("buffer = " + buffer);buffer.clear();buffer.position(5);//移動position到5buffer.mark();//記錄當前position的位置buffer.position(10);//移動position到10System.out.println("before reset:" + buffer);buffer.reset();//復位position到記錄的地址System.out.println("after reset:" + buffer);System.out.println("--------Test rewind--------");buffer.clear();buffer.position(10);//移動position到10buffer.limit(15);//限定最大可寫入的位置為15System.out.println("before rewind:" + buffer);buffer.rewind();//將position設回0System.out.println("before rewind:" + buffer);System.out.println("--------Test compact--------");buffer.clear();//放入4個字節,position移動到下個可寫入的位置,也就是4buffer.put("abcd".getBytes());System.out.println("before compact:" + buffer);System.out.println(new String(buffer.array()));buffer.flip();//將position設回0,并將limit設置成之前position的值System.out.println("after flip:" + buffer);//從Buffer中讀取數據的例子,每讀一次,position移動一次System.out.println((char) buffer.get());System.out.println((char) buffer.get());System.out.println((char) buffer.get());System.out.println("after three gets:" + buffer);System.out.println(new String(buffer.array()));//compact()方法將所有未讀的數據拷貝到Buffer起始處//然后將position設到最后一個未讀元素正后面buffer.compact();System.out.println("after compact:" + buffer);System.out.println(new String(buffer.array()));}
}
(5)Buffer常用方法總結
6.NIO網絡編程之實戰
(1)Selector
(2)Channels
(3)SelectionKey
(4)NIO的開發流程
(5)NIO的開發例子
(1)Selector
Selector的含義是選擇器,也可以稱為輪詢代理器、事件訂閱器。Selector的作用是注冊事件和對Channel進行管理。
應用程序可以向Selector對象注冊它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣,Selector中會維護一個已經注冊的Channel的容器。
(2)Channels
Channel可以和操作系統進行內容傳遞,應用程序可以通過Channel讀數據,也可以通過Channel向操作系統寫數據,當然寫數據和讀數據都要通過Buffer來實現。
所有被Selector注冊的Channel都是繼承SelectableChannel的子類,通道中的數據總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入。ScoketChannel和ServerSocketChannel都是SelectableChannel類的子類。
(3)SelectionKey
NIO中的SelectionKey共定義了四種事件類型:OP_ACCEPT、OP_READ、OP_WRITE、OP_CONNECT,分別對應接受連接、讀、寫、請求連接的網絡Socket操作。
ServerSocketChannel和SocketChannel可以注冊自己感興趣的操作類型,當對應操作類型的就緒條件滿足時操作系統就會通知這些Channel。
每個操作類型(事件類型)的就緒條件:
(4)NIO的開發流程
步驟一:服務端啟動ServerSocketChannel,關注OP_ACCEPT事件。
步驟二:客戶端啟動SocketChannel,連接服務端,關注OP_CONNECT事件。
步驟三:服務端接受連接,然后啟動一個SocketChannel。該SocketChannel可以關注OP_READ、OP_WRITE事件,一般連接建立后會直接關注OP_READ事件。
步驟四:客戶端的SocketChannel發現連接建立后,關注OP_READ、OP_WRITE事件,一般客戶端需要發送數據了才能關注OP_READ事件。
步驟五:連接建立后,客戶端與服務器端開始相互發送消息(讀寫),然后根據實際情況來關注OP_READ、OP_WRITE事件。
(5)NIO的開發例子
一.客戶端的代碼
//類說明: NIO通信的客戶端
public class NioClient {private static NioClientHandle nioClientHandle;public static void start() {if (nioClientHandle != null) {nioClientHandle.stop();}nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP, DEFAULT_PORT);new Thread(nioClientHandle, "Client").start();}//向服務器發送消息public static boolean sendMsg(String msg) throws Exception {nioClientHandle.sendMsg(msg);return true;}public static void main(String[] args) throws Exception {start();Scanner scanner = new Scanner(System.in);while(NioClient.sendMsg(scanner.next()));}
}//類說明: NIO通信的客戶端處理器
public class NioClientHandle implements Runnable {private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean started;public NioClientHandle(String ip, int port) {this.host = ip;this.port = port;try {//創建選擇器selector = Selector.open();//打開通道socketChannel = SocketChannel.open();//如果為true, 則此通道將被置于阻塞模式; 如果為false, 則此通道將被置于非阻塞模式;//另外, IO復用本身就是非阻塞模式, 所以設置false;socketChannel.configureBlocking(false);//表示連接已經打開started = true;} catch (IOException e) {e.printStackTrace();}}public void stop() {started = false;}private void doConnect() throws IOException {//關于socketChannel.connect方法的說明://如果此通道處于非阻塞模式, 則調用此方法將啟動一個非阻塞連接的操作;//如果該連接建立得非常快, 就像本地連接可能發生的那樣, 則此方法返回true;//否則, 此方法返回false, 稍后必須通過調用finishConnect方法完成連接操作//如果成功連接則什么都不做if (socketChannel.connect(new InetSocketAddress(host, port))) {} else {//連接還未完成, 所以注冊連接就緒事件, 向selector表示關注這個事件socketChannel.register(selector, SelectionKey.OP_CONNECT);}}@Overridepublic void run() {try {//發起連接doConnect();} catch (IOException e) {e.printStackTrace();System.exit(1);}//循環遍歷selectorwhile(started) {try {//selector.select()會阻塞, 只有當至少一個注冊的事件發生的時候才會繼續selector.select();//獲取當前有哪些事件可以使用Set<SelectionKey> keys = selector.selectedKeys();//轉換為迭代器Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()) {key = it.next();it.remove();//拿到key后從迭代器移除try {handleInput(key);} catch (IOException e) {e.printStackTrace();if (key != null) {key.cancel();if (key.channel() != null) {key.channel().close();}}}}} catch (IOException e) {e.printStackTrace();}}//selector關閉后會自動釋放里面管理的資源if (selector != null) {try {selector.close();} catch (IOException e) {e.printStackTrace();}}}//具體的事件處理方法private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {//根據SelectionKey獲得關心當前事件的channelSocketChannel sc = (SocketChannel)key.channel();//處理連接事件if (key.isConnectable()) {if (sc.finishConnect()) {socketChannel.register(selector, SelectionKey.OP_READ);} else{System.exit(1);}}//有數據可讀事件if (key.isReadable()) {//創建ByteBuffer, 并開辟一個1M的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流, 返回讀取到的字節數, 從channel讀出來并寫到buffer里去int readBytes = sc.read(buffer);//讀取到字節,對字節進行編解碼if (readBytes > 0) {//將緩沖區當前的limit設置為position,position=0,//用于后續對緩沖區的讀取操作buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("accept message: " + result);} else if (readBytes < 0) {key.cancel();sc.close();}}}}//發送消息private void doWrite(SocketChannel channel, String request) throws IOException {//將消息編碼為字節數組byte[] bytes = request.getBytes();//根據數組容量創建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節數組復制到緩沖區writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發送緩沖區的字節數組channel.write(writeBuffer);}//寫數據對外暴露的APIpublic void sendMsg(String msg) throws Exception {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}
}
二.服務端的代碼
//類說明: NIO通信的服務端
public class NioServer {private static NioServerHandle nioServerHandle;public static void start() {if (nioServerHandle != null) {nioServerHandle.stop();}nioServerHandle = new NioServerHandle(DEFAULT_PORT);new Thread(nioServerHandle,"Server").start();}public static void main(String[] args) {start();}
}//類說明: NIO通信的服務端處理器
public class NioServerHandle implements Runnable {private Selector selector;private ServerSocketChannel serverChannel;private volatile boolean started;//構造方法public NioServerHandle(int port) {try {//創建選擇器selector = Selector.open();//打開通道serverChannel = ServerSocketChannel.open();//如果為true, 則此通道將被置于阻塞模式; 如果為false, 則此通道將被置于非阻塞模式;//另外, IO復用本身就是非阻塞模式, 所以設置false;serverChannel.configureBlocking(false);//指定監聽的端口serverChannel.socket().bind(new InetSocketAddress(port));//注冊服務端關心的事件: 連接事件serverChannel.register(selector, SelectionKey.OP_ACCEPT);//表示連接已經打開started = true;System.out.println("服務器已啟動, 端口號: " + port);} catch (IOException e) {e.printStackTrace();}}public void stop() {started = false;}@Overridepublic void run() {//循環遍歷selectorwhile(started) {try {//阻塞, 只有當至少一個注冊的事件發生的時候才會繼續selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch(Exception e) {if (key != null) {key.cancel();if (key.channel() != null) {key.channel().close();}}}}} catch(Throwable t) {t.printStackTrace();}}//selector關閉后會自動釋放里面管理的資源if (selector != null) {try {selector.close();} catch (Exception e) {e.printStackTrace();}}}private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {//處理新接入的請求消息if (key.isAcceptable()) {//獲得關心當前事件的channelServerSocketChannel ssc = (ServerSocketChannel)key.channel();//通過ServerSocketChannel的accept創建SocketChannel實例//完成該操作意味著完成TCP三次握手, TCP物理鏈路正式建立SocketChannel sc = ssc.accept();System.out.println("======socket channel 建立連接");//設置為非阻塞的sc.configureBlocking(false);//連接已經完成了, 可以開始關心讀事件了sc.register(selector, SelectionKey.OP_READ);}//讀消息if (key.isReadable()) {System.out.println("======socket channel 數據準備完成, " + "可以去讀==讀取=======");SocketChannel sc = (SocketChannel) key.channel();//創建ByteBuffer, 并開辟一個1M的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流, 返回讀取到的字節數int readBytes = sc.read(buffer);//讀取到字節, 對字節進行編解碼if (readBytes > 0) {//將緩沖區當前的limit設置為position=0, 用于后續對緩沖區的讀取操作buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String message = new String(bytes,"UTF-8");System.out.println("服務器收到消息: " + message);//處理數據String result = response(message) ;//發送應答消息doWrite(sc, result);} else if (readBytes < 0) {//鏈路已經關閉, 釋放資源key.cancel();sc.close();}}}}//發送應答消息private void doWrite(SocketChannel channel,String response) throws IOException {//將消息編碼為字節數組byte[] bytes = response.getBytes();//根據數組容量創建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節數組復制到緩沖區writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發送緩沖區的字節數組channel.write(writeBuffer);}
}
6.NIO網絡編程之Reactor模式
(1)單線程的Reactor模式
(2)單線程Reactor模式的改進
(3)多線程的Reactor模式
(1)單線程的Reactor模式
一.單線程Reactor模式的流程
首先服務端的Reactor其實是一個線程對象。Reactor會啟動事件循環,并使用Selector(選擇器)來實現IO多路復用。
然后服務端啟動時會注冊一個Acceptor事件處理器到Reactor中。這個Acceptor事件處理器會關注accept事件,這樣Reactor監聽到accept事件就會交給Acceptor事件處理器進行處理了。
當客戶端向服務器端發起一個連接請求時,Reactor就會監聽到一個accept事件,于是會將該accept事件派發給Acceptor處理器進行處理。
接著Acceptor處理器通過accept()方法便能得到這個客戶端對應的連接(SocketChannel),然后將該連接(SocketChannel)所關注的read事件及對應的read事件處理器注冊到Reactor中,這樣Reactor監聽到該連接的read事件就會交給對應的read事件處理器進行處理。
當Reactor監聽到客戶端的連接(SocketChannel)有讀寫事件發生時,就會將讀寫事件派發給對應的讀寫事件處理器進行處理。比如讀事件處理器會通過SocketChannel的read()方法讀取數據,此時的read()方法可以直接讀取到數據,不需要阻塞等待可讀數據的到來。
每當Acceptor處理器和讀寫事件處理器處理完所有就緒的感興趣的IO事件后,Reactor線程會再次執行select()方法阻塞等待新的事件就緒并將其分派給對應處理器進行處理。
二.單線程Reactor模式的問題
注意:單線程的Reactor模式中的單線程主要是針對IO操作而言的,也就是所有的IO的accept、read、write、connect操作都在一個線程上完成。
由于在單線程Reactor模式中,不僅IO操作在Reactor線程上,而且非IO的業務操作也在Reactor線程上進行處理,這會大大降低IO請求的響應。所以應將非IO的業務操作從Reactor線程上剝離,以提高Reactor線程對IO請求的響應。
(2)單線程Reactor模式的改進
一.增加工作線程池來進行改進
為了改善單線程Reactor模式中Reactor線程還要處理業務邏輯,可以添加一個工作線程池。將非IO操作(解碼、計算、編碼)從Reactor線程中移出,然后轉交給這個工作線程池來執行。所有IO操作依舊由單個Reactor線程來完成,如IO的accept、read、write以及connect操作。這樣就能提高Reactor線程的IO響應,避免因耗時的業務邏輯而降低對后續IO請求的處理。
二.使用線程池的好處
合理使用線程池,可以帶來很多好處:
好處一:減少頻繁創建和銷毀線程的性能開銷
好處二:重復利用線程,避免對每個任務都創建線程,可以提高響應速度
好處三:合理設置線程池的大小,可以避免因為線程池過大影響性能
三.單線程Reactor不適合高并發場景
對于一些小容量的應用場景,可以使用單線程Reactor模型,但是對于一些高負載、大并發或大數據量的應用場景卻不合適。
原因一:一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送。
原因二:當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時。超時之后往往會進行重發,這更加重了NIO線程的負載。最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸。
(3)多線程的Reactor模式
一.多線程的Reactor模式介紹
在多線程的Reactor模式下,存在一個Reactor線程池,Reactor線程池里的每一個Reactor線程都有自己的Selector和事件分發邏輯。
Reactor線程池里的主反應器線程mainReactor可以只有一個,但子反應器線程subReactor一般會有多個,通常subReactor也是一個線程池。
主反應器線程mainReactor主要負責接收客戶端的連接請求,然后將接收到的SocketChannel傳遞給子反應器線程subReactor,由subReactor來完成和客戶端的通信。
二.多線程的Reactor模式流程
首先服務端的Reactor變成了多個線程對象,分為mainReactor和subReactor。這些Reactor對象也會啟動事件循環,并使用Selector(選擇器)來實現IO多路復用。
然后服務端在啟動時會注冊一個Acceptor事件處理器到mainReactor中。這個Acceptor事件處理器會關注accept事件,這樣mainReactor監聽到accept事件就會交給Acceptor事件處理器進行處理。
當客戶端向服務端發起一個連接請求時,mainReactor就會監聽到一個accept事件,于是就會將這個accept事件派發給Acceptor處理器來進行處理。
接著Acceptor處理器通過accept()方法便能得到這個客戶端對應的連接(SocketChannel),然后將這個連接SocketChannel傳遞給subReactor線程池進行處理。
subReactor線程池會分配一個subReactor線程給這個SocketChannel,并將SocketChannel關注的read事件及對應的read事件處理器注冊到這個subReactor線程中。當然也會注冊關注的write事件及write事件處理器到subReactor線程中以完成IO寫操作。
總之,Reactor線程池中的每一Reactor線程都會有自己的Selector和事件分發邏輯。當有IO事件就緒時,相關的subReactor就將事件派發給響應的處理器處理。
注意,這里subReactor線程只負責完成IO的read()操作,在讀取到數據后將業務邏輯的處理放入到工作線程池中完成。若完成業務邏輯后需要返回數據給客戶端,則IO的write()操作還是會被提交回subReactor線程來完成。這樣所有的IO操作依舊在Reactor線程(mainReactor或subReactor)中完成,而工作線程池僅用來處理非IO操作的邏輯。
多Reactor線程模式將"接收客戶端的連接請求"和"與該客戶端進行讀寫通信"分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責與客戶端的通信,而是將建立好的連接轉交給subReactor線程來完成與客戶端的通信,這樣就不會因為read()操作的數據量太大而導致后面的客戶端連接請求得不到及時處理。并且多Reactor線程模式在海量的客戶端并發請求的情況下,還可以通過subReactor線程池將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量。
Netty服務端就是使用了多線程的Reactor模式。