Node.js Streams:你需要知道的一切
圖像來源
Node.js流以難以使用而聞名,甚至更難理解。好吧,我有個好消息 - 不再是這樣了。
多年來,開發人員在那里創建了許多軟件包,其唯一目的是簡化流程。但在本文中,我將重點介紹本機Node.js流API。
“Streams是Node最好,也是最容易被誤解的想法。”
- Dominic Tarr
什么是溪流?
流是數據的集合 - 就像數組或字符串一樣。不同之處在于流可能無法一次全部可用,并且它們不必適合內存。這使得流真正強大的大量數據,或者數據這是一個從外部來源有人來工作時,大塊的時間。
但是,流不僅僅是處理大數據。它們還為我們提供了代碼中可組合性的強大功能。就像我們可以通過管道其他較小的Linux命令來組成強大的linux命令一樣,我們可以在Node中使用流完全相同。
與Linux命令的可組合性
const grep = ... //用于grep輸出的流
const wc = ... //用于wc輸入的流
const wc = ... //用于wc輸入的流
grep.pipe(WC)
Node中的許多內置模塊實現了流接口:
從我的Pluralsight課程 - Advanced Node.js中捕獲的截圖
上面的列表提供了一些本機Node.js對象的示例,這些對象也是可讀寫的流。其中一些對象是可讀寫的流,如TCP套接字,zlib和加密流。
請注意,對象也是密切相關的。雖然HTTP響應是客戶端上的可讀流,但它是服務器上的可寫流。這是因為在HTTP情況下,我們基本上從一個對象(http.IncomingMessage
)讀取并寫入另一個(http.ServerResponse
)。
另外要注意的是如何stdio
流(stdin
,stdout
,stderr
)有反流類型,當涉及到的子進程。這允許一種非常簡單的方法來管理來自主流程stdio
流的這些流。
一個實際的例子
理論很棒,但往往不是100%令人信服。讓我們看一個示例,演示在內存消耗方面流可以在代碼中產生的差異。
讓我們先創建一個大文件:
const fs = require('fs');
const file = fs.createWriteStream('./ big.file'); for(let i = 0; i <= 1e6; i ++){ file.write('Lorem ipsum dolor sit amet,consectetur adipisicing elit,sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.Ut enim ad minim veniam,quis nostrud練習ullamco laboris nisi ut aliquip ex ea commodo consequat.Duis aute irure dolor in repreptderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur。Excepteur sint occaecat cupidatat non proident,sunt in culpa qui officia deserunt mollit anim id est laborum。\ n' );
} file.end();
const file = fs.createWriteStream('./ big.file'); for(let i = 0; i <= 1e6; i ++){ file.write('Lorem ipsum dolor sit amet,consectetur adipisicing elit,sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.Ut enim ad minim veniam,quis nostrud練習ullamco laboris nisi ut aliquip ex ea commodo consequat.Duis aute irure dolor in repreptderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur。Excepteur sint occaecat cupidatat non proident,sunt in culpa qui officia deserunt mollit anim id est laborum。\ n' );
} file.end();
看看我用來創建那個大文件的東西。一個可寫的流!
該fs
模塊可用于使用流接口讀取和寫入文件。在上面的例子中,我們big.file
通過帶有循環的可寫流100萬行來寫入。
運行上面的腳本會生成大約約400 MB的文件。
這是一個簡單的Node Web服務器,專門用于big.file
:
const fs = require('fs');
const server = require('http')。createServer(); server.on('request',(req,res)=> { fs.readFile('./ big.file',(err,data)=> { if(err)throw err; res.end(data); });
}); server.listen(8000);
const server = require('http')。createServer(); server.on('request',(req,res)=> { fs.readFile('./ big.file',(err,data)=> { if(err)throw err; res.end(data); });
}); server.listen(8000);
當服務器收到請求時,它將使用異步方法為大文件提供服務fs.readFile
。但是,嘿,這不像我們阻止事件循環或任何事情。每件事都很棒,對嗎?對?
好吧,讓我們看看當我們運行服務器,連接到它并監視內存時會發生什么。
當我運行服務器時,它開始時具有正常的內存量,8.7 MB:
然后我連接到服務器。注意消耗的內存發生了什么:
哇 - 內存消耗躍升至434.8 MB。
在將big.file
它們寫入響應對象之前,我們基本上將整個內容放在內存中。這是非常低效的。
HTTP響應對象(res
在上面的代碼中)也是可寫流。這意味著如果我們有一個表示內容的可讀流big.file
,我們可以將這兩個相互管道并實現大致相同的結果,而不會消耗~400 MB的內存。
Node的fs
模塊可以為使用該createReadStream
方法的任何文件提供可讀流。我們可以將它傳遞給響應對象:
const fs = require('fs');
const server = require('http')。createServer(); server.on('request',(req,res)=> { const src = fs.createReadStream('./ big.file'); src.pipe(res);}); server.listen(8000);
const server = require('http')。createServer(); server.on('request',(req,res)=> { const src = fs.createReadStream('./ big.file'); src.pipe(res);}); server.listen(8000);
現在當你連接到這個服務器時,會發生一件神奇的事情(看一下內存消耗):
發生了什么?
當客戶端請求該大文件時,我們一次流一個塊,這意味著我們根本不在內存中緩沖它。內存使用量增長了大約25 MB,就是這樣。
您可以將此示例推到極限。big.file
使用500萬行而不是僅僅100萬行重新生成,這將使文件超過2 GB,并且實際上大于Node中的默認緩沖區限制。
如果您嘗試使用該文件fs.readFile
,默認情況下您根本不能(您可以更改限制)。但是fs.createReadStream
,對于請求者來說,將2 GB的數據流傳輸到請求者是沒有問題的,最重要的是,進程內存使用量大致相同。
準備好學習流了嗎?
本文是關于Node.js的Pluralsight課程的一部分。我在那里報道了類似的視頻格式內容。
流101
Node.js中有四種基本流類型:可讀,可寫,雙工和變換流。
- 可讀流是可以從中消費數據的源的抽象。一個例子是
fs.createReadStream
方法。 - 可寫流是可以寫入數據的目標的抽象。一個例子是
fs.createWriteStream
方法。 - 雙工流是可讀和可寫的。一個例子是TCP套接字。
- 變換流基本上是雙工流,可用于在寫入和讀取數據時修改或轉換數據。一個例子是
zlib.createGzip
使用gzip壓縮數據的流。您可以將轉換流視為一個函數,其中輸入是可寫流部分,輸出是可讀流部分。您可能還會聽到稱為“?直通流?”的轉換流。
所有流都是EventEmitter
。它們發出可用于讀取和寫入數據的事件。但是,我們可以使用該pipe
方法以更簡單的方式使用流數據。
管道方法
這是你需要記住的神奇線條:
readableSrc .pipe(writableDest)
.pipe(writableDest)
在這個簡單的行中,我們管道輸出可讀流 - 數據源,作為可寫流的輸入 - 目的地。源必須是可讀流,目標必須是可寫的。當然,它們也可以是雙工/變換流。事實上,如果我們正在進入雙工流,我們可以像在Linux中一樣鏈接管道調用:
readableSrc .pipe(transformStream1).pipe(transformStream2).pipe(finalWrtitableDest)
.pipe(transformStream1).pipe(transformStream2).pipe(finalWrtitableDest)
該pipe
方法返回目標流,這使我們能夠進行上述鏈接。對于流a
(讀取),b
和c
(雙面),和d
(可寫的),我們可以:
a.pipe(b)中.pipe(c)中.pipe(d)
#這相當于:
a.pipe(b)
b.pipe(c)
c.pipe(d)
a.pipe(b)
b.pipe(c)
c.pipe(d)
#在Linux中,相當于:
$ a | b | c | d
$ a | b | c | d
該pipe
方法是消費流的最簡單方法。通常建議使用該pipe
方法或使用事件消耗流,但避免混合這兩者。通常,當您使用該pipe
方法時,您不需要使用事件,但如果您需要以更自定義的方式使用流,則事件將是可行的方法。
流事件
除了從可讀流源讀取并寫入可寫目的地之外,該pipe
方法還會自動管理一些事情。例如,它處理錯誤,文件結束以及一個流比另一個流更慢或更快的情況。
但是,流也可以直接與事件一起使用。這是該pipe
方法主要用于讀寫數據的簡化事件等效代碼:
#readed.pipe(可寫)
readable.on('data',(chunk)=> { writable.write(chunk);
});
writable.write(chunk);
});
readable.on('end',()=> { writable.end();
});
writable.end();
});
以下是可與可讀寫流一起使用的重要事件和函數的列表:
從我的Pluralsight課程 - Advanced Node.js中捕獲的截圖
事件和函數以某種方式相關,因為它們通常一起使用。
可讀流上最重要的事件是:
data
每當流將一大塊數據傳遞給使用者時發出的事件- 該
end
事件,在沒有更多數據要從流中消耗時發出。
可寫流上最重要的事件是:
- 該
drain
事件,是可寫流可以接收更多數據的信號。 - 該
finish
事件,在將所有數據刷新到基礎系統時發出。
可以組合事件和功能,以實現流的自定義和優化使用。要使用可讀流,我們可以使用pipe
/?unpipe
methods或read
/?unshift
/?resume
方法。要使用可寫流,我們可以將它作為pipe
/?的目標unpipe
,或者只是使用write
方法寫入它,并end
在完成后調用方法。
可讀流的暫停和流動模式
可讀流有兩種主要模式影響我們使用它們的方式:
- 它們可以處于暫停模式
- 或者在流動模式下
這些模式有時被稱為拉動和推動模式。
默認情況下,所有可讀流都以暫停模式啟動,但它們可以輕松切換為流動,并在需要時返回暫停狀態。有時,切換會自動進行。
當可讀流處于暫停模式時,我們可以使用該read()
方法根據需要從流中讀取,但是,對于流動模式中的可讀流,數據不斷流動,我們必須監聽事件以使用它。
在流動模式下,如果沒有消費者可以處理數據,實際上可能會丟失數據。這就是為什么當我們在流動模式下有可讀流時,我們需要一個data
事件處理程序。實際上,只需添加一個data
事件處理程序就可以將暫停的流切換為流動模式,并刪除data
事件處理程序會將流切換回暫停模式。其中一些是為了向后兼容舊的Node流接口而完成的。
要在這兩種流模式之間手動切換,可以使用resume()
和pause()
方法。
從我的Pluralsight課程 - Advanced Node.js中捕獲的截圖
使用該pipe
方法消耗可讀流時,我們不必擔心這些模式會pipe
自動管理它們。
實現流
當我們在Node.js中討論流時,有兩個主要的不同任務:
- 實現流的任務。
- 消費它們的任務。
到目前為止,我們一直在談論只消耗流。讓我們實施一些!
流實現者通常require
是stream
模塊的人。
實現可寫流
要實現可寫流,我們需要使用Writable
流模塊中的構造函數。
const {Writable} = require('stream');
我們可以通過多種方式實現可寫流。例如,Writable
如果需要,我們可以擴展構造函數
class myWritableStream擴展Writable { }}
但是,我更喜歡更簡單的構造方法。我們只是從Writable
構造函數創建一個對象,并傳遞一些選項。唯一需要的選項是write
暴露要寫入的數據塊的函數。
const {Writable} = require('stream');
const outStream = new Writable({ write(chunk,encoding,callback){ console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);write(chunk,encoding,callback){ console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);
這個write方法有三個參數。
- 該塊通常是一個緩沖區,除非我們配置不同的數據流。
- 該編碼參數,需要在這種情況下,但通常我們可以忽略它。
- 該回調是我們需要我們完成處理數據塊之后調用一個函數。這就是寫入是否成功的信號。要發出故障信號,請使用錯誤對象調用回調。
在outStream
,我們只是console.log
將塊作為字符串,并callback
在沒有錯誤的情況下調用之后表示成功。這是一個非常簡單且可能不那么有用的回聲流。它將回應它收到的任何東西。
要使用這個流,我們可以簡單地使用它process.stdin
,這是一個可讀的流,所以我們可以直接process.stdin
進入我們的流outStream
。
當我們運行上面的代碼時,我們輸入的任何內容都process.stdin
將使用該outStream
?console.log
行回顯。
這不是一個非常有用的實現流,因為它實際上已經實現和內置。這非常相當于process.stdout
。我們可以直接stdin
進入stdout
,我們將通過這一行獲得完全相同的回聲功能:
process.stdin.pipe(process.stdout);
實現可讀流
要實現可讀流,我們需要Readable
接口,并從中構造一個對象,并read()
在流的配置參數中實現一個方法:
const {Readable} = require('stream');
const inStream = new Readable({ read(){} });read(){} });
有一種實現可讀流的簡單方法。我們可以直接push
使用我們希望消費者使用的數據。
const {Readable} = require('stream');
const inStream = new Readable({ read(){} });read(){} });inStream中。推('ABCDEFGHIJKLM'); inStream中。推('NOPQRSTUVWXYZ');推('ABCDEFGHIJKLM'); inStream中。推('NOPQRSTUVWXYZ');inStream中。推(null); //沒有更多數據推(null); //沒有更多數據
inStream.pipe(process.stdout);
當我們push
成為一個null
對象時,這意味著我們想要發信號通知該流沒有更多數據。
要使用這個簡單的可讀流,我們可以簡單地將其傳輸到可寫流中process.stdout
。
當我們運行上面的代碼時,我們將從中讀取所有數據inStream
并將其回顯到標準輸出。很簡單,但也不是很有效率。
我們基本上推流中的所有數據之前,它管道到process.stdout
。當消費者要求時,更好的方法是按需推送數據。我們可以通過read()
在配置對象中實現該方法來實現:
const inStream = new Readable({ read(size){ //對數據有需求......有人想讀它。} });read(size){ //對數據有需求......有人想讀它。} });
當在可讀流上調用read方法時,實現可以將部分數據推送到隊列。例如,我們可以一次推送一個字母,從字符代碼65(代表A)開始,并在每次推送時遞增:
const inStream = new Readable({ read(size){ this.push(String.fromCharCode(this.currentCharCode ++)); if(this.currentCharCode> 90){ this.push(null); } } });read(size){ this.push(String.fromCharCode(this.currentCharCode ++)); if(this.currentCharCode> 90){ this.push(null); } } });
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
當消費者正在閱讀可讀流時,該read
方法將繼續觸發,并且我們將推送更多字母。我們需要在某個地方停止這個循環,這就是當currentCharCode大于90(表示Z)時if語句推送null的原因。
這段代碼相當于我們開始使用的更簡單的代碼,但現在我們在消費者要求時按需推送數據。你應該總是那樣做。
實現雙工/轉換流
使用Duplex流,我們可以使用相同的對象實現可讀和可寫流。就像我們從兩個接口繼承一樣。
這是一個示例雙工流,它結合了上面實現的兩個可寫和可讀示例:
const {Duplex} = require('stream'); const inoutStream = new Duplex({ write(chunk,encoding,callback){ console.log(chunk.toString()); callback(); },read(size){ this.push(String.fromCharCode(this.currentCharCode ++) ); if(this.currentCharCode> 90){ this.push(null); } } }); inoutStream.currentCharCode = 65;const inoutStream = new Duplex({ write(chunk,encoding,callback){ console.log(chunk.toString()); callback(); },read(size){ this.push(String.fromCharCode(this.currentCharCode ++) ); if(this.currentCharCode> 90){ this.push(null); } } }); inoutStream.currentCharCode = 65;
<strong>process.stdin.pipe(inoutStream).pipe(process.stdout);</strong>
通過組合這些方法,我們可以使用此雙工流來讀取A到Z中的字母,我們也可以將其用作其回聲功能。我們將可讀stdin
流傳輸到此雙工流中以使用echo功能,我們將雙工流本身stdout
傳輸到可寫流中以查看字母A到Z.
重要的是要理解雙工流的可讀和可寫側完全獨立地操作。這僅僅是將兩個特征分組到一個對象中。
變換流是更有趣的雙工流,因為其輸出是根據其輸入計算的。
對于轉換流,我們不必實現read
或write
方法,我們只需要實現一個transform
結合它們的方法。它具有write
方法的簽名,我們也可以將它用于push
數據。
這是一個簡單的變換流,在將其轉換為大寫格式之后回顯您輸入的任何內容:
const {Transform} = require('stream'); const upperCaseTr = new Transform({ transform(chunk,encoding,callback){ this.push(chunk.toString()。toUpperCase()); callback(); } }); process.stdin.pipe(upperCaseTr).pipe(process.stdout);const upperCaseTr = new Transform({ transform(chunk,encoding,callback){ this.push(chunk.toString()。toUpperCase()); callback(); } }); process.stdin.pipe(upperCaseTr).pipe(process.stdout);
在這個我們正在消耗的變換流中,就像前面的雙工流示例一樣,我們只實現了一個transform()
方法。在該方法中,我們將chunk
其轉換為大寫版本,然后push
將該版本轉換為可讀部分。
流對象模式
默認情況下,流期望緩沖區/字符串值。objectMode
我們可以設置一個標志,讓流接受任何JavaScript對象。
這是一個簡單的例子來證明這一點。以下轉換流組合使得一個功能可以將逗號分隔值的字符串映射到JavaScript對象中。因此“a,b,c,d”
變得{a: b, c: d}
。
const {Transform} = require('stream');
const commaSplitter = new Transform({ readableObjectMode:true,readableObjectMode:true,transform(chunk,encoding,callback){ this.push(chunk.toString()。trim()。split(',')); 打回來(); } };this.push(chunk.toString()。trim()。split(',')); 打回來(); } };const arrayToObject = new Transform({ readableObjectMode:true,writableObjectMode:true,readableObjectMode:true,writableObjectMode:true,transform(chunk,encoding,callback){ const obj = {}; for(let i = 0; i <chunk.length; i + = 2){ obj [chunk [i]] = chunk [i + 1]; } this.push(obj); 打回來(); } };const obj = {}; for(let i = 0; i <chunk.length; i + = 2){ obj [chunk [i]] = chunk [i + 1]; } this.push(obj); 打回來(); } };const objectToString = new Transform({ writableObjectMode:true,writableObjectMode:true,transform(chunk,encoding,callback){ this.push(JSON.stringify(chunk)+'\ n'); 打回來(); } };this.push(JSON.stringify(chunk)+'\ n'); 打回來(); } };process.stdin .pipe(commaSplitter).pipe(arrayToObject).pipe(objectToString ).pipe(process.stdout).pipe(commaSplitter).pipe(arrayToObject).pipe(objectToString ).pipe(process.stdout)
我們傳遞輸入字符串(例如“a,b,c,d”
),通過commaSplitter
該字符串將數組推送為可讀數據([“a”, “b”, “c”, “d”]
)。readableObjectMode
在該流上添加標志是必要的,因為我們在那里推送一個對象,而不是字符串。
然后我們將數組并將其arrayToObject
傳輸到流中。我們需要一個writableObjectMode
標志來使該流接受一個對象。它還會推送一個對象(映射到對象的輸入數組),這就是為什么我們也需要readableObjectMode
那里的標志。最后一個objectToString
流接受一個對象,但推出一個字符串,這就是為什么我們只需要一個writableObjectMode
標志。可讀部分是普通字符串(字符串化對象)。
使用上面的例子
Node的內置轉換流
Node有一些非常有用的內置變換流。即,zlib和加密流。
這是一個使用zlib.createGzip()
流與fs
可讀/可寫流相結合來創建文件壓縮腳本的示例:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv [2]; fs.createReadStream(file).pipe(zlib.createGzip())。pipe(fs.createWriteStream(file +'.gz'));
const zlib = require('zlib');
const file = process.argv [2]; fs.createReadStream(file).pipe(zlib.createGzip())。pipe(fs.createWriteStream(file +'.gz'));
您可以使用此腳本將您傳遞的任何文件作為參數進行gzip。我們將該文件的可讀流傳輸到zlib內置轉換流中,然后傳輸到新gzip壓縮文件的可寫流中。簡單。
使用管道的一個很酷的事情是,如果需要,我們實際上可以將它們與事件結合起來。比如說,我希望用戶在腳本工作時看到進度指示器,在腳本完成時看到“完成”消息。由于該pipe
方法返回目標流,我們也可以鏈接事件處理程序的注冊:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv [2]; fs.createReadStream(file).pipe(zlib.createGzip())。
on('data',()=> process.stdout.write('。'))。pipe(fs.createWriteStream(file +'.zz' ))。on('finish',()=> console.log('完成'));
const zlib = require('zlib');
const file = process.argv [2]; fs.createReadStream(file).pipe(zlib.createGzip())。
on('data',()=> process.stdout.write('。'))。pipe(fs.createWriteStream(file +'.zz' ))。on('finish',()=> console.log('完成'));
因此,通過該pipe
方法,我們可以輕松地使用流,但我們仍然可以使用需要的事件進一步自定義與這些流的交互。
這個pipe
方法的優點在于我們可以用一種可讀的方式逐個編寫程序。例如,data
我們可以簡單地創建一個轉換流來報告進度,而不是監聽上面的事件,并.on()
用另一個??.pipe()
調用替換該??調用:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv [2]; const {Transform} = require('stream'); const reportProgress = new Transform({ transform(chunk,encoding,callback){ process.stdout.write('。'); callback(null,chunk); }
}); fs.createReadStream(file).pipe(zlib.createGzip())。
pipe(reportProgress).pipe(fs.createWriteStream(file +'。。'))。on('finish',()=> console.log( '完成'));
const zlib = require('zlib');
const file = process.argv [2]; const {Transform} = require('stream'); const reportProgress = new Transform({ transform(chunk,encoding,callback){ process.stdout.write('。'); callback(null,chunk); }
}); fs.createReadStream(file).pipe(zlib.createGzip())。
pipe(reportProgress).pipe(fs.createWriteStream(file +'。。'))。on('finish',()=> console.log( '完成'));
此reportProgress
流是一個簡單的直通流,但它也會將進度報告給標準輸出。請注意我如何使用函數中的第二個參數callback()
來推送transform()
方法中的數據。這相當于首先推送數據。
組合流的應用是無止境的。例如,如果我們需要在gzip之前或之后加密文件,我們需要做的就是按照我們需要的確切順序管道另一個轉換流。我們可以使用Node的crypto
模塊:
const crypto = require('crypto');
// ...
// ...
fs.createReadStream(file).pipe(zlib.createGzip())。
pipe(crypto.createCipher('aes192','a_secret'))。
pipe(reportProgress).pipe(fs.createWriteStream(file +'.zz') ).on('finish',()=> console.log('完成'));
.pipe(zlib.createGzip())。
pipe(crypto.createCipher('aes192','a_secret'))。
pipe(reportProgress).pipe(fs.createWriteStream(file +'.zz') ).on('finish',()=> console.log('完成'));
上面的腳本壓縮然后加密傳遞的文件,只有擁有秘密的人才能使用輸出的文件。我們無法使用普通的解壓縮實用程序解壓縮此文件,因為它已加密。
要實際能夠解壓縮上面腳本壓縮的任何內容,我們需要以相反的順序使用相反的流加密和zlib,這很簡單:
fs.createReadStream(文件).pipe(加密createDecipher( 'AES192', 'a_secret')).pipe(ZLIB。createGunzip()).pipe(reportProgress).pipe(fs.createWriteStream(file.slice(0, - 3)))。on('finish',()=> console.log('Done'));
.pipe(加密createDecipher( 'AES192', 'a_secret')).pipe(ZLIB。createGunzip()).pipe(reportProgress).pipe(fs.createWriteStream(file.slice(0, - 3)))。on('finish',()=> console.log('Done'));
假設傳遞的文件是壓縮版本,上面的代碼將從中創建一個讀取流,將其傳遞到加密createDecipher()
流(使用相同的秘密),將其輸出createGunzip()
傳遞到zlib?流,然后將內容寫回沒有擴展部分的文件。
這就是我對這個話題的全部看法。謝謝閱讀!直到下一次!
學習反應還是節點?查看我的書:
- 通過構建游戲了解React.js
- Node.js超越基礎
?