文章目錄
- 概述
- 一、Socket
- 二、NIO三大組件與事件
- 三、Reactor模式
- 四、NIO通信案例
- 4.1、服務端
- 4.2、客戶端
本文為個人學習筆記整理,僅供交流參考,非專業教學資料,內容請自行甄別
概述
??前篇中提到,BIO是阻塞的IO,阻塞體現在建立連接和通信時,并且線程模型是1:1的。即使使用線程池進行處理,也受限于最大線程數以及cpu上下文的切換。
??NIO則是非阻塞的IO,利用了Reactor反應器模式和多路復用機制。可以實現服務端一個線程應對多個客戶端的連接和請求而不阻塞。
一、Socket
??在計算機網絡中,TCP、UDP是運輸層的協議,應用層需要依靠運輸層提供服務。Socket是位于應用層和運輸層之間的一個中間軟件抽象層
,屏蔽了運輸層協議的一些實現細節,例如TCP的三次握手,四次揮手,滑動窗口算法等。
??從設計模式的角度來看,Socket屬于典型的外觀模式
,用戶無需關心底層的具體實現細節,通過Socket提供的API即可實現網絡編程。
??主機 A 的應用程序要能和主機 B 的應用程序通信,必須通過 Socket 建立連接,無論是客戶端連接上服務端,還是服務端接收了客戶端的連接,都會產生一個Socket 實例:
創建服務端
接收客戶端的連接
啟動客戶端
二、NIO三大組件與事件
??NIO的三大組件:
- Selector:選擇器,客戶端和服務端會將自己感興趣的事件注冊到選擇器上,在有事件到達時,選擇器通知訂閱者去執行相應的操作。(Reactor模式),通過一個Selector去管理多個Channel,就稱為NIO的多路復用機制。
- Channel:通道,是應用間傳遞數據的渠道,是雙工通信,應用程序可以通過通道讀取數據,也可以通過通道向操作系統寫數據,而且可以同時進行讀寫。
- Buffer:緩沖區,BIO和NIO的區別其一就是,BIO是面向字節流的,而NIO是面向緩沖區的。數據是從通道讀入緩沖區,從緩沖區寫入到通道中。
- 在寫入時,應用程序將數據寫入Buffer,再通過Channel將Buffer中的數據傳遞出去。
- 在讀取時,先將Channel中的數據讀入Buffer,應用程序再從Buffer中讀取數據。
??與Selector和Channel相關的,還有一個SelectionKey。SelectionKey是一個抽象類,維護了Selector和Channel的關系。SelectionKey定義了四個事件:
- OP_READ:讀事件,當緩沖區中有數據可讀時,觸發該事件,通常需要主動去關注,以接收到緩沖區有數據可讀的通知,并進行處理。
- OP_WRITE:寫事件,當緩沖區有空閑時,就會觸發該事件。一般情況下是無需關注該事件的,因為緩沖區不會被占滿,關注該事件浪費CPU資源。
- OP_CONNECT:連接事件,當
SocketChannel.connect()
請求連接成功后就緒,只允許客戶端關注。 - OP_ACCEPT:接收連接事件,當服務器接收到客戶端的連接后就需,只允許服務端關注。
??對于服務端而言,通常是ServerSocketChannel
創建后注冊OP_ACCEPT
事件,在接收到客戶端的連接,產生SocketChannel
后,關注OP_READ
事件。
??對于客戶端而言,通常是SocketChannel
創建后,執行連接服務端的操作,因為通過SocketChannel
的configureBlocking
方法,將通道設置成了非阻塞
,所以調用connect()
方法可能立即返回,而不會等待連接建立。連接過程在后臺異步進行,線程可以繼續執行其他任務,就要根據connect()
方法的返回值,執行不同的操作:
- 如果
connect()
方法返回true,代表連接建立成功,就關注OP_READ
事件。 - 如果
connect()
方法返回false,代表連接沒有建立完成,繼續關注OP_CONNECT
事件。
??后續在進行事件處理時,如果關注了OP_CONNECT
事件,就再次判斷通道是否建立完成,如果建立完成,就關注OP_READ
事件。
三、Reactor模式
??Reactor模式是NIO的底層實現機制,體現在NIO的Selector選擇器上。
圖片來源:圖靈學院
??Reactor模式的思想,簡單來說,就是用戶向選擇器進行注冊,告訴選擇器自己所關心的事件,當對應的事件發生時,再由選擇器去通知用戶,然后用戶執行自己的操作。所以選擇器是Reactor模式的核心組件。
??Reactor模式主要由兩個核心部分組成:
- Reactor:負責監聽和分發I/O事件(如連接事件、讀寫事件)。它運行在一個獨立線程中,通過選擇器(Selector)實現I/O多路復用,不斷輪詢注冊的事件源。當事件就緒時,Reactor將其分發給對應的事件處理器。
- 處理資源池:通常是一個線程池,負責執行事件處理邏輯(如讀取數據、執行業務邏輯、發送響應)。這避免了耗時操作阻塞Reactor線程。
??假設一個Web服務器使用Reactor模式:
3. 主線程(Reactor線程)注冊SocketChannel的READ事件,并調用selector.select()阻塞。
4. 同時,一個工作線程(來自處理資源池)可以處理之前接收到的HTTP請求(如解析數據、訪問緩存)。
5. 當新數據到達時,Reactor線程喚醒,派發READ事件給處理器,然后工作線程接管處理。
??Reactor模式也有三種實現:
- 單線程的reactor:連接,讀取,發送數據,處理業務,都在一個線程中執行。
- 多線程工作reactor:連接,讀取,發送數據,由一個線程完成,業務工作由線程池去處理。
- 多線程主從reactor:連接由一個線程完成,讀取,發送數據,由另一個線程完成,業務工作由線程池去處理。
四、NIO通信案例
4.1、服務端
public class NioServerDemo {public static void main(String[] args) throws IOException {//netty服務端 開啟ssc//everSocketChannel:對連接建立的事件感興趣,是對SeverSocket的包裝。ServerSocketChannel ssc = ServerSocketChannel.open();//開啟選擇器Selector selector = Selector.open();//設置為非阻塞ssc.configureBlocking(false);//綁定端口ssc.socket().bind(new InetSocketAddress(8080));//Selector登記SSC對連接事件感興趣。//SelectionKey是SSC和SC向Selector注冊的相關的事件的代號。標記了不同的事件。讀,寫,連接,接受連接事件。ssc.register(selector, SelectionKey.OP_ACCEPT);while (true) {//獲取當前有哪些事件selector.select(1000);Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();//防止事件重復被處理iterator.remove();//進行處理handle(key, selector);}}}private static void handle(SelectionKey key, Selector selector) throws IOException {if (key.isValid()) {/*處理新加入客戶端的請求假設此時有新連接到達Selector*/if (key.isAcceptable()) {//拿到服務端的ssc//Selector將新連接分配給SSC。ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//接受請求,得到sc//SSC接收完連接后,和客戶端進行三次握手,產生一個Socket,然后將Socket包裝成SCSocketChannel sc = ssc.accept();System.out.println("==========建立連接=========");//同樣設置為非阻塞sc.configureBlocking(false);//Selector登記SC對讀事件感興趣//關注讀事件sc.register(selector, SelectionKey.OP_READ);}/*處理讀事件假設此時客戶端發送了數據到Selector,通知SC有數據過來了。*/ else if (key.isReadable()) {//sc:socketchannel 處理讀事件//由SC和客戶端進行實際的網絡讀寫。SocketChannel sc = (SocketChannel) key.channel();//創建緩沖區//并非直接將數據交付給應用層,而是經過buffer緩沖區。發送數據也是要經過buffer。//buffer本質上是內存中的一塊區域,需要讀寫模式切換ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//從通道讀消息,寫入緩沖區int bytes = sc.read(byteBuffer);if (bytes > 0) {//切換模式byteBuffer.flip();byte[] bs = new byte[byteBuffer.remaining()];byteBuffer.get(bs);String message = new String(bs, StandardCharsets.UTF_8);System.out.println("服務器收到消息:" + message);//發送應答消息到客戶端doWrite(sc, message);} else if (bytes < 0) {/*取消特定的注冊關系*/key.cancel();/*關閉通道*/sc.close();}}}}/*發送應答消息*/private static void doWrite(SocketChannel sc, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer buffer = ByteBuffer.allocate(bytes.length);//將服務端的響應存入buffer中buffer.put(bytes);//切換模式buffer.flip();//寫入客戶端(從buffer中讀,寫入對方 )sc.write(buffer);}}
4.2、客戶端
public class NioClientDemo {private static SocketChannel sc;public static void main(String[] args) throws IOException {new Thread(()->{try {client();} catch (IOException e) {throw new RuntimeException(e);}}).start();Scanner scanner = new Scanner(System.in);String next = scanner.next();doWrite(sc,next);}private static void client() throws IOException {//創建選擇器Selector selector = Selector.open();//創建客戶端通道sc = SocketChannel.open();//設置通道為非阻塞sc.configureBlocking(false);//非阻塞的連接//如果sc.connect返回true,代表連接已經建立完成if (sc.connect(new InetSocketAddress("127.0.0.1", 8080))) {//可以關注讀取事件sc.register(selector, SelectionKey.OP_READ);}//否則連接還沒有建立完成else {//繼續關注建立連接事件sc.register(selector, SelectionKey.OP_CONNECT);}//輪詢選擇器while (true) {selector.select(1000);Iterator<SelectionKey> it = selector.selectedKeys().iterator();while (it.hasNext()) {SelectionKey sk = it.next();it.remove();if (sk.isValid()) {//獲得關心當前事件的channelSocketChannel socketChannel = (SocketChannel) sk.channel();//連接事件if (sk.isConnectable()) {//查看通道連接是否建立完成if (sc.finishConnect()) {//建立完成,注冊讀事件socketChannel.register(selector,SelectionKey.OP_READ);} else{System.exit(1);}}else if (sk.isReadable()){//創建ByteBuffer,并開辟一個1M的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//將channel中的數據讀取到buffer中int readBytes = sc.read(buffer);//讀取到字節,對字節進行編解碼if(readBytes>0){//將緩沖區當前的limit設置為position,position=0,// 用于后續對緩沖區的讀取操作buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客戶端收到消息:" + result);}//鏈路已經關閉,釋放資源else if(readBytes<0){sk.cancel();sc.close();}}}}}}private static void doWrite(SocketChannel channel,String request)throws IOException {//將消息編碼為字節數組byte[] bytes = request.getBytes();//根據數組容量創建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節數組復制到緩沖區writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發送緩沖區的字節數組/*關心事件和讀寫網絡并不沖突*/channel.write(writeBuffer);}
}