2019獨角獸企業重金招聘Python工程師標準>>>
前言
Java NIO 由以下幾個核心部分組成:
1、Buffer
2、Channel
3、Selector
Buffer和Channel在深入淺出NIO之Channel、Buffer一文中已經介紹過,本文主要講解NIO的Selector實現原理。
之前進行socket編程時,accept方法會一直阻塞,直到有客戶端請求的到來,并返回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去獲取并處理后面的請求,當然也可以把獲取socket和處理socket的過程分開,一個線程負責accept,一個線程池負責處理請求。
但NIO提供了更好的解決方案,采用選擇器(Selector)返回已經準備好的socket,并按順序處理,基于通道(Channel)和緩沖區(Buffer)來進行數據的傳輸。
Selector
這里出來一個新概念,selector,具體是一個什么樣的東西?
想想一個場景:在一個養雞場,有這么一個人,每天的工作就是不停檢查幾個特殊的雞籠,如果有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的情況記錄下來,如果雞場的負責人想知道情況,只需要詢問那個人即可。
在這里,這個人就相當Selector,每個雞籠相當于一個SocketChannel,每個線程通過一個Selector可以管理多個SocketChannel。
為了實現Selector管理多個SocketChannel,必須將具體的SocketChannel對象注冊到Selector,并聲明需要監聽的事件(這樣Selector才知道需要記錄什么數據),一共有4種事件:
1、connect:客戶端連接服務端事件,對應值為SelectionKey.OP_CONNECT(8)
2、accept:服務端接收客戶端連接事件,對應值為SelectionKey.OP_ACCEPT(16)
3、read:讀事件,對應值為SelectionKey.OP_READ(1)
4、write:寫事件,對應值為SelectionKey.OP_WRITE(4)
這個很好理解,每次請求到達服務器,都是從connect開始,connect成功后,服務端開始準備accept,準備就緒,開始讀數據,并處理,最后寫回數據返回。
所以,當SocketChannel有對應的事件發生時,Selector都可以觀察到,并進行相應的處理。
服務端代碼
為了更好的理解,先看一段服務端的示例代碼
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){int n = selector.select();if (n == 0) continue;Iterator ite = this.selector.selectedKeys().iterator();while(ite.hasNext()){SelectionKey key = (SelectionKey)ite.next();if (key.isAcceptable()){SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();clntChan.configureBlocking(false);//將選擇器注冊到連接到的客戶端信道,//并指定該信道key值的屬性為OP_READ,//同時為該信道指定關聯的附件clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));}if (key.isReadable()){handleRead(key);}if (key.isWritable() && key.isValid()){handleWrite(key);}if (key.isConnectable()){System.out.println("isConnectable = true");}ite.remove();}
}
服務端操作過程
1、創建ServerSocketChannel實例,并綁定指定端口;
2、創建Selector實例;
3、將serverSocketChannel注冊到selector,并指定事件OP_ACCEPT,最底層的socket通過channel和selector建立關聯;
4、如果沒有準備好的socket,select方法會被阻塞一段時間并返回0;
5、如果底層有socket已經準備好,selector的select方法會返回socket的個數,而且selectedKeys方法會返回socket對應的事件(connect、accept、read or write);
6、根據事件類型,進行不同的處理邏輯;
在步驟3中,selector只注冊了serverSocketChannel的OP_ACCEPT事件
1、如果有客戶端A連接服務,執行select方法時,可以通過serverSocketChannel獲取客戶端A的socketChannel,并在selector上注冊socketChannel的OP_READ事件。
2、如果客戶端A發送數據,會觸發read事件,這樣下次輪詢調用select方法時,就能通過socketChannel讀取數據,同時在selector上注冊該socketChannel的OP_WRITE事件,實現服務器往客戶端寫數據。
Selector實現原理
SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現,其中Selector是整個NIO Socket的核心實現。
public static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}
SelectorProvider在windows和linux下有不同的實現,provider方法會返回對應的實現。
這里不禁要問,Selector是如何做到同時管理多個socket?
下面我們看看Selector的具體實現,Selector初始化時,會實例化PollWrapper、SelectionKeyImpl數組和Pipe。
WindowsSelectorImpl(SelectorProvider sp) throws IOException {super(sp);pollWrapper = new PollArrayWrapper(INIT_CAP);wakeupPipe = Pipe.open();wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();// Disable the Nagle algorithm so that the wakeup is more immediateSinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();(sink.sc).socket().setTcpNoDelay(true);wakeupSinkFd = ((SelChImpl)sink).getFDVal();pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
pollWrapper用Unsafe類申請一塊物理內存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。
?
pollWrapper提供了fdVal和event數據的相應操作,如添加操作通過Unsafe的putInt和putShort實現。
void putDescriptor(int i, int fd) {pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)
是如何實現的
public final SelectionKey register(Selector sel, int ops, Object att)throws ClosedChannelException {synchronized (regLock) {SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}
}
- 如果該channel和selector已經注冊過,則直接添加事件和附件。
- 否則通過selector實現注冊過程。
protected final SelectionKey register(AbstractSelectableChannel ch,int ops, Object attachment) {if (!(ch instanceof SelChImpl))throw new IllegalSelectorException();SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);k.attach(attachment);synchronized (publicKeys) {implRegister(k);}k.interestOps(ops);return k;
}protected void implRegister(SelectionKeyImpl ski) {synchronized (closeLock) {if (pollWrapper == null)throw new ClosedSelectorException();growIfNeeded();channelArray[totalChannels] = ski;ski.setIndex(totalChannels);fdMap.put(ski);keys.add(ski);pollWrapper.addEntry(totalChannels, ski);totalChannels++;}
}
1、以當前channel和selector為參數,初始化SelectionKeyImpl 對象selectionKeyImpl ,并添加附件attachment。
2、如果當前channel的數量totalChannels等于SelectionKeyImpl數組大小,對SelectionKeyImpl數組和pollWrapper進行擴容操作。
3、如果totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。
4、pollWrapper.addEntry將把selectionKeyImpl中的socket句柄添加到對應的pollfd。
5、k.interestOps(ops)方法最終也會把event添加到對應的pollfd。
所以,不管serverSocketChannel,還是socketChannel,在selector注冊的事件,最終都保存在pollArray中。
接著,再來看看selector中的select是如何實現一次獲取多個有事件發生的channel的,底層由selector實現類的doSelect方法實現,如下:
protected int doSelect(long timeout) throws IOException {if (channelArray == null)throw new ClosedSelectorException();this.timeout = timeout; // set selector timeoutprocessDeregisterQueue();if (interruptTriggered) {resetWakeupSocket();return 0;}// Calculate number of helper threads needed for poll. If necessary// threads are created here and start waiting on startLockadjustThreadsCount();finishLock.reset(); // reset finishLock// Wakeup helper threads, waiting on startLock, so they start polling.// Redundant threads will exit here after wakeup.startLock.startThreads();// do polling in the main thread. Main thread is responsible for// first MAX_SELECTABLE_FDS entries in pollArray.try {begin();try {subSelector.poll();} catch (IOException e) {finishLock.setException(e); // Save this exception}// Main thread is out of poll(). Wakeup others and wait for themif (threads.size() > 0)finishLock.waitForHelperThreads();} finally {end();}// Done with poll(). Set wakeupSocket to nonsignaled for the next run.finishLock.checkForException();processDeregisterQueue();int updated = updateSelectedKeys();// Done with poll(). Set wakeupSocket to nonsignaled for the next run.resetWakeupSocket();return updated;}
其中 subSelector.poll() 是select的核心,由native函數poll0實現,readFds、writeFds 和exceptFds數組用來保存底層select的結果,數組的第一個位置都是存放發生事件的socket的總數,其余位置存放發生事件的socket句柄fd。
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private int poll() throws IOException{ // poll for the main threadreturn poll0(pollWrapper.pollArrayAddress,Math.min(totalChannels, MAX_SELECTABLE_FDS),readFds, writeFds, exceptFds, timeout);
}
執行 selector.select() ,poll0函數把指向socket句柄和事件的內存地址傳給底層函數。
1、如果之前沒有發生事件,程序就阻塞在select處,當然不會一直阻塞,因為epoll在timeout時間內如果沒有事件,也會返回;
2、一旦有對應的事件發生,poll0方法就會返回;
3、processDeregisterQueue方法會清理那些已經cancelled的SelectionKey;
4、updateSelectedKeys方法統計有事件發生的SelectionKey數量,并把符合條件發生事件的SelectionKey添加到selectedKeys哈希表中,提供給后續使用。
在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實現,是基于IO復用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。
read實現
通過遍歷selector中的SelectionKeyImpl數組,獲取發生事件的socketChannel對象,其中保存了對應的socket,實現如下
public int read(ByteBuffer buf) throws IOException {if (buf == null)throw new NullPointerException();synchronized (readLock) {if (!ensureReadOpen())return -1;int n = 0;try {begin();synchronized (stateLock) {if (!isOpen()) { return 0;}readerThread = NativeThread.current();}for (;;) {n = IOUtil.read(fd, buf, -1, nd);if ((n == IOStatus.INTERRUPTED) && isOpen()) {// The system call was interrupted but the channel// is still open, so retrycontinue;}return IOStatus.normalize(n);}} finally {readerCleanup(); // Clear reader thread// The end method, which end(n > 0 || (n == IOStatus.UNAVAILABLE));// Extra case for socket channels: Asynchronous shutdown//synchronized (stateLock) {if ((n <= 0) && (!isInputOpen))return IOStatus.EOF;}assert IOStatus.check(n);}}
}
最終通過Buffer的方式讀取socket的數據。
wakeup實現
public Selector wakeup() {synchronized (interruptLock) {if (!interruptTriggered) {setWakeupSocket();interruptTriggered = true;}}return this;
}// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
看來wakeupSinkFd這個變量是為wakeup方法使用的。
其中interruptTriggered為中斷已觸發標志,當pollWrapper.interrupt()之后,該標志即為true了;因為這個標志,連續兩次wakeup,只會有一次效果。
epoll原理
epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。
三個epoll相關的系統調用:
- int epoll_create(int size)
epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多于這個最大數時內核可不保證效果。 - int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。 - int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。
epoll內部實現大概如下:
- epoll初始化時,會向內核注冊一個文件系統,用于存儲被監控的句柄文件,調用epoll_create時,會在這個文件系統中創建一個file節點。同時epoll會開辟自己的內核高速緩存區,以紅黑樹的結構保存句柄,以支持快速的查找、插入、刪除。還會再建立一個list鏈表,用于存儲準備就緒的事件。
- 當執行epoll_ctl時,除了把socket句柄放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程序注冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中后,就把socket插入到就緒鏈表里。
- 當epoll_wait調用時,僅僅觀察就緒鏈表里有沒有數據,如果有數據就返回,否則就sleep,超時時立刻返回。
覺得不錯請點贊支持,歡迎留言或進我的個人群855801563領取【架構資料專題目合集90期】、【BATJTMD大廠JAVA面試真題1000+】,本群專用于學習交流技術、分享面試機會,拒絕廣告,我也會在群內不定期答題、探討。