Netty基礎—2.網絡編程基礎四

大綱

1.網絡編程簡介

2.BIO網絡編程

3.AIO網絡編程

4.NIO網絡編程之Buffer

5.NIO網絡編程之實戰

6.NIO網絡編程之Reactor模式

5.NIO網絡編程之Buffer

(1)Buffer的作用

Buffer的作用是方便讀寫通道(Channel)中的數據。首先數據是從通道(Channel)讀入緩沖區,從緩沖區寫入通道(Channel)的。應用程序發送數據時,會先將數據寫入緩沖區,然后再通過通道發送緩沖區的數據。應用數據讀取數據時,會先將數從通道中讀到緩沖區,然后再讀取緩沖區的數據。

緩沖區本質上是一塊可以寫入數據、可以讀取數據的內存。這塊內存被包裝成NIO的Buffer對象,并提供了一組方法用來方便訪問該塊內存。所以Buffer的本質是一塊可以寫入數據、可以讀取數據的內存。

(2)Buffer的重要屬性

一.capacity

Buffer作為一個內存塊有一個固定的大小值,叫capacity。我們只能往Buffer中寫capacity個byte、long,char等類型。一旦Buffer滿了,需要將其清空(讀取數據或清除數據)才能繼續寫數據。

二.position

當往Buffer中寫數據時,position表示當前的位置,position的初始值為0。當一個數據寫到Buffer后,position會移動到下一個可插入數據的位置。所以position的最大值為capacity – 1。

當從Buffer中讀取數據時,需要從某個特定的position位置讀數據。如果將Buffer從寫模式切換到讀模式,那么position會被重置為0。當從Buffer的position處讀到一個數據時,position會移動到下一個可讀位置。

三.limit

在寫模式下,Buffer的limit表示最多能往Buffer里寫多少數據。在寫模式下,Buffer的limit等于Buffer的capacity。

當Buffer從寫模式切換到讀模式時, limit表示最多能讀到多少數據。因此,當Buffer切換到讀模式時,limit會被設置成寫模式下的position值。

圖片

(3)Buffer的分配

要想獲得一個Buffer對象首先要進行分配,每一個Buffer類都有allocate()方法。可以在堆上分配,也可以在直接內存上分配。

//分配一個capacity為48字節的ByteBuffer的例子
ByteBuffer buf = ByteBuffer.allocate(48);//分配一個capacity為1024個字符的CharBuffer的例子
CharBuffer buf = CharBuffer.allocate(1024);

一般建議使用在堆上分配。如果應用偏計算,就用堆上分配。如果應用偏網絡通訊頻繁,就用直接內存。

wrap()方法可以把一個byte數組或byte數組的一部分包裝成ByteBuffer對象。

ByteBuffer wrap(byte [] array);
ByteBuffer wrap(byte [] array, int offset, int length);

Buffer分配的例子:

//類說明: Buffer的分配
public class AllocateBuffer {//輸出結果如下://----------Test allocate--------//before allocate, 虛擬機可用的內存大小: 253386384//buffer = java.nio.HeapByteBuffer[pos=0 lim=102400000 cap=102400000]//after allocate, 虛擬機可用的內存大小: 150986368//directBuffer = java.nio.DirectByteBuffer[pos=0 lim=102400000 cap=102400000]//after direct allocate, 虛擬機可用的內存大小: 150986368//----------Test wrap--------//java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]//java.nio.HeapByteBuffer[pos=10 lim=20 cap=32]public static void main(String[] args) {System.out.println("----------Test allocate--------");System.out.println("before allocate, 虛擬機可用的內存大小: " + Runtime.getRuntime().freeMemory());//堆上分配ByteBuffer buffer = ByteBuffer.allocate(102400000);System.out.println("buffer = " + buffer);System.out.println("after allocate, 虛擬機可用的內存大小: " + Runtime.getRuntime().freeMemory());//這部分用的直接內存ByteBuffer directBuffer = ByteBuffer.allocateDirect(102400000);System.out.println("directBuffer = " + directBuffer);System.out.println("after direct allocate, 虛擬機可用的內存大小: " + Runtime.getRuntime().freeMemory());System.out.println("----------Test wrap--------");byte[] bytes = new byte[32];buffer = ByteBuffer.wrap(bytes);System.out.println(buffer);buffer = ByteBuffer.wrap(bytes, 10, 10);System.out.println(buffer);}
}

(4)Buffer的讀寫

一.向Buffer中寫數據

將數據寫到Buffer有兩種方式:

方式一:從Channel讀出數據寫到Buffer

方式二:通過Buffer的put()方法將數據寫到Buffer

//從Channel寫到Buffer的例子
int bytesRead = inChannel.read(buf);//從channel讀出數據寫到buffer
//通過put()方法將數據寫到Buffer的例子
buf.put(127);

二.從Buffer中讀取數據

從Buffer中讀取數據有兩種方式:

方式一:從Buffer中讀取數據寫入到Channel

方式二:使用get()方法從Buffer中讀取數據

//從Buffer讀取數據到Channel的例子
int bytesWritten = inChannel.write(buf);
//使用get()方法從Buffer中讀取數據的例子
byte aByte = buf.get();

三.使用Buffer讀寫數據常見步驟

步驟一:寫入數據到Buffer

步驟二:調用flip()方法

步驟三:從Buffer中讀取數據

步驟四:調用clear()方法或compact()方法

flip()方法會將Buffer從寫模式切換到讀模式,調用flip()方法會將position設回0,并將limit設置成之前的position值。

當向buffer寫入數據時,buffer會記錄下寫了多少數據。一旦要讀取數據,需要通過flip()方法將Buffer從寫模式切換到讀模式。在讀模式下,可以讀取之前寫入到buffer的所有數據。一旦讀完了所有的數據,就需要清空緩沖區,讓它可以再次被寫入。

有兩種方式能清空緩沖區:調用clear()方法或compact()方法。clear()方法會清空整個緩沖區,compact()方法只會清除已經讀過的數據。任何未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面。

四.其他常用操作

操作一:rewind()方法

Buffer.rewind()將position設回0,所以可以重讀Buffer中的所有數據。limit保持不變,仍然表示能從Buffer中讀取多少個元素(byte、char等)。

操作二:clear()與compact()方法

一旦讀取完Buffer中的數據,需要讓Buffer準備好再次被寫入,這時候可以通過clear()方法或compact()方法來完成。

如果調用的是clear()方法,position將被設為0,limit被設為capacity的值。此時Buffer被認為是清空了,但是Buffer中的數據并未清除,只是這些標記能告訴我們可以從哪里開始往Buffer里寫數據。

如果Buffer中有一些未讀的數據,調用clear()方法,數據將被遺忘。意味著不再有任何標記會告訴你哪些數據被讀過,哪些還沒有。

如果Buffer中仍有未讀的數據,且后續還需要這些數據,但是此時想要先寫些數據,那么可以使用compact()方法。compact()方法會將所有未讀的數據拷貝到Buffer起始處,然后將position設到最后一個未讀元素正后面,limit屬性依然像clear()方法一樣設置成capacity。現在Buffer準備好寫數據了,但是不會覆蓋未讀的數據。

操作三:mark()與reset()方法

通過調用Buffer.mark()方法,可以標記Buffer中的一個特定position,之后可以通過調用Buffer.reset()方法恢復到這個position。

//類說明: Buffer方法演示
public class BufferMethod {public static void main(String[] args) {System.out.println("------Test get-------------");ByteBuffer buffer = ByteBuffer.allocate(32);buffer.put((byte) 'a')//0.put((byte) 'b')//1.put((byte) 'c')//2.put((byte) 'd')//3.put((byte) 'e')//4.put((byte) 'f');//5//before flip()java.nio.HeapByteBuffer[pos=6 lim=32 cap=32]System.out.println("before flip()" + buffer);//轉換為讀取模式: pos置為0, lim置為轉換前pos的值buffer.flip();//before get():java.nio.HeapByteBuffer[pos=0 lim=6 cap=32]System.out.println("before get():" + buffer);//get()會影響position的位置, 這是相對取;System.out.println((char) buffer.get());//after get():java.nio.HeapByteBuffer[pos=1 lim=6 cap=32]System.out.println("after get():" + buffer);//get(index)不影響position的值, 這是絕對取;System.out.println((char) buffer.get(2));//after get(index):java.nio.HeapByteBuffer[pos=1 lim=6 cap=32]System.out.println("after get(index):" + buffer);byte[] dst = new byte[10];//position移動兩位buffer.get(dst, 0, 2);//after get(dst, 0, 2):java.nio.HeapByteBuffer[pos=3 lim=6 cap=32]System.out.println("after get(dst, 0, 2):" + buffer);System.out.println("dst:" + new String(dst));//dst:bcSystem.out.println("--------Test put-------");ByteBuffer bb = ByteBuffer.allocate(32);//before put(byte):java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]System.out.println("before put(byte):" + bb);//put()不帶索引會改變pos, after put(byte):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32]System.out.println("after put(byte):" + bb.put((byte) 'z'));//put(2,(byte) 'c')不改變position的位置bb.put(2, (byte) 'c');//after put(2,(byte) 'c'):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32]System.out.println("after put(2,(byte) 'c'):" + bb);System.out.println(new String(bb.array()));//這里的buffer是abcdef[pos=3 lim=6 cap=32]bb.put(buffer);//after put(buffer):java.nio.HeapByteBuffer[pos=4 lim=32 cap=32]System.out.println("after put(buffer):" + bb);System.out.println(new String(bb.array()));System.out.println("--------Test reset----------");buffer = ByteBuffer.allocate(20);System.out.println("buffer = " + buffer);buffer.clear();buffer.position(5);//移動position到5buffer.mark();//記錄當前position的位置buffer.position(10);//移動position到10System.out.println("before reset:" + buffer);buffer.reset();//復位position到記錄的地址System.out.println("after reset:" + buffer);System.out.println("--------Test rewind--------");buffer.clear();buffer.position(10);//移動position到10buffer.limit(15);//限定最大可寫入的位置為15System.out.println("before rewind:" + buffer);buffer.rewind();//將position設回0System.out.println("before rewind:" + buffer);System.out.println("--------Test compact--------");buffer.clear();//放入4個字節,position移動到下個可寫入的位置,也就是4buffer.put("abcd".getBytes());System.out.println("before compact:" + buffer);System.out.println(new String(buffer.array()));buffer.flip();//將position設回0,并將limit設置成之前position的值System.out.println("after flip:" + buffer);//從Buffer中讀取數據的例子,每讀一次,position移動一次System.out.println((char) buffer.get());System.out.println((char) buffer.get());System.out.println((char) buffer.get());System.out.println("after three gets:" + buffer);System.out.println(new String(buffer.array()));//compact()方法將所有未讀的數據拷貝到Buffer起始處//然后將position設到最后一個未讀元素正后面buffer.compact();System.out.println("after compact:" + buffer);System.out.println(new String(buffer.array()));}
}

(5)Buffer常用方法總結

圖片

6.NIO網絡編程之實戰

(1)Selector

(2)Channels

(3)SelectionKey

(4)NIO的開發流程

(5)NIO的開發例子

(1)Selector

Selector的含義是選擇器,也可以稱為輪詢代理器、事件訂閱器。Selector的作用是注冊事件和對Channel進行管理。

應用程序可以向Selector對象注冊它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣,Selector中會維護一個已經注冊的Channel的容器。

(2)Channels

Channel可以和操作系統進行內容傳遞,應用程序可以通過Channel讀數據,也可以通過Channel向操作系統寫數據,當然寫數據和讀數據都要通過Buffer來實現。

所有被Selector注冊的Channel都是繼承SelectableChannel的子類,通道中的數據總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入。ScoketChannel和ServerSocketChannel都是SelectableChannel類的子類。

(3)SelectionKey

NIO中的SelectionKey共定義了四種事件類型:OP_ACCEPT、OP_READ、OP_WRITE、OP_CONNECT,分別對應接受連接、讀、寫、請求連接的網絡Socket操作。

ServerSocketChannel和SocketChannel可以注冊自己感興趣的操作類型,當對應操作類型的就緒條件滿足時操作系統就會通知這些Channel。

每個操作類型(事件類型)的就緒條件:

圖片

(4)NIO的開發流程

步驟一:服務端啟動ServerSocketChannel,關注OP_ACCEPT事件。

步驟二:客戶端啟動SocketChannel,連接服務端,關注OP_CONNECT事件。

步驟三:服務端接受連接,然后啟動一個SocketChannel。該SocketChannel可以關注OP_READ、OP_WRITE事件,一般連接建立后會直接關注OP_READ事件。

步驟四:客戶端的SocketChannel發現連接建立后,關注OP_READ、OP_WRITE事件,一般客戶端需要發送數據了才能關注OP_READ事件。

步驟五:連接建立后,客戶端與服務器端開始相互發送消息(讀寫),然后根據實際情況來關注OP_READ、OP_WRITE事件。

圖片

(5)NIO的開發例子

一.客戶端的代碼

//類說明: NIO通信的客戶端
public class NioClient {private static NioClientHandle nioClientHandle;public static void start() {if (nioClientHandle != null) {nioClientHandle.stop();}nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP, DEFAULT_PORT);new Thread(nioClientHandle, "Client").start();}//向服務器發送消息public static boolean sendMsg(String msg) throws Exception {nioClientHandle.sendMsg(msg);return true;}public static void main(String[] args) throws Exception {start();Scanner scanner = new Scanner(System.in);while(NioClient.sendMsg(scanner.next()));}
}//類說明: NIO通信的客戶端處理器
public class NioClientHandle implements Runnable {private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean started;public NioClientHandle(String ip, int port) {this.host = ip;this.port = port;try {//創建選擇器selector = Selector.open();//打開通道socketChannel = SocketChannel.open();//如果為true, 則此通道將被置于阻塞模式; 如果為false, 則此通道將被置于非阻塞模式;//另外, IO復用本身就是非阻塞模式, 所以設置false;socketChannel.configureBlocking(false);//表示連接已經打開started = true;} catch (IOException e) {e.printStackTrace();}}public void stop() {started = false;}private void doConnect() throws IOException {//關于socketChannel.connect方法的說明://如果此通道處于非阻塞模式, 則調用此方法將啟動一個非阻塞連接的操作;//如果該連接建立得非常快, 就像本地連接可能發生的那樣, 則此方法返回true;//否則, 此方法返回false, 稍后必須通過調用finishConnect方法完成連接操作//如果成功連接則什么都不做if (socketChannel.connect(new InetSocketAddress(host, port))) {} else {//連接還未完成, 所以注冊連接就緒事件, 向selector表示關注這個事件socketChannel.register(selector, SelectionKey.OP_CONNECT);}}@Overridepublic void run() {try {//發起連接doConnect();} catch (IOException e) {e.printStackTrace();System.exit(1);}//循環遍歷selectorwhile(started) {try {//selector.select()會阻塞, 只有當至少一個注冊的事件發生的時候才會繼續selector.select();//獲取當前有哪些事件可以使用Set<SelectionKey> keys = selector.selectedKeys();//轉換為迭代器Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()) {key = it.next();it.remove();//拿到key后從迭代器移除try {handleInput(key);} catch (IOException e) {e.printStackTrace();if (key != null) {key.cancel();if (key.channel() != null) {key.channel().close();}}}}} catch (IOException e) {e.printStackTrace();}}//selector關閉后會自動釋放里面管理的資源if (selector != null) {try {selector.close();} catch (IOException e) {e.printStackTrace();}}}//具體的事件處理方法private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {//根據SelectionKey獲得關心當前事件的channelSocketChannel sc = (SocketChannel)key.channel();//處理連接事件if (key.isConnectable()) {if (sc.finishConnect()) {socketChannel.register(selector, SelectionKey.OP_READ);} else{System.exit(1);}}//有數據可讀事件if (key.isReadable()) {//創建ByteBuffer, 并開辟一個1M的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流, 返回讀取到的字節數, 從channel讀出來并寫到buffer里去int readBytes = sc.read(buffer);//讀取到字節,對字節進行編解碼if (readBytes > 0) {//將緩沖區當前的limit設置為position,position=0,//用于后續對緩沖區的讀取操作buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("accept message: " + result);} else if (readBytes < 0) {key.cancel();sc.close();}}}}//發送消息private void doWrite(SocketChannel channel, String request) throws IOException {//將消息編碼為字節數組byte[] bytes = request.getBytes();//根據數組容量創建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節數組復制到緩沖區writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發送緩沖區的字節數組channel.write(writeBuffer);}//寫數據對外暴露的APIpublic void sendMsg(String msg) throws Exception {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}
}

二.服務端的代碼

//類說明: NIO通信的服務端
public class NioServer {private static NioServerHandle nioServerHandle;public static void start() {if (nioServerHandle != null) {nioServerHandle.stop();}nioServerHandle = new NioServerHandle(DEFAULT_PORT);new Thread(nioServerHandle,"Server").start();}public static void main(String[] args) {start();}
}//類說明: NIO通信的服務端處理器
public class NioServerHandle implements Runnable {private Selector selector;private ServerSocketChannel serverChannel;private volatile boolean started;//構造方法public NioServerHandle(int port) {try {//創建選擇器selector = Selector.open();//打開通道serverChannel = ServerSocketChannel.open();//如果為true, 則此通道將被置于阻塞模式; 如果為false, 則此通道將被置于非阻塞模式;//另外, IO復用本身就是非阻塞模式, 所以設置false;serverChannel.configureBlocking(false);//指定監聽的端口serverChannel.socket().bind(new InetSocketAddress(port));//注冊服務端關心的事件: 連接事件serverChannel.register(selector, SelectionKey.OP_ACCEPT);//表示連接已經打開started = true;System.out.println("服務器已啟動, 端口號: " + port);} catch (IOException e) {e.printStackTrace();}}public void stop() {started = false;}@Overridepublic void run() {//循環遍歷selectorwhile(started) {try {//阻塞, 只有當至少一個注冊的事件發生的時候才會繼續selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch(Exception e) {if (key != null) {key.cancel();if (key.channel() != null) {key.channel().close();}}}}} catch(Throwable t) {t.printStackTrace();}}//selector關閉后會自動釋放里面管理的資源if (selector != null) {try {selector.close();} catch (Exception e) {e.printStackTrace();}}}private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {//處理新接入的請求消息if (key.isAcceptable()) {//獲得關心當前事件的channelServerSocketChannel ssc = (ServerSocketChannel)key.channel();//通過ServerSocketChannel的accept創建SocketChannel實例//完成該操作意味著完成TCP三次握手, TCP物理鏈路正式建立SocketChannel sc = ssc.accept();System.out.println("======socket channel 建立連接");//設置為非阻塞的sc.configureBlocking(false);//連接已經完成了, 可以開始關心讀事件了sc.register(selector, SelectionKey.OP_READ);}//讀消息if (key.isReadable()) {System.out.println("======socket channel 數據準備完成, " + "可以去讀==讀取=======");SocketChannel sc = (SocketChannel) key.channel();//創建ByteBuffer, 并開辟一個1M的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流, 返回讀取到的字節數int readBytes = sc.read(buffer);//讀取到字節, 對字節進行編解碼if (readBytes > 0) {//將緩沖區當前的limit設置為position=0, 用于后續對緩沖區的讀取操作buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String message = new String(bytes,"UTF-8");System.out.println("服務器收到消息: " + message);//處理數據String result = response(message) ;//發送應答消息doWrite(sc, result);} else if (readBytes < 0) {//鏈路已經關閉, 釋放資源key.cancel();sc.close();}}}}//發送應答消息private void doWrite(SocketChannel channel,String response) throws IOException {//將消息編碼為字節數組byte[] bytes = response.getBytes();//根據數組容量創建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節數組復制到緩沖區writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發送緩沖區的字節數組channel.write(writeBuffer);}
}

6.NIO網絡編程之Reactor模式

(1)單線程的Reactor模式

(2)單線程Reactor模式的改進

(3)多線程的Reactor模式

(1)單線程的Reactor模式

圖片

一.單線程Reactor模式的流程

首先服務端的Reactor其實是一個線程對象。Reactor會啟動事件循環,并使用Selector(選擇器)來實現IO多路復用。

然后服務端啟動時會注冊一個Acceptor事件處理器到Reactor中。這個Acceptor事件處理器會關注accept事件,這樣Reactor監聽到accept事件就會交給Acceptor事件處理器進行處理了。

當客戶端向服務器端發起一個連接請求時,Reactor就會監聽到一個accept事件,于是會將該accept事件派發給Acceptor處理器進行處理。

接著Acceptor處理器通過accept()方法便能得到這個客戶端對應的連接(SocketChannel),然后將該連接(SocketChannel)所關注的read事件及對應的read事件處理器注冊到Reactor中,這樣Reactor監聽到該連接的read事件就會交給對應的read事件處理器進行處理。

當Reactor監聽到客戶端的連接(SocketChannel)有讀寫事件發生時,就會將讀寫事件派發給對應的讀寫事件處理器進行處理。比如讀事件處理器會通過SocketChannel的read()方法讀取數據,此時的read()方法可以直接讀取到數據,不需要阻塞等待可讀數據的到來。

每當Acceptor處理器和讀寫事件處理器處理完所有就緒的感興趣的IO事件后,Reactor線程會再次執行select()方法阻塞等待新的事件就緒并將其分派給對應處理器進行處理。

二.單線程Reactor模式的問題

注意:單線程的Reactor模式中的單線程主要是針對IO操作而言的,也就是所有的IO的accept、read、write、connect操作都在一個線程上完成。

由于在單線程Reactor模式中,不僅IO操作在Reactor線程上,而且非IO的業務操作也在Reactor線程上進行處理,這會大大降低IO請求的響應。所以應將非IO的業務操作從Reactor線程上剝離,以提高Reactor線程對IO請求的響應。

(2)單線程Reactor模式的改進

圖片

一.增加工作線程池來進行改進

為了改善單線程Reactor模式中Reactor線程還要處理業務邏輯,可以添加一個工作線程池。將非IO操作(解碼、計算、編碼)從Reactor線程中移出,然后轉交給這個工作線程池來執行。所有IO操作依舊由單個Reactor線程來完成,如IO的accept、read、write以及connect操作。這樣就能提高Reactor線程的IO響應,避免因耗時的業務邏輯而降低對后續IO請求的處理。

二.使用線程池的好處

合理使用線程池,可以帶來很多好處:

好處一:減少頻繁創建和銷毀線程的性能開銷

好處二:重復利用線程,避免對每個任務都創建線程,可以提高響應速度

好處三:合理設置線程池的大小,可以避免因為線程池過大影響性能

三.單線程Reactor不適合高并發場景

對于一些小容量的應用場景,可以使用單線程Reactor模型,但是對于一些高負載、大并發或大數據量的應用場景卻不合適。

原因一:一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送。

原因二:當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時。超時之后往往會進行重發,這更加重了NIO線程的負載。最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸。

(3)多線程的Reactor模式

圖片

一.多線程的Reactor模式介紹

在多線程的Reactor模式下,存在一個Reactor線程池,Reactor線程池里的每一個Reactor線程都有自己的Selector和事件分發邏輯。

Reactor線程池里的主反應器線程mainReactor可以只有一個,但子反應器線程subReactor一般會有多個,通常subReactor也是一個線程池。

主反應器線程mainReactor主要負責接收客戶端的連接請求,然后將接收到的SocketChannel傳遞給子反應器線程subReactor,由subReactor來完成和客戶端的通信。

二.多線程的Reactor模式流程

首先服務端的Reactor變成了多個線程對象,分為mainReactor和subReactor。這些Reactor對象也會啟動事件循環,并使用Selector(選擇器)來實現IO多路復用。

然后服務端在啟動時會注冊一個Acceptor事件處理器到mainReactor中。這個Acceptor事件處理器會關注accept事件,這樣mainReactor監聽到accept事件就會交給Acceptor事件處理器進行處理。

當客戶端向服務端發起一個連接請求時,mainReactor就會監聽到一個accept事件,于是就會將這個accept事件派發給Acceptor處理器來進行處理。

接著Acceptor處理器通過accept()方法便能得到這個客戶端對應的連接(SocketChannel),然后將這個連接SocketChannel傳遞給subReactor線程池進行處理。

subReactor線程池會分配一個subReactor線程給這個SocketChannel,并將SocketChannel關注的read事件及對應的read事件處理器注冊到這個subReactor線程中。當然也會注冊關注的write事件及write事件處理器到subReactor線程中以完成IO寫操作。

總之,Reactor線程池中的每一Reactor線程都會有自己的Selector和事件分發邏輯。當有IO事件就緒時,相關的subReactor就將事件派發給響應的處理器處理。

注意,這里subReactor線程只負責完成IO的read()操作,在讀取到數據后將業務邏輯的處理放入到工作線程池中完成。若完成業務邏輯后需要返回數據給客戶端,則IO的write()操作還是會被提交回subReactor線程來完成。這樣所有的IO操作依舊在Reactor線程(mainReactor或subReactor)中完成,而工作線程池僅用來處理非IO操作的邏輯。

多Reactor線程模式將"接收客戶端的連接請求"和"與該客戶端進行讀寫通信"分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責與客戶端的通信,而是將建立好的連接轉交給subReactor線程來完成與客戶端的通信,這樣就不會因為read()操作的數據量太大而導致后面的客戶端連接請求得不到及時處理。并且多Reactor線程模式在海量的客戶端并發請求的情況下,還可以通過subReactor線程池將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量。

Netty服務端就是使用了多線程的Reactor模式。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/73075.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/73075.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/73075.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Git前言(版本控制)

1.Git 目前世界上最先進的分布式版本控制系統。 git官網&#xff1a;https://git-scm.com/ 2.版本控制 2.1什么是版本控制 版本控制(Revision control)是一種在開發的過程中用于管理我們對文件、目錄或工程等內容修改歷史&#xff0c;方便查看更改歷史記錄備份以便恢復以前…

調試正常 ≠ 運行正常:Keil5中MicroLIB的“量子態BUG”破解實錄

調試正常 ≠ 運行正常&#xff1a;Keil5中MicroLIB的“量子態BUG”破解實錄——從勾選一個選項到理解半主機模式&#xff0c;嵌入式開發的認知升級 &#x1f4cc; 現象描述&#xff1a;調試與燒錄的詭異差異 在線調試時 程序正常運行 - 獨立運行時 設備無響應 ! 編譯過程 0 Err…

算法每日一練 (9)

&#x1f4a2;歡迎來到張胤塵的技術站 &#x1f4a5;技術如江河&#xff0c;匯聚眾志成。代碼似星辰&#xff0c;照亮行征程。開源精神長&#xff0c;傳承永不忘。攜手共前行&#xff0c;未來更輝煌&#x1f4a5; 文章目錄 算法每日一練 (9)最小路徑和題目描述解題思路解題代碼…

【高項】信息系統項目管理師(四)項目整合管理【4分】

一、管理基礎 項目整合管理的責任不能被授權或轉移&#xff0c;項目經理必須對整個項目承擔最終責任。 執行項目整合時項目經理承擔雙重角色&#xff1a; 1、組織層面上&#xff0c;項目經理扮演重要角色&#xff0c;與項目發起人攜手合作&#xff0c;了解戰略目標并確保項目目…

ECEF與ENU坐標系定義及C語言實現

一、ECEF與ENU坐標系定義 ECEF坐標系&#xff08;地心地固坐標系&#xff09; 原點&#xff1a;地球質心X軸&#xff1a;指向本初子午線與赤道交點Y軸&#xff1a;在赤道平面內與X軸垂直Z軸&#xff1a;指向北極數學表示&#xff1a; P e c e f ( x , y , z ) P_{ecef} (x,…

sql語句分頁的關鍵字是?

在 SQL 中&#xff0c;分頁通常是通過限制查詢結果的數量并指定從哪一行開始獲取數據來實現的。不同的數據庫系統使用不同的分頁關鍵字。 以下是常見數據庫系統的分頁關鍵字&#xff1a; MySQL / PostgreSQL / SQLite 使用 LIMIT 和 OFFSET 來進行分頁&#xff1a; LIMIT 限…

大模型中的剪枝、蒸餾是什么意思?

環境&#xff1a; 剪枝 蒸餾 問題描述&#xff1a; 大模型中的剪枝、蒸餾是什么意思&#xff1f; 解決方案&#xff1a; 大模型的剪枝&#xff08;Pruning&#xff09;和蒸餾&#xff08;Distillation&#xff09;是兩種常見的模型優化技術&#xff0c;用于減少模型的大小…

初次體驗Tauri和Sycamore(3)通道實現

? 原創作者&#xff1a;莊曉立&#xff08;LIIGO&#xff09; 原創時間&#xff1a;2025年03月10日&#xff08;發布時間&#xff09; 原創鏈接&#xff1a;https://blog.csdn.net/liigo/article/details/146159327 版權所有&#xff0c;轉載請注明出處。 20250310 LIIGO備注&…

代碼隨想錄|二叉樹|07二叉樹周末總結

對前面01~06二叉樹內容進行小結&#xff0c;直接看下面的總結文檔&#xff1a; 本周小結&#xff01;&#xff08;二叉樹&#xff09; | 代碼隨想錄

藍耘賦能通義萬相 2.1:用 C++ 構建高效 AI 視頻生成生態

目錄 開篇&#xff1a;AI 視頻生成新時代的號角 通義萬相 2.1&#xff1a;AI 視頻生成的領軍者 核心技術揭秘 功能特點展示 與其他模型的全面對比 C&#xff1a;高效編程的基石 C 的發展歷程與特性 C 在 AI 領域的廣泛應用 通義萬相 2.1 與 C 的完美融合 融合的意義與…

【一句話經驗】ubuntu vi/vim 模式自動設置為paste

從centos過來&#xff0c;發現ubutun有些地方不習慣&#xff0c;尤其是vi的粘貼&#xff0c;默認自動進去了代碼模式&#xff0c;導致每次粘貼必須得set paste&#xff0c;否則會出現問題。 解決辦法非常簡單&#xff0c;按照下面命令執行即可&#xff1a; cd ~ echo "…

自然語言處理文本分析:從詞袋模型到認知智能的進化之旅

清晨&#xff0c;當智能音箱準確識別出"播放周杰倫最新專輯"的模糊語音指令時&#xff1b;午間&#xff0c;企業輿情系統自動標記出十萬條評論中的負面情緒&#xff1b;深夜&#xff0c;科研人員用GPT-4解析百萬篇論文發現新材料線索——這些場景背后&#xff0c;是自…

《Python基礎教程》附錄B筆記:Python參考手冊

《Python基礎教程》第1章筆記&#x1f449;https://blog.csdn.net/holeer/article/details/143052930 附錄B Python參考手冊 Python標準文檔是完整的參考手冊。本附錄只是一個便利的速查表&#xff0c;當你開始使用Python進行編程后&#xff0c;它可幫助你喚醒記憶。 B.1 表…

uniapp+Vue3 組件之間的傳值方法

一、父子傳值&#xff08;props / $emit 、ref / $refs&#xff09; 1、props / $emit 父組件通過 props 向子組件傳遞數據&#xff0c;子組件通過 $emit 觸發事件向父組件傳遞數據。 父組件&#xff1a; // 父組件中<template><view class"container">…

【MySQL篇】MySQL基本查詢詳解

目錄 前言&#xff1a; 1&#xff0c;Create 1.1&#xff0c;單行數據全列插入 1.2&#xff0c;單行數據指定列插入 1.3&#xff0c;多行數據全列插入 1.4&#xff0c;多行數據指定列插入 1.5&#xff0c;插入否則更新 1.6&#xff0c;替換 2&#xff0c;Retrieve …

【Python入門】一篇掌握Python中的字典(創建、訪問、修改、字典方法)【詳細版】

&#x1f308; 個人主頁&#xff1a;十二月的貓-CSDN博客 &#x1f525; 系列專欄&#xff1a; &#x1f3c0;《Python/PyTorch極簡課》_十二月的貓的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻擋不了春天的腳步&#xff0c;十二點的黑夜遮蔽不住黎明的曙光 目…

每日一題——兩數相加

兩數相加 問題描述問題分析解題思路代碼實現代碼解析注意事項示例運行總結 問題描述 給定兩個非空鏈表&#xff0c;表示兩個非負整數。鏈表中的每個節點存儲一個數字&#xff0c;數字的存儲順序為逆序&#xff08;即個位在鏈表頭部&#xff09;。要求將這兩個數字相加&#xff…

制作自定義鏡像

1. 確定軟件包 確定自己的環境都需要哪些命令&#xff0c;然后&#xff0c;從鏡像文件或者yum源下載響應的安裝包。 bash基本是必選的 &#xff08;bash-5.1.8-10.oe2203sp2.aarch64.rpm&#xff09; vim也是有必要的 &#xff08;vim-enhanced-9.0-15.oe2203sp2.aarch64.rpm…

WHAT - 前端性能指標

目錄 核心 Web Vitals&#xff08;Core Web Vitals&#xff09;加載性能指標網絡相關指標交互和響應性能指標內存與效率指標推薦的監控工具優化策略與建議推薦學習路線 作為前端開發者&#xff0c;理解并掌握關鍵的性能指標對優化 Web 應用至關重要。 以下是前端性能優化中常見…

C++20 模塊:告別頭文件,迎接現代化的模塊系統

文章目錄 引言一、C20模塊簡介1.1 傳統頭文件的局限性1.2 模塊的出現 二、模塊的基本概念2.1 模塊聲明2.2 模塊接口單元2.3 模塊實現單元 三、模塊的優勢3.1 編譯時間大幅減少3.2 更好的依賴管理3.3 命名空間隔離 四、如何使用C20模塊4.1 編譯器支持4.2 示例項目4.3 編譯和運行…