一、簡介
? ? ? ?在我的上一篇文章Java NIO詳解(一)中介紹了關于標準輸入輸出NIO相關知識, 本篇將重點介紹基于網絡編程NIO(異步IO)。
二、異步IO
? ? ? ?異步 I/O?是一種沒有阻塞地讀寫數據的方法。通常,在代碼進行?read()?調用時,代碼會阻塞直至有可供讀取的數據。同樣,?write()調用將會阻塞直至數據能夠寫入,關于同步的IO請參考另一篇文章Java IO。
? ? ? ?另一方面,異步 I/O 調用不但不會阻塞,相反,您可以注冊對特定 I/O 事件諸如數據可讀、新連接到來等等,而在發生這樣感興趣的事件時,系統將會告訴您。
? ? ? ?異步 I/O 的一個優勢在于,它允許您同時根據大量的輸入和輸出執行 I/O。同步程序常常要求助于輪詢,或者創建許許多多的線程以處理大量的連接。使用異步 I/O,您可以監聽任何數量的通道上的事件,不用輪詢,也不用額外的線程。
三、Selector
? ? ? ?在我的Java NIO詳解(一)中已經詳細介紹了Java NIO三個核心對象中的Buffer和Channel,現在我們就重點介紹一下第三個核心對象Selector。Selector是一個對象,它可以注冊到很多個Channel上,監聽各個Channel上發生的事件,并且能夠根據事件情況決定Channel讀寫。這樣,通過一個線程管理多個Channel,就可以處理大量網絡連接了。
1、采用Selector模式的的好處
? ? ? ?有了Selector,我們就可以利用一個線程來處理所有的channels。線程之間的切換對操作系統來說代價是很高的,并且每個線程也會占用一定的系統資源。所以,對系統來說使用的線程越少越好。
? ? ? ?但是,需要記住,現代的操作系統和CPU在多任務方面表現的越來越好,所以多線程的開銷隨著時間的推移,變得越來越小了。實際上,如果一個CPU有多個內核,不使用多任務可能是在浪費CPU能力。不管怎么說,關于那種設計的討論應該放在另一篇不同的文章中。在這里,只要知道使用Selector能夠處理多個通道就足夠了。
下面這幅圖展示了一個線程處理3個 Channel的情況:
2、如何創建一個Selector
? ? ? ?異步 I/O 中的核心對象名為 Selector。Selector 就是您注冊對各種 I/O 事件興趣的地方,而且當那些事件發生時,就是這個對象告訴您所發生的事件。
Selector selector = Selector.open();
? ? ? ?然后,就需要注冊Channel到Selector了。
3、如何注冊Channel到Selector
? ? ? ?為了能讓Channel和Selector配合使用,我們需要把Channel注冊到Selector上。通過調用?channel.register()方法來實現注冊:
channel.configureBlocking(false);
SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
? ? ? ?注意,注冊的Channel?
必須設置成異步模式?才可以,,否則異步IO就無法工作,這就意味著我們不能把一個
FileChannel注冊到Selector,因為FileChannel沒有異步模式,但是網絡編程中的
SocketChannel是可以的。
? ? ? ?需要注意register()方法的第二個參數,它是一個 “interest set”,意思是注冊的Selector對Channel中的哪些時間感興趣,事件類型有四種:
- Connect
- Accept
- Read
- Write
? ? ? ?通道觸發了一個事件意思是該事件已經?Ready(就緒)。所以,某個Channel成功連接到另一個服務器稱為Connect Ready。一個ServerSocketChannel準備好接收新連接稱為?Accept Ready,一個有數據可讀的通道可以說是?Read Ready,等待寫數據的通道可以說是Write Ready。
上面這四個事件對應到SelectionKey中的四個常量:
1. SelectionKey.OP_CONNECT
2. SelectionKey.OP_ACCEPT
3. SelectionKey.OP_READ
4. SelectionKey.OP_WRITE
如果你對多個事件感興趣,可以通過or操作符來連接這些常量:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
四、關于SelectionKey
? ? ? ?請注意對register()的調用的返回值是一個SelectionKey。 SelectionKey 代表這個通道在此 Selector 上的這個注冊。當某個 Selector 通知您某個傳入事件時,它是通過提供對應于該事件的 SelectionKey 來進行的。SelectionKey 還可以用于取消通道的注冊。SelectionKey中包含如下屬性:
- The interest set
- The ready set
- The Channel
- The Selector
- An attached object (optional)
Interest Set
? ? ? ?就像我們在前面講到的把Channel注冊到Selector來監聽感興趣的事件,interest set就是你要選擇的感興趣的事件的集合。你可以通過SelectionKey對象來讀寫interest set:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
? ? ? ?通過上面例子可以看到,我們可以通過用AND 和SelectionKey 中的常量做運算,從SelectionKey中找到我們感興趣的事件。
Ready Set
? ? ? ?ready set?是通道已經準備就緒的操作的集合。在一次選Selection之后,你應該會首先訪問這個ready set。Selection將在下一小節進行解釋。可以這樣訪問ready集合:
int readySet = selectionKey.readyOps();
? ? ? ?可以用像檢測interest集合那樣的方法,來檢測Channel中什么事件或操作已經就緒。但是,也可以使用以下四個方法,它們都會返回一個布爾類型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel 和Selector
? ? ? ?我們可以通過SelectionKey獲得Selector和注冊的Channel:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attach 一個對象
? ? ? ?可以將一個對象或者更多信息 attach?到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,可以附加 與通道一起使用的Buffer,或是包含聚集數據的某個對象。使用方法如下:selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
? ? ? ?還可以在用register()方法向Selector注冊Channel的時候附加對象。如:?
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
五、通過Selector選擇通道
? ? ? ?一旦向Selector注冊了一或多個通道,就可以調用幾個重載的select()方法。這些方法返回你所感興趣的事件(如連接、接受、讀或寫)已經準備就緒的那些通道。換句話說,如果你對“Read Ready”的通道感興趣,select()方法會返回讀事件已經就緒的那些通道:
- int select(): 阻塞到至少有一個通道在你注冊的事件上就緒
- int select(long timeout):select()一樣,除了最長會阻塞timeout毫秒(參數)
- int selectNow(): 不會阻塞,不管什么通道就緒都立刻返回,此方法執行非阻塞的選擇操作。如果自從前一次選擇操作后,沒有通道變成可選擇的,則此方法直接返回零。
? ? ? ?select()方法返回的int值表示有多少通道已經就緒。亦即,自上次調用select()方法后有多少通道變成就緒狀態。如果調用select()方法,因為有一個通道變成就緒狀態,返回了1,若再次調用select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道處于就緒狀態。
selectedKeys()
? ? ? ?一旦調用了 select()方法,它就會返回一個數值,表示一個或多個通道已經就緒,然后你就可以通過調用 selector.selectedKeys()方法返回的SelectionKey集合來獲得就緒的Channel。請看演示方法:Set<SelectionKey> selectedKeys = selector.selectedKeys();
? ? ? ?當你通過Selector注冊一個Channel時,
channel.register()方法會返回一個SelectionKey對象,這個對象就代表了你注冊的Channel。這些對象可以通過
selectedKeys()方法獲得。你可以通過迭代這些selected key來獲得就緒的Channel,下面是演示代碼:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {// a connection was established with a remote server.
} else if (key.isReadable()) {// a channel is ready for reading
} else if (key.isWritable()) {// a channel is ready for writing
}
keyIterator.remove();
}
? ? ? ?這個循環遍歷selected key的集合中的每個key,并對每個key做測試來判斷哪個Channel已經就緒。
? ? ? ?請注意循環中最后的keyIterator.remove()方法。Selector對象并不會從自己的selected key集合中自動移除SelectionKey實例。我們需要在處理完一個Channel的時候自己去移除。當下一次Channel就緒的時候,Selector會再次把它添加到selected key集合中。
SelectionKey.channel()方法返回的Channel需要轉換成你具體要處理的類型,比如是ServerSocketChannel或者SocketChannel等等。
WakeUp()和Close()
? ? ? ?某個線程調用select()方法后阻塞了,即使沒有通道就緒,也有辦法讓其從select()方法返回。只要讓其它線程在第一個線程調用select()方法的那個對象上調用Selector.wakeup()
方法即可。阻塞在select()方法上的線程會立馬返回。
? ? ? ?如果有其它線程調用了wakeup()方法,但當前沒有線程阻塞在select()方法上,下個調用select()方法的線程會立即“醒來(wake up)”
? ? ? ?當用完Selector后調應道掉用close()方法,它將關閉Selector并且使注冊到該Selector上的所有SelectionKey實例無效。通道本身并不會關閉。
六、一個完整的例子
下面通過一個MultiPortEcho的例子來演示一下上面整個過程。
public class MultiPortEcho {private int ports[];private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);public MultiPortEcho(int ports[]) throws IOException {this.ports = ports;go();}private void go() throws IOException {// 1. 創建一個selector,select是NIO中的核心對象// 它用來監聽各種感興趣的IO事件Selector selector = Selector.open();// 為每個端口打開一個監聽, 并把這些監聽注冊到selector中for (int i = 0; i < ports.length; ++i) {//2. 打開一個ServerSocketChannel//其實我們沒監聽一個端口就需要一個channelServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//設置為非阻塞ServerSocket ss = ssc.socket();InetSocketAddress address = new InetSocketAddress(ports[i]);ss.bind(address);//監聽一個端口//3. 注冊到selector//register的第一個參數永遠都是selector//第二個參數是我們要監聽的事件//OP_ACCEPT是新建立連接的事件//也是適用于ServerSocketChannel的唯一事件類型SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Going to listen on " + ports[i]);}//4. 開始循環,我們已經注冊了一些IO興趣事件while (true) {//這個方法會阻塞,直到至少有一個已注冊的事件發生。當一個或者更多的事件發生時// select() 方法將返回所發生的事件的數量。int num = selector.select();//返回發生了事件的 SelectionKey 對象的一個 集合Set selectedKeys = selector.selectedKeys();//我們通過迭代 SelectionKeys 并依次處理每個 SelectionKey 來處理事件//對于每一個 SelectionKey,您必須確定發生的是什么 I/O 事件,以及這個事件影響哪些 I/O 對象。Iterator it = selectedKeys.iterator();while (it.hasNext()) {SelectionKey key = (SelectionKey) it.next();//5. 監聽新連接。程序執行到這里,我們僅注冊了 ServerSocketChannel//并且僅注冊它們“接收”事件。為確認這一點//我們對 SelectionKey 調用 readyOps() 方法,并檢查發生了什么類型的事件if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {//6. 接收了一個新連接。因為我們知道這個服務器套接字上有一個傳入連接在等待//所以可以安全地接受它;也就是說,不用擔心 accept() 操作會阻塞ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept();sc.configureBlocking(false);// 7. 講新連接注冊到selector。將新連接的 SocketChannel 配置為非阻塞的//而且由于接受這個連接的目的是為了讀取來自套接字的數據,所以我們還必須將 SocketChannel 注冊到 Selector上SelectionKey newKey = sc.register(selector,SelectionKey.OP_READ);it.remove();System.out.println("Got connection from " + sc);} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();// Echo dataint bytesEchoed = 0;while (true) {echoBuffer.clear();int r = sc.read(echoBuffer);if (r <= 0) {break;}echoBuffer.flip();sc.write(echoBuffer);bytesEchoed += r;}System.out.println("Echoed " + bytesEchoed + " from " + sc);it.remove();}}// System.out.println( "going to clear" );// selectedKeys.clear();// System.out.println( "cleared" );}}static public void main(String args2[]) throws Exception {String args[]={"9001","9002","9003"};if (args.length <= 0) {System.err.println("Usage: java MultiPortEcho port [port port ...]");System.exit(1);}int ports[] = new int[args.length];for (int i = 0; i < args.length; ++i) {ports[i] = Integer.parseInt(args[i]);}new MultiPortEcho(ports);}}