4.3.4 并發性
選擇器對象是線程安全的,但它們包含的鍵集合不是。通過keys( )和selectKeys( )返回的鍵的集合是Selector對象內部的私有的Set對象集合的直接引用。這些集合可能在任意時間被改變。已注冊的鍵的集合是只讀的。如果您試圖修改它,那么您得到的獎品將是一個java.lang.UnsupportedOperationException,但是當您在觀察它們的時候,它們可能發生了改變的話,您仍然會遇到麻煩。Iterator對象是快速失敗的(fail-fast):如果底層的Set被改變了,它們將會拋出java.util.ConcurrentModificationException,因此如果您期望在多個線程間共享選擇器和/或鍵,請對此做好準備。您可以直接修改選擇鍵,但請注意您這么做時可能會徹底破壞另一個線程的Iterator。
如果在多個線程并發地訪問一個選擇器的鍵的集合的時候存在任何問題,您可以采取一些步驟來合理地同步訪問。在執行選擇操作時,選擇器在Selector對象上進行同步,然后是已注冊的鍵的集合,最后是已選擇的鍵的集合,按照這樣的順序。已取消的鍵的集合也在選擇過程的的第1步和第3步之間保持同步(當與已取消的鍵的集合相關的通道被注銷時)
在多線程的場景中,如果您需要對任何一個鍵的集合進行更改,不管是直接更改還是其他操作帶來的副作用,您都需要首先以相同的順序,在同一對象上進行同步。鎖的過程是非常重要的。如果競爭的線程沒有以相同的順序請求鎖,就將會有死鎖的潛在隱患。如果您可以確保否其他線程不會同時訪問選擇器,那么就不必要進行同步了。
Selector類的close( )方法與slect( )方法的同步方式是一樣的,因此也有一直阻塞的可能性。在選擇過程還在進行的過程中,所有對close( )的調用都會被阻塞,直到選擇過程結束,或者執行選擇的線程進入睡眠。在后面的情況下,執行選擇的線程將會在執行關閉的線程獲得鎖是立即被喚醒,并關閉選擇器(參見4.3.2小節)。
4.4 異步可關閉性
任何時候都有可能關閉一個通道或者取消一個選擇鍵。除非您采取步驟進行同步,否則鍵的狀態及相關的通道將發生意料之外的改變。一個特定的鍵的集合中的一個鍵的存在并不保證鍵仍然是有效的,或者它相關的通道仍然是打開的。
關閉通道的過程不應該是一個耗時的操作。NIO的設計者們特別想要阻止這樣的可能性:一個線程在關閉一個處于選擇操作中的通道時,被阻塞于無限期的等待。當一個通道關閉時,它相關的鍵也就都被取消了。這并不會影響正在進行的select( ),但這意味著在您調用select( )之前仍然是有效的鍵,在返回時可能會變為無效。您總是可以使用由選擇器的selectKeys( )方法返回的已選擇的鍵的集合:請不要自己維護鍵的集合。理解3.4.5小節描述的選擇過程,對于避免遇到問題而言是非常重要的。
您可以參考4.3.2小節,以詳細了解一個在select( )中阻塞的線程是如何被喚醒的。如果您試圖使用一個已經失效的鍵,大多數方法將拋出CancelledKeyException。但是,您可以安全地從從已取消的鍵中獲取通道的句柄。如果通道已經關閉時,仍然試圖使用它的話,在大多數情況下將引發ClosedChannelException
?
4.5 選擇過程的可擴展性
我多次提到選擇器可以簡化用單線程同時管理多個可選擇通道的實現。使用一個線程來為多個通道提供服務,通過消除管理各個線程的額外開銷,可能會降低復雜性并可能大幅提升性能。但只使用一個線程來服務所有可選擇的通道是否是一個好主意呢?這要看情況。
對單CPU的系統而言這可能是一個好主意,因為在任何情況下都只有一個線程能夠運行。通過消除在線程之間進行上下文切換帶來的額外開銷,總吞吐量可以得到提高。但對于一個多CPU的系統呢?在一個有n個CPU的系統上,當一個單一的線程線性地輪流處理每一個線程時,可能有n-1個cpu處于空閑狀態
那么讓不同道請求不同的服務類的辦法如何?想象一下,如果一個應用程序為大量的分布式的傳感器記錄信息。每個傳感器在服務線程遍歷每個就緒的通道時需要等待數秒鐘。這在響應時間不重要時是可以的。但對于高優先級的連接(如操作命令),如果只用一個線程為所有通道提供服務,將不得不在隊列中等待。不同的應用程序的要求也是不同的。您采用的策略會受到您嘗試解決的問題的影響。
在第一個場景中,如果您想要將更多的線程來為通道提供服務,請抵抗住使用多個選擇器的欲望。在大量通道上執行就緒選擇并不會有很大的開銷,大多數工作是由底層操作系統完成的。管理多個選擇器并隨機地將通道分派給它們當中的一個并不是這個問題的合理的解決方案。這只會形成這個場景的一個更小的版本。一個更好的策略是對所有的可選擇通道使用一個選擇器,并將對就緒通道的服務委托給其他線程。您只用一個線程監控通道的就緒狀態并使用一個協調好的工作線程池來處理共接收到的數據。根據部署的條件,線程池的大小是可以調整的(或者它自己進行動態的調整)。對可選擇通道的管理仍然是簡單的,而簡單的就是好的。
第二個場景中,某些通道要求比其他通道更高的響應速度,可以通過使用兩個選擇器來解決:一個為命令連接服務,另一個為普通連接服務。但這種場景也可以使用與第一個場景十分相似的辦法來解決。與將所有準備好的通道放到同一個線程池的做法不同,通道可以根據功能由不同的工作線程來處理。它們可能可以是日志線程池,命令/控制線程池,狀態請求線程池,等等。
例 4-2的代碼是例4-1的一般性的選擇循環的擴展。它覆寫了readDataFromSocket( )方法,并使用線程池來為準備好數據用于讀取的通道提供服務。與在主線程中同步地讀取數據不同,這個版本的實現將SelectionKey對象傳遞給為其服務的工作線程。
?
例 4-2. 使用線程池來為通道提供服務
?
/*** */ package test.noi.select;import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator;/*** * Simple echo-back server which listens for incoming stream connections and* * echoes back whatever it reads. A single Selector object is used to listen to* * the server socket (to accept new connections) and all the active socket* * channels.* * @author Ron Hitchens (ron@ronsoft.com)*/ public class SelectSockets {public static int PORT_NUMBER = 1234;public static void main(String[] argv) throws Exception {new SelectSockets().go(argv);}public void go(String[] argv) throws Exception {int port = PORT_NUMBER;if (argv.length > 0) {// Override default listen portport = Integer.parseInt(argv[0]);}System.out.println("Listening on port " + port);ServerSocketChannel serverChannel = ServerSocketChannel.open();// Get the associated ServerSocket to bind it withServerSocket serverSocket = serverChannel.socket();// Create a new Selector for use belowSelector selector = Selector.open();// Set the port the server channel will listen toserverSocket.bind(new InetSocketAddress(port));// Set nonblocking mode for the listening socketserverChannel.configureBlocking(false);// Register the ServerSocketChannel with the Selector serverChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// This may block for a long time. Upon returning, the// selected set contains keys of the ready channels.int n = selector.select();if (n == 0) {continue;// nothing to do }// Get an iterator over the set of selected keysIterator it = selector.selectedKeys().iterator();// Look at each key in the selected setwhile (it.hasNext()) {SelectionKey key = (SelectionKey) it.next();// Is a new connection coming in?if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel channel = server.accept();registerChannel(selector, channel, SelectionKey.OP_READ);sayHello(channel);}// Is there data to read on this channel?if (key.isReadable()) {readDataFromSocket(key);}// Remove key from selected set; it's been handled it.remove();}}}/*** * Register the given channel with the given selector for the given ** operations of interest*/protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception {if (channel == null) {return; // could happen}// Set the new channel nonblockingchannel.configureBlocking(false);// Register it with the selector channel.register(selector, ops);}// ----------------------------------------------------------// Use the same byte buffer for all channels. A single thread is// servicing all the channels, so no danger of concurrent acccess.private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);/*** * Sample data handler method for a channel with data ready to read. * * @param* key * A SelectionKey object associated with a channel determined by * the* selector to be ready for reading. If the channel returns* * * an EOF condition, it is closed here, which automatically * invalidates* the associated key. The selector will then * de-register the channel on* the next select call.*/protected void readDataFromSocket(SelectionKey key) throws Exception {SocketChannel socketChannel = (SocketChannel) key.channel();int count;buffer.clear();// Empty buffer// Loop while data is available;channel is nonblockingwhile ((count = socketChannel.read(buffer)) > 0) {buffer.flip();// Make buffer readable// Send the data; don't assume it goes all at oncewhile (buffer.hasRemaining()) {socketChannel.write(buffer);}// WARNING: the above loop is evil. Because// it's writing back to the same nonblocking// channel it read the data from, this code can// potentially spin in a busy loop. In real life// you'd do something more useful than this. buffer.clear();// Empty buffer }if (count < 0) {// Close channel on EOF, invalidates the key socketChannel.close();}}/*** * Spew a greeting to the incoming client connection. * * @param channel ** The newly connected SocketChannel to say hello to.*/private void sayHello(SocketChannel channel) throws Exception {buffer.clear();buffer.put("Hi there!\r\n".getBytes());buffer.flip();channel.write(buffer);} }
?
?
?
由于執行選擇過程的線程將重新循環并幾乎立即再次調用select( ),鍵的interest集合將被修改,并將interest(感興趣的操作)從讀取就緒(read-rreadiness)狀態中移除。這將防止選擇器重復地調用readDataFromSocket( )(因為通道仍然會準備好讀取數據,直到工作線程從它那里讀取數據)。當工作線程結束為通道提供的服務時,它將再次更新鍵的ready集合,來將interest重新放到讀取就緒集合中。它也會在選擇器上顯式地調用wakeup( )。如果主線程在select( )中被阻塞,這將使它繼續執行。這個選擇循環會再次執行一個輪回(可能什么也沒做)并帶著被更新的鍵重新進入select( )。
?
?
?
以上內容出自 nio 一書
?