深入淺出NIO之Selector實現原理

2019獨角獸企業重金招聘Python工程師標準>>> hot3.png

前言

Java NIO 由以下幾個核心部分組成:
1、Buffer
2、Channel
3、Selector

Buffer和Channel在深入淺出NIO之Channel、Buffer一文中已經介紹過,本文主要講解NIO的Selector實現原理。

之前進行socket編程時,accept方法會一直阻塞,直到有客戶端請求的到來,并返回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去獲取并處理后面的請求,當然也可以把獲取socket和處理socket的過程分開,一個線程負責accept,一個線程池負責處理請求。

但NIO提供了更好的解決方案,采用選擇器(Selector)返回已經準備好的socket,并按順序處理,基于通道(Channel)和緩沖區(Buffer)來進行數據的傳輸。

Selector

這里出來一個新概念,selector,具體是一個什么樣的東西?

想想一個場景:在一個養雞場,有這么一個人,每天的工作就是不停檢查幾個特殊的雞籠,如果有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的情況記錄下來,如果雞場的負責人想知道情況,只需要詢問那個人即可。

在這里,這個人就相當Selector,每個雞籠相當于一個SocketChannel,每個線程通過一個Selector可以管理多個SocketChannel。

為了實現Selector管理多個SocketChannel,必須將具體的SocketChannel對象注冊到Selector,并聲明需要監聽的事件(這樣Selector才知道需要記錄什么數據),一共有4種事件:

1、connect:客戶端連接服務端事件,對應值為SelectionKey.OP_CONNECT(8)
2、accept:服務端接收客戶端連接事件,對應值為SelectionKey.OP_ACCEPT(16)
3、read:讀事件,對應值為SelectionKey.OP_READ(1)
4、write:寫事件,對應值為SelectionKey.OP_WRITE(4)

這個很好理解,每次請求到達服務器,都是從connect開始,connect成功后,服務端開始準備accept,準備就緒,開始讀數據,并處理,最后寫回數據返回。

所以,當SocketChannel有對應的事件發生時,Selector都可以觀察到,并進行相應的處理。

服務端代碼

為了更好的理解,先看一段服務端的示例代碼

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){int n = selector.select();if (n == 0) continue;Iterator ite = this.selector.selectedKeys().iterator();while(ite.hasNext()){SelectionKey key = (SelectionKey)ite.next();if (key.isAcceptable()){SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();clntChan.configureBlocking(false);//將選擇器注冊到連接到的客戶端信道,//并指定該信道key值的屬性為OP_READ,//同時為該信道指定關聯的附件clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));}if (key.isReadable()){handleRead(key);}if (key.isWritable() && key.isValid()){handleWrite(key);}if (key.isConnectable()){System.out.println("isConnectable = true");}ite.remove();}
}

服務端操作過程

1、創建ServerSocketChannel實例,并綁定指定端口;
2、創建Selector實例;
3、將serverSocketChannel注冊到selector,并指定事件OP_ACCEPT,最底層的socket通過channel和selector建立關聯;
4、如果沒有準備好的socket,select方法會被阻塞一段時間并返回0;
5、如果底層有socket已經準備好,selector的select方法會返回socket的個數,而且selectedKeys方法會返回socket對應的事件(connect、accept、read or write);
6、根據事件類型,進行不同的處理邏輯;

在步驟3中,selector只注冊了serverSocketChannel的OP_ACCEPT事件
1、如果有客戶端A連接服務,執行select方法時,可以通過serverSocketChannel獲取客戶端A的socketChannel,并在selector上注冊socketChannel的OP_READ事件。
2、如果客戶端A發送數據,會觸發read事件,這樣下次輪詢調用select方法時,就能通過socketChannel讀取數據,同時在selector上注冊該socketChannel的OP_WRITE事件,實現服務器往客戶端寫數據。

Selector實現原理

SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現,其中Selector是整個NIO Socket的核心實現。

public static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}

SelectorProvider在windows和linux下有不同的實現,provider方法會返回對應的實現。

這里不禁要問,Selector是如何做到同時管理多個socket?

下面我們看看Selector的具體實現,Selector初始化時,會實例化PollWrapper、SelectionKeyImpl數組和Pipe。

WindowsSelectorImpl(SelectorProvider sp) throws IOException {super(sp);pollWrapper = new PollArrayWrapper(INIT_CAP);wakeupPipe = Pipe.open();wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();// Disable the Nagle algorithm so that the wakeup is more immediateSinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();(sink.sc).socket().setTcpNoDelay(true);wakeupSinkFd = ((SelChImpl)sink).getFDVal();pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

pollWrapper用Unsafe類申請一塊物理內存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。

?

pollWrapper提供了fdVal和event數據的相應操作,如添加操作通過Unsafe的putInt和putShort實現。

void putDescriptor(int i, int fd) {pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}

先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何實現的

public final SelectionKey register(Selector sel, int ops, Object att)throws ClosedChannelException {synchronized (regLock) {SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}
}
  1. 如果該channel和selector已經注冊過,則直接添加事件和附件。
  2. 否則通過selector實現注冊過程。
protected final SelectionKey register(AbstractSelectableChannel ch,int ops,  Object attachment) {if (!(ch instanceof SelChImpl))throw new IllegalSelectorException();SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);k.attach(attachment);synchronized (publicKeys) {implRegister(k);}k.interestOps(ops);return k;
}protected void implRegister(SelectionKeyImpl ski) {synchronized (closeLock) {if (pollWrapper == null)throw new ClosedSelectorException();growIfNeeded();channelArray[totalChannels] = ski;ski.setIndex(totalChannels);fdMap.put(ski);keys.add(ski);pollWrapper.addEntry(totalChannels, ski);totalChannels++;}
}

1、以當前channel和selector為參數,初始化SelectionKeyImpl 對象selectionKeyImpl ,并添加附件attachment。
2、如果當前channel的數量totalChannels等于SelectionKeyImpl數組大小,對SelectionKeyImpl數組和pollWrapper進行擴容操作。
3、如果totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。
4、pollWrapper.addEntry將把selectionKeyImpl中的socket句柄添加到對應的pollfd。
5、k.interestOps(ops)方法最終也會把event添加到對應的pollfd。

所以,不管serverSocketChannel,還是socketChannel,在selector注冊的事件,最終都保存在pollArray中。

接著,再來看看selector中的select是如何實現一次獲取多個有事件發生的channel的,底層由selector實現類的doSelect方法實現,如下:

 protected int doSelect(long timeout) throws IOException {if (channelArray == null)throw new ClosedSelectorException();this.timeout = timeout; // set selector timeoutprocessDeregisterQueue();if (interruptTriggered) {resetWakeupSocket();return 0;}// Calculate number of helper threads needed for poll. If necessary// threads are created here and start waiting on startLockadjustThreadsCount();finishLock.reset(); // reset finishLock// Wakeup helper threads, waiting on startLock, so they start polling.// Redundant threads will exit here after wakeup.startLock.startThreads();// do polling in the main thread. Main thread is responsible for// first MAX_SELECTABLE_FDS entries in pollArray.try {begin();try {subSelector.poll();} catch (IOException e) {finishLock.setException(e); // Save this exception}// Main thread is out of poll(). Wakeup others and wait for themif (threads.size() > 0)finishLock.waitForHelperThreads();} finally {end();}// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.finishLock.checkForException();processDeregisterQueue();int updated = updateSelectedKeys();// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.resetWakeupSocket();return updated;}

其中 subSelector.poll() 是select的核心,由native函數poll0實現,readFds、writeFds 和exceptFds數組用來保存底層select的結果,數組的第一個位置都是存放發生事件的socket的總數,其余位置存放發生事件的socket句柄fd。

private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private int poll() throws IOException{ // poll for the main threadreturn poll0(pollWrapper.pollArrayAddress,Math.min(totalChannels, MAX_SELECTABLE_FDS),readFds, writeFds, exceptFds, timeout);
}

執行 selector.select() ,poll0函數把指向socket句柄和事件的內存地址傳給底層函數。
1、如果之前沒有發生事件,程序就阻塞在select處,當然不會一直阻塞,因為epoll在timeout時間內如果沒有事件,也會返回;
2、一旦有對應的事件發生,poll0方法就會返回;
3、processDeregisterQueue方法會清理那些已經cancelled的SelectionKey;
4、updateSelectedKeys方法統計有事件發生的SelectionKey數量,并把符合條件發生事件的SelectionKey添加到selectedKeys哈希表中,提供給后續使用。

在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實現,是基于IO復用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。

read實現

通過遍歷selector中的SelectionKeyImpl數組,獲取發生事件的socketChannel對象,其中保存了對應的socket,實現如下

public int read(ByteBuffer buf) throws IOException {if (buf == null)throw new NullPointerException();synchronized (readLock) {if (!ensureReadOpen())return -1;int n = 0;try {begin();synchronized (stateLock) {if (!isOpen()) {         return 0;}readerThread = NativeThread.current();}for (;;) {n = IOUtil.read(fd, buf, -1, nd);if ((n == IOStatus.INTERRUPTED) && isOpen()) {// The system call was interrupted but the channel// is still open, so retrycontinue;}return IOStatus.normalize(n);}} finally {readerCleanup();        // Clear reader thread// The end method, which end(n > 0 || (n == IOStatus.UNAVAILABLE));// Extra case for socket channels: Asynchronous shutdown//synchronized (stateLock) {if ((n <= 0) && (!isInputOpen))return IOStatus.EOF;}assert IOStatus.check(n);}}
}

最終通過Buffer的方式讀取socket的數據。

wakeup實現

public Selector wakeup() {synchronized (interruptLock) {if (!interruptTriggered) {setWakeupSocket();interruptTriggered = true;}}return this;
}// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

看來wakeupSinkFd這個變量是為wakeup方法使用的。
其中interruptTriggered為中斷已觸發標志,當pollWrapper.interrupt()之后,該標志即為true了;因為這個標志,連續兩次wakeup,只會有一次效果。

epoll原理

epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。

三個epoll相關的系統調用:

  • int epoll_create(int size)
    epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多于這個最大數時內核可不保證效果。
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
    epoll_ctl可以操作epoll_create創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。
  • int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
    epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。

epoll內部實現大概如下:

  1. epoll初始化時,會向內核注冊一個文件系統,用于存儲被監控的句柄文件,調用epoll_create時,會在這個文件系統中創建一個file節點。同時epoll會開辟自己的內核高速緩存區,以紅黑樹的結構保存句柄,以支持快速的查找、插入、刪除。還會再建立一個list鏈表,用于存儲準備就緒的事件。
  2. 當執行epoll_ctl時,除了把socket句柄放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程序注冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中后,就把socket插入到就緒鏈表里。
  3. 當epoll_wait調用時,僅僅觀察就緒鏈表里有沒有數據,如果有數據就返回,否則就sleep,超時時立刻返回。

覺得不錯請點贊支持,歡迎留言或進我的個人群855801563領取【架構資料專題目合集90期】、【BATJTMD大廠JAVA面試真題1000+】,本群專用于學習交流技術、分享面試機會,拒絕廣告,我也會在群內不定期答題、探討。

轉載于:https://my.oschina.net/u/3959491/blog/3032417

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/252004.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/252004.shtml
英文地址,請注明出處:http://en.pswp.cn/news/252004.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

介紹一下畫圖小能手matplotlib。

我們在做完數據分析的時候需要把分析出來的結果&#xff0c;做一個圖形化的形象表達&#xff0c;這里我們就需要用到畫圖小能手matplotlib&#xff0c;下面就演示一下常用的條形圖和折線圖 散點圖 散點圖的做大的作用是研究兩個變量的相關性&#xff08;正相關&#xff0c;負相…

立體視覺標定源代碼C++,簡單粗暴!粗暴·······

疑點解答&#xff1a; 攝像機矩陣由內參矩陣和外參矩陣組成&#xff0c;對攝像機矩陣進行QR分解可以得到內參矩陣和外參矩陣。 內參包括焦距、主點、傾斜系數、畸變系數 &#xff08;1&#xff09; 其中&#xff0c;fx&#xff0c;fy為焦距&#xff0c;一般情況下&#xff…

11. 臨時表

-- 查詢5大洲國家總數 SELECT continent,COUNT(*) FROM country GROUP BY continent;-- 演示臨時表 CREATE TEMPORARY TABLE tmp_table ( continent VARCHAR(20), COUNT INT );INSERT INTO tmp_table SELECT Asia AS continent,COUNT(*) FROM country WHERE continent Asia;…

MongoDB負載信息一目了然 阿里云HDM重磅發布MongoDB監控和診斷功

2019獨角獸企業重金招聘Python工程師標準>>> 混合云數據庫管理&#xff08;HDM&#xff09;的統一監控、告警、診斷功能新增了對MongoDB的支持。 通過直觀的方式將MongoDB多個維度的負載信息統一整合&#xff0c;不僅可以清晰的查看實時負載信息&#xff0c;也可以方…

在iview的Table中添加Select(render)

首先對Render進行分析&#xff0c;在iview官方的文檔中&#xff0c;找到了table插入Button的例子&#xff1a; [javascript] view plaincopy { title: Action, key: action, width: 150, align: center, render: (h, params) > { return h(div, [ h(Butt…

JavaScript中call和apply方法

1 /*2 在js中 call和apply常用于綁定作用域3 */4 //1 簡單的綁定5 function sum(a,b){6 return ab;7 }8 //將sum的功能綁定給test2來執行9 function test2(a,b){ 10 return sum.call(this,a,b); 11 } 12 // call 和apply的區別是 apply接收數組作為參數…

工業機械人運動學正逆解,簡單粗暴!!!!!!

ur機械臂是六自由度機械臂&#xff0c;由D-H參數法確定它的運動學模型&#xff0c;連桿坐標系的建立如上圖所示。 轉動關節θi是關節變量&#xff0c;連桿偏移di是常數。 關節編號 α&#xff08;繞x軸&#xff09; a&#xff08;沿x軸&#xff09; θ&#xff08;繞z軸&am…

python opencv立體測距 立體匹配BM算法

立體標定應用標定數據轉換成深度圖標定 在開始之前&#xff0c;需要準備的當然是兩個攝相頭&#xff0c;根據你的需求將兩個攝像頭進行相對位置的固定&#xff0c;我是按平行來進行固定的&#xff08;如果為了追求兩個雙目圖像更高的生命度&#xff0c;也可以將其按一定鈍角固…

對于python 作用域新的理解

今天看Python習題&#xff0c;看到如下題目 def num():return [lambda x: i*x for i in range(4)] print([m(2) for m in num()])  # 求輸出結果是什么 我看了半天才明白這應該是一個列表生成式&#xff0c;列表中的元素為四個匿名函數&#xff0c;我本以為每個匿名函數應該是…

Vue基礎學習(一)------內部指令

一.v-if v-else v-show 指令 1.v-if v-if:是vue 的一個內部指令&#xff0c;指令用在我們的html中,用來判斷是否加載html的DOM 現在舉個栗子&#xff0c;判斷用戶的登錄操作&#xff0c;用isLogin作為一個判斷字段&#xff0c;登錄成功&#xff0c;就顯示用戶的名稱 代碼&…

【bzoj3555】[Ctsc2014]企鵝QQ 簡單哈希

傳送門 題目分析 題意即求有多少對字符串只相差一個字符&#xff0c;枚舉刪除每個字符后的哈希&#xff0c; 看有多少相等即可。 比如有如下字符串&#xff1a;$Sd123$&#xff0c;其中S部分的哈希值為H&#xff0c;刪除的是d&#xff0c;則原字符串的哈希值為$$(((H * T d) *…

StereoRectify()函數定義及用法畸變矯正與立體校正

畸變矯正是上一篇博文的遺留問題&#xff0c;當畸變系數和內外參數矩陣標定完成后&#xff0c;就應該進行畸變的矯正&#xff0c;以達到消除畸變的目的&#xff0c;此其一。 在該系列第一部分的博文中介紹的立體成像原理中提到&#xff0c;要通過兩幅圖像估計物點的深度信息&a…

死磕 java集合之TreeMap源碼分析(三)- 內含紅黑樹分析全過程

2019獨角獸企業重金招聘Python工程師標準>>> 歡迎關注我的公眾號“彤哥讀源碼”&#xff0c;查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。 刪除元素 刪除元素本身比較簡單&#xff0c;就是采用二叉樹的刪除規則。 &#xff08;1&#xff09;如果刪除的位置有兩…

Linux:進程實例信息(/proc)

https://blog.csdn.net/test1280/article/details/73632333 Linux:進程實例信息&#xff08;/proc&#xff09; 問幾個問題&#xff1a; 1.怎么知道一個進程對應哪個可執行文件&#xff1f; 2.怎么知道一個進程的資源限制&#xff1f; 3.怎么知道一個進程所處的環境&#xff1f…

四元素理解

旋轉變換_四元數 2017年03月29日 11:59:38 csxiaoshui 閱讀數&#xff1a;5686 1.簡介 四元數是另一種描述三維旋轉的方式&#xff0c;四元數使用4個分量來描述旋轉&#xff0c;四元數的描述方式如下&#xff1a; qsxiyjzk,(s,x,y,z∈?&#xff09;i2j2k2ijk?1 四元數的由…

31、SAM文件中flag含義解釋工具--轉載

轉載&#xff1a;http://www.cnblogs.com/nkwy2012/p/6362996.html SAM是Sequence Alignment/Map 的縮寫。像bwa等軟件序列比對結果都會輸出這樣的文件。samtools網站上有專門的文檔介紹SAM文件。具體地址&#xff1a;http://samtools.sourceforge.net/SAM1.pdf很多人困惑SAM文…

《Head First設計模式》批注系列(一)——觀察者設計模式

最近在讀《Head First設計模式》一書&#xff0c;此系列會引用源書內容&#xff0c;但文章內容會更加直接&#xff0c;以及加入一些自己的理解。 觀察者模式&#xff08;有時又被稱為模型-視圖&#xff08;View&#xff09;模式、源-收聽者(Listener)模式或從屬者模式&#xff…

PYPL 4 月排行:Python 最流行,Java 還行不行?

開發四年只會寫業務代碼&#xff0c;分布式高并發都不會還做程序員&#xff1f; PYPL 發布了 4 月份的編程語言排行榜。 前五的分別是&#xff1a;Python、Java、Javascript、C# 和 PHP。可以看到&#xff0c;榜單沒有什么大變化&#xff0c;但是相比去年 4 月份&#xff0c;…

兩個向量的旋轉矩陣與四元素

兩向量的夾角 2017年06月20日 17:38:11 csxiaoshui 閱讀數&#xff1a;36764 怎么計算兩個向量間的夾角呢&#xff1f; 這里主要分兩種情況&#xff0c;對于二維向量和三維向量來分別討論。 1. 二維向量 二維向量的情況相對簡單&#xff0c;根據向量間的點乘關系 v1?v2|…

順序表

一、數據是如何在內存中存儲的&#xff1f; 32位系統中char&#xff0c;int型數據在內存中的存儲方式&#xff1a; char占1byte&#xff08;8bit&#xff09;int占4byte&#xff08;32bit&#xff09;假設我們有一個int類型的值&#xff0c;它從0x01開始&#xff0c;一個int占據…