node中的Stream-Readable和Writeable解讀

在node中,只要涉及到文件IO的場景一般都會涉及到一個類-Stream。Stream是對IO設備的抽象表示,其在JAVA中也有涉及,主要體現在四個類-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream類針對字節數據進行讀寫;Reader和Writer針對字符數據讀寫。同時Java中有多種針對這四種類型的擴展類,如節點流、緩沖流和轉換流等。比較而言,node中Stream類型也和Java中的類似,同樣提供了支持字節和字符讀寫的Readable和Writeable類,也存在轉換流Transform類,本文主要分析node中Readable和Writeable的實現機制,從底層的角度更好的理解Readable和Writeable實現機制,解讀在讀寫過程中發生的一些重要事件。

Readable類

Readable對應于Java中的InputStream和Reader兩個類,針對Readable設置encode編碼可完成內部數據由Buffer到字符的轉換。Readable Stream有兩種模式,即flowing和paused模式。這兩種模式對于用戶而言區別在于是否需要手動調用Readable.prototype.read(n),讀取緩沖區的數據。查詢node API文檔可知觸發flowing模式有三種方式:

  • 偵聽data事件
  • readable.resume()
  • readable.pipe()
    而觸發paused模式同樣有幾種方式:
  • 移除data事件
  • readable.pause()
  • readable.unpipe()
    可能這樣講解大家仍不明白Readable Stream這兩種模式的區別,那么下文從更深層次分析兩種模式的機制。

深入Readable的實現

Readable繼承EventEmitter,大家也都知道。但是相信大家應該不怎么熟悉Readable的實例屬性**_readableState**。該屬性是一個ReadableState類型的對象,保存了Readable實例的重要信息,如讀取模式(是否為對象模式)、highWaterMark(緩沖區存放的最大字節數)、緩沖區、flowing模式等。在Readable的實現中,處處使用ReadableState對象記錄當前讀取狀態,并設置緩沖區保證讀操作的順利進行。

首先需要針對Readable.prototype.read方法進行特別解讀:

  if (n === 0 &&state.needReadable &&(state.length >= state.highWaterMark || state.ended)) {debug('read: emitReadable', state.length, state.ended);if (state.length === 0 && state.ended)endReadable(this);elseemitReadable(this);return null;}

當讀入的數據為0時,執行emitReadable操作。這意味著,針對Readable Stream執行read(0)方法會觸發readable事件,但是不會讀當前緩沖區。因此使用read(0)可以完成一些比較巧妙的事情,如在readable處理函數中可以使用read(0)觸發下一次readable事件,可選的操作讀緩沖區。

繼續分析代碼,如果讀入的數據并不是0,則計算讀取緩沖區的具體字節數,

n = howMuchToRead(n, state);function howMuchToRead(n, state) {if (state.length === 0 && state.ended)return 0;if (state.objectMode)return n === 0 ? 0 : 1;if (n === null || isNaN(n)) {// only flow one buffer at a timeif (state.flowing && state.buffer.length)return state.buffer[0].length;// 若是paused狀態,則讀全部的緩沖區elsereturn state.length;}if (n <= 0)return 0;if (n > state.highWaterMark)state.highWaterMark = computeNewHighWaterMark(n);// don't have that much.  return null, unless we've ended.if (n > state.length) {if (!state.ended) {state.needReadable = true;return 0;} else {return state.length;}}return n;
}

針對對象模式的讀取,每次只讀一個;對于處在flowing模式下的讀取,每次只讀緩沖區中第一個buffer的長度;在paused模式下則讀取全部緩沖區的長度;若讀取的字節數大于設置的緩沖區最大值,則適當擴大緩沖區的大小(默認為16k,最大為8m);若讀取的長度大于當前緩沖區的大小,設置needReadable屬性并準備數據等待下一次讀取。

接下來,判斷是否需要準備數據。在這里,依賴于needReadable的值,

var doRead = state.needReadable;debug('need readable', doRead);if (state.length === 0 || state.length - n < state.highWaterMark) {doRead = true;debug('length less than watermark', doRead);}// reading, then it's unnecessary.if (state.ended || state.reading) {doRead = false;debug('reading or ended', doRead);}

如果當前緩沖區為空,或者緩沖區并未超出我們設定的最大值,那么就可以繼續準備數據;如果此時正在準備數據或者已經結束讀取,那么就放棄準備數據。一旦doRead為true,那么進入準備數據階段,

if (doRead) {debug('do read');state.reading = true;state.sync = true;// if the length is currently zero, then we *need* a readable event.if (state.length === 0)state.needReadable = true;// call internal read method// 默認Readable未實現_read,拋出Error// 針對自定義的Readable子類,_read可修改state.buffer的數量,進行預處理,// 然后由下面的fromList讀出去緩存中的相關數據this._read(state.highWaterMark);state.sync = false;}

接下來設置相關的標志位,進行_read處理。針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類需要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩沖區,然后進行Readable的讀取。可以理解為_read函數為讀取數據前的準備工作(準備數據),針對的是流的實現者而言。

  if (doRead && !state.reading)n = howMuchToRead(nOrig, state);var ret;if (n > 0)ret = fromList(n, state);elseret = null;if (ret === null) {state.needReadable = true;n = 0;}state.length -= n;if (state.length === 0 && !state.ended)state.needReadable = true;if (nOrig !== n && state.ended && state.length === 0)endReadable(this);// flowing模式下的數據讀取依賴于 read函數// data事件觸發的次數,依賴于howMuchToRead計算的次數if (ret !== null)this.emit('data', ret);

一旦在_read中更新了緩沖區,那么我們需要重新計算(消費者,即可寫流)讀取的字節數。fromList方法完成了讀緩沖區的slice,如果是objectMode下的讀,則只讀緩沖區的第一個對象;針對未傳參數的read方法而言,默認讀取全部緩沖區等等。從讀緩沖區讀取完數據之后設置相關flag,如needReadable,最終,觸發data事件,結束!

上節提到,設置data事件的執行函數會進入flowing模式的讀,而上文看到正是read方法觸發了data事件,而默認條件下Readable處于paused狀態,因此在paused狀態讀取數據需要手動執行read函數,每次read讀取完畢觸發一次data事件。從這點看出,flowing和paused狀態區別在于是否需要手動執行read()來獲取數據。flowing狀態下,我們無需執行read,僅需要設置data事件處理函數或者設定導流目標pipe;而在paused狀態下,不僅僅是簡單的執行read方法,因為讀緩沖區的內容時刻在改變,一旦讀緩沖區又有新數據,簡單執行read()就沒法滿足需求(因為我們無法知道是否又有新數據到來),因此需要偵聽讀緩沖區的相關事件,即readable事件,在該事件處理函數中進行read相關數據。

那么,什么情況下會觸發readable事件呢?在實現_read私有方法中,我們使用stream.push(chunk)或stream.unshift(chunk)方法注入數據到讀緩沖區,那么push和unshift方法都實現了下面的邏輯,

if (state.flowing && state.length === 0 && !state.sync) {stream.emit('data', chunk);stream.read(0);
} else {// update the buffer info.state.length += state.objectMode ? 1 : chunk.length;if (addToFront)state.buffer.unshift(chunk);elsestate.buffer.push(chunk);if (state.needReadable)emitReadable(stream);
}function emitReadable(stream) {var state = stream._readableState;state.needReadable = false;if (!state.emittedReadable) {debug('emitReadable', state.flowing);state.emittedReadable = true;if (state.sync)process.nextTick(emitReadable_, stream);elseemitReadable_(stream);}
}function emitReadable_(stream) {debug('emit readable');stream.emit('readable');flow(stream);
}
// 在flowing狀態下,自動讀取流(替代paused狀態下手動read)
function flow(stream) {var state = stream._readableState;debug('flow', state.flowing);if (state.flowing) {do {var chunk = stream.read();} while (null !== chunk && state.flowing);}
}

一旦處于flowing模式并且當前緩沖區沒有數據,那么就立即將預處理的push(unshift)數據傳遞給data事件處理函數,并執行stream.read(0)。前文已經交代過,read(0)僅僅用來觸發readable事件,并不讀取緩沖區,這就是觸發readable的第一種情況。

第二種則是第一種情況之外的所有情景,即根據操作(push、unshift)的不同將數據插入讀緩沖區的不同位置。最后執行emitReadable函數,觸發readable事件。針對emitReadable函數,它的作用就是異步觸發readable事件,并執行flow函數。flow函數則針對flowing狀態的Readable做自適應讀取,免去了手動執行read函數和何時執行read函數的苦惱。

這樣,對于Readable的實現者,一旦在_read函數插入有效數據到讀緩沖區,都會觸發readable事件,在paused狀態下,設置readable事件處理函數并手動執行read函數,便可完成數據的讀取;而在flowing狀態下,通過設置data事件處理函數或者定義pipe目標流同樣可以實現讀取。

既然pipe同樣可以觸發Readable進入flowing狀態,那么pipe方法具體做了什么呢?其實pipe針對Readable和Writeable做了限流,首先針對Readable的data事件進行偵聽,并執行Writeable的write函數,當Writeable的寫緩沖區大于一個臨界值(highWaterMark),導致write函數返回false(此時意味著Writeable無法匹配Readable的速度,Writeable的寫緩沖區已經滿了),此時,pipe修改了Readable模式,執行pause方法,進入paused模式,停止讀取讀緩沖區。而同時Writeable開始刷新寫緩沖區,刷新完畢后異步觸發drain事件,在該事件處理函數中,設置Readable為flowing狀態,并繼續執行flow函數不停的刷新讀緩沖區,這樣就完成了pipe限流。需要注意的是,Readable和Writeable各自維護了一個緩沖區,在實現的上有區別:Readable的緩沖區是一個數組,存放Buffer、String和Object類型;而Writeable則是一個有向鏈表,依次存放需要寫入的數據。

Writeable解讀

Writeable對應Java的OutputStream和Writer類,實現字節和字符數據的寫。與Readable類似,Writeable的實例對象同樣維護了一個狀態對象-WriteableState,記錄了當前輸出流的狀態信息,如寫緩沖區的最大值(hightWaterMark)、緩沖區(有向鏈表)和緩沖區長度等信息。在本節中,主要分析輸出流的關鍵方法write和事件drain,并解析輸出流的實現者需要實現的方法**_writewrite**的關系。

function write
----------------------------
if (state.ended)writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {state.pendingcb++;ret = writeOrBuffer(this, state, chunk, encoding, cb);}return ret;

在write方法中,判斷寫入數據的格式并執行writeOrBuffer函數,并返回執行結果,該返回值標示當前寫緩沖區是否已滿。真正執行寫入邏輯的是writeOrBuffer函數,該函數的作用在于刷新或者更新寫緩沖區,下面看看主要做了什么,

function writeOrBuffer(stream, state, chunk, encoding, cb) {chunk = decodeChunk(state, chunk, encoding);if (chunk instanceof Buffer)encoding = 'buffer';var len = state.objectMode ? 1 : chunk.length;state.length += len;// 如果緩存的長度大于highWaterMark,需要刷新緩沖,所以設置needDrain標志var ret = state.length < state.highWaterMark;// we must ensure that previous needDrain will not be reset to false.if (!ret)state.needDrain = true;// 緩存未處理的寫請求,在clearBuffer中執行緩存// 由此看出,Readable和Writeable都有緩存,Readable 中緩存的方式是數組(項為Buffer,字符串或對象),Writeable的// 緩存則是對象鏈表if (state.writing || state.corked) {var last = state.lastBufferedRequest;state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);if (last) {last.next = state.lastBufferedRequest;} else {state.bufferedRequest = state.lastBufferedRequest;}state.bufferedRequestCount += 1;} else {doWrite(stream, state, false, len, chunk, encoding, cb);}return ret;
}

writeOrBuffer首先針對數據進行編碼,字符串轉換成Buffer類型,如果設置了Writeable的ObjectMode模式則仍為Object類型;接下來更新寫緩沖區的長度,并判斷寫緩沖區長度是否超過設定的Writeable的最大值(默認16k),如果超過超過則ret=false并更新WriteableState的屬性needDrain=true。ret的結果其實就是write方法返回值,因此一旦write返回值為false,意味著當前寫緩沖區已滿,需要停止繼續寫入數據。

在Readable的pipe方法中,涉及到了Writeable的drain事件。該事件的觸發意味著寫緩沖區已可以繼續緩存數據,可見drain事件與寫緩沖區嚴格相關。繼續分析writeOrBuffer函數,若當前輸出流正在寫數據,那么則當前數據緩存至寫緩沖區(創建WriteReq對象);否則執行doWrite函數,刷新緩沖區。

function doWrite(stream, state, writev, len, chunk, encoding, cb) {state.writelen = len;state.writecb = cb;state.writing = true;state.sync = true;if (writev)stream._writev(chunk, state.onwrite);elsestream._write(chunk, encoding, state.onwrite);state.sync = false;
}

doWrite函數設置了需要寫入數據的長度、寫入狀態等信息,并執行輸出流實現者需要實現的_write函數。在_write函數中,針對數據流向做最后的處理,這里分析_write函數的具體實現。_write函數有三個參數,分別為chunk,encoding和state.onwrite回調函數,對該回調函數稍后分析,先著重講解_write函數的實現。在node的fs模塊中,可以通過fs.createWriteStream創建Writeable實例,通過執行

var writeStream = fs.createWriteStream('./output',{decodeStrings: false});
console.log(writeStream._write.toString());-----------------輸出-----------------function (data, encoding, cb) {if (!(data instanceof Buffer))return this.emit('error', new Error('Invalid data'));if (typeof this.fd !== 'number')return this.once('open', function() {this._write(data, encoding, cb);});var self = this;fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {if (er) {self.destroy();return cb(er);}self.bytesWritten += bytes;cb();});if (this.pos !== undefined)this.pos += data.length;
}

看出,在_write實現中,只接受Buffer類型的數據,接著執行fs.write操作,寫入到對應文件描述符fd對應的文件中,寫入成功或失敗后執行回調函數,即state.onwrite函數。

function onwrite(stream, er) {var state = stream._writableState;var sync = state.sync;var cb = state.writecb;onwriteStateUpdate(state);// 默認未重寫_write方法,會收到er值if (er)onwriteError(stream, state, sync, er, cb);else {// Check if we're actually ready to finish, but don't emit yetvar finished = needFinish(state);// 寫緩存的數據if (!finished &&!state.corked &&!state.bufferProcessing &&state.bufferedRequest) {clearBuffer(stream, state);}// 異步觸發drain事件if (sync) {process.nextTick(afterWrite, stream, state, finished, cb);} else {afterWrite(stream, state, finished, cb);}}
}

在state.onwrite函數中主要工作有兩個:

  • 寫緩沖區的數據
  • 寫完緩沖區的數據后,異步觸發drain事件

第一步,在clearBuffer函數中,就是取出寫緩沖區(有向鏈表)的第一個WriteReq對象,執行doWrite函數,寫入緩沖區的第一個數據;這樣循環往復最終清空寫緩沖區,重置一些標志位。

第二步,異步執行afterWrite函數,觸發drain事件,并判斷是否寫操作完畢觸發“finish”事件。這里之所以強調異步觸發drain事件,是因為為了保證先獲得write()返回值為false,給用戶綁定drain處理函數的時隙,然后再觸發drain事件。

至此,Writeable的重要流程已全部走通。可以看出來,在核心的write()中,判斷寫緩沖區是否已滿并返回該值,在適當條件下緩存數據或調用_write()寫數據,在Writeable實現者需要實現的** _write() 中,主要任務是數據寫入方向控制,完成最基本的任務**。

總結

對比Readable的read()和_read(),我總結了下這四個函數在“讀寫過程”中的執行順序與關系,如下圖所示:
Readable和Writeable的函數執行順序

轉載于:https://www.cnblogs.com/accordion/p/5560531.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/395816.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/395816.shtml
英文地址,請注明出處:http://en.pswp.cn/news/395816.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

新Rider預覽版發布,對F#的支持是亮點

JetBrains一直在改進自己的跨平臺.NET IDE產品Rider&#xff0c;努力使其成為Visual Studio家族產品可承擔職能的重要替代者。于今年四月發布的Rider預覽版&#xff08;EAP 21&#xff09;提供了一些新特性&#xff0c;其中的亮點在于對函數式編程語言F#的支持。\\鑒于這是Ride…

java代碼整合_java合并多個文件的實例代碼

在實際項目中&#xff0c;在處理較大的文件時&#xff0c;常常將文件拆分為多個子文件進行處理&#xff0c;最后再合并這些子文件。下面就為各位介紹下Java中合并多個文件的方法。Java中合并子文件最容易想到的就是利用BufferedStream進行讀寫。具體的實現方式如下&#xff0c;…

正則表達式的一些規則

1.限定修飾符只對其緊前的元字符有效 String rex8 "\\d\\D"; 上式中&#xff0c;只對\\D有效&#xff0c;即有至少有1個&#xff08;1個或多個&#xff09;非數字&#xff0c;\\d仍然只許有一個數字。 2.[1,2,3]和[123]是一樣的轉載于:https://www.cnblogs.com/Sabr…

2016版單詞的減法_在2016年最大的電影中,女性只說了27%的單詞。

2016版單詞的減法by Amber Thomas通過琥珀托馬斯 在2016年最大的電影中&#xff0c;女性只說了27&#xff05;的單詞。 (Women only said 27% of the words in 2016’s biggest movies.) Movie trailers in 2016 promised viewers so many strong female characters. Jyn Erso…

軟件工程博客---團隊項目---個人設計2(算法)

針對分析我們團隊項目的需求&#xff0c;我們選定Dijkstra算法。 算法的基本思想&#xff1a; Dijkstra算法是由E.W.Dijkstra于1959年提出&#xff0c;又叫迪杰斯特拉算法&#xff0c;它應用了貪心算法模式&#xff0c;是目前公認的最好的求解最短路徑的方法。算法解決的是有向…

UWP 雜記

UWP用選取文件對話框 http://blog.csdn.net/u011033906/article/details/65448394 文件選取器、獲取文件屬性、寫入和讀取、保存讀取和刪除應用數據 https://yq.aliyun.com/articles/839 UWP判斷文件是否存在 http://blog.csdn.net/lindexi_gd/article/details/51387901…

微信上傳素材 java_微信素材上傳(JAVA)

public String uploadMaterial(String url,InputStream sbs,String filelength,String filename, String type) throws Exception {try {DataInputStream innew DataInputStream(sbs);url url.replace("TYPE", type);URL urlObj new URL(url);// 創建Http連接HttpU…

SQL Server讀寫分離之發布訂閱

一、發布 上面有多種發布方式&#xff0c;這里我選擇事物發布&#xff0c;具體區別請自行百度。 點擊下一步、然后繼續選擇需要發布的對象。 如果需要篩選發布的數據點擊添加。 根據自己的計劃選擇發布的時間。 點擊安全設置&#xff0c;設置代理信息。 最后單擊完成系統會自動…

碼農和程序員的幾個重要區別!

如果一個企業老板大聲嚷嚷說&#xff0c;“我要招個程序員”&#xff0c;那么十之八九指的是“碼農”——一種純粹為了錢而寫代碼的技術人員。這其實是一種非常狹隘和錯誤的做法&#xff0c;原因么&#xff0c;且聽我一一道來。1、碼農寫代碼&#xff0c;程序員寫系統從本質上講…

sql server2008禁用遠程連接

1.打開SQL Server 配置管理器&#xff0c;雙擊左邊 SQL Server 網絡配置&#xff0c;點擊TCP/IP協議,在協議一欄中,找到 全部偵聽,修改為否&#xff0c;然后點擊IP地址,將IP地址為127.0.0.1(IPV4)或::1(IPV6)的已啟用修改為是,其它的IP地址的已啟用修改為否 注意&#xff1a;如…

snapchat注冊不到_從Snapchat獲得開發人員職位中學到的經驗教訓

snapchat注冊不到Here are three links worth your time:這是三個值得您花費時間的鏈接&#xff1a; I just got a developer job at Snapchat. Here’s what I learned and how it can help you with your job search (15 minute read) 我剛剛在Snapchat獲得開發人員職位。 這…

java bitmap jar_Java面試中常用的BitMap代碼

引言阿里內推面試的時候被考了一道編程題&#xff1a;10億個范圍為1~2048的整數&#xff0c;將其去重并計算數字數目。我看到這個題目就想起來了《編程珠璣》第一章講的叫做BitMap的數據結構&#xff0c;但是我并沒有在java上實現過&#xff0c;這就比較尷尬了&#xff0c;再加…

移動端工程架構與后端工程架構的思想摩擦之旅(1)

此文已由作者黎星授權網易云社區發布。歡迎訪問網易云社區&#xff0c;了解更多網易技術產品運營經驗記資源投放后端工程的架構調整與優化 架構思考一直以來對軟件工程架構有著極大的興趣&#xff0c;無論是之前負責的移動端Android工程&#xff0c;亦或是現在轉到后端開發后維…

View野指針問題分析報告

【問題描述】 音樂組同事反饋了一個必現Native Crash問題&#xff0c;tombstone如下&#xff1a; pid: 5028, tid: 5028, name: com.miui.player >>> com.miui.player <<< signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 79801f28r0 7ac59c98 r1 …

SicilyFunny Game

一、題目描述 Two players, Singa and Suny, play, starting with two natural numbers. Singa, the first player, subtracts any positive multiple of the lesser of the two numbers from the greater of the two numbers, provided that the resulting number must be non…

java 分布式同步_Java Web分布式集群搭建(三)——Session同步

對于一個業務系統的Tomcat集群來說&#xff0c;必須保證同一個用戶訪問到任一臺服務器上都可以維持之前操作的身份。比如在服務器A進行了登陸&#xff0c;那么在服務器B中也要同步該用戶已登錄的狀態&#xff0c;這里就用到了Session的同步。同步方式sticky模式、復制模式、Ter…

移動應用程序和網頁應用程序_如何不完全破壞您的移動應用程序的用戶界面

移動應用程序和網頁應用程序by Luke Konior盧克科尼爾(Luke Konior) 如何不完全破壞您的移動應用程序的用戶界面 (How to not utterly ruin your mobile app’s user interface) There’s no single universal formula for designing a great user interface (if you discover…

logging記錄日志

日志是一個系統的重要組成部分&#xff0c;用以記錄用戶操作、系統運行狀態和錯誤信息。日志記錄的好壞直接關系到系統出現問題時定位的速度。logging模塊Python2.3版本開始成為Python標準庫的一部分。 日志級別 在最簡單的使用中&#xff0c;我們直接導入logging模塊&#xff…

C#編程之接口

1.定義 接口是把公共方法和屬性組合起來&#xff0c;以封裝特定功能的一個集合。&#xff08;一旦定義了接口&#xff0c;就可以在類中實現它。這樣類就可以支持接口所指定的所有屬性和成員&#xff09; 注意1&#xff1a;接口不能單獨存在。不能像實例化一個類那樣實例化一個接…

supervisor守護進程

2019獨角獸企業重金招聘Python工程師標準>>> supervisor 是一個client/server系統,把不是守護進程的進程變成守護進程,并監控和控制類 Unix 操作系統上的進程。 upervisor就是用Python開發的一套通用的進程管理程序&#xff0c;能將一個普通的命令行進程變為后臺dae…