Node.js Stream - 基礎篇

背景

在構建較復雜的系統時,通常將其拆解為功能獨立的若干部分。這些部分的接口遵循一定的規范,通過某種方式相連,以共同完成較復雜的任務。譬如,shell通過管道|連接各部分,其輸入輸出的規范是文本流。

在Node.js中,內置的Stream模塊也實現了類似功能,各部分通過.pipe()連接。

鑒于目前國內系統性介紹Stream的文章較少,而越來越多的開源工具都使用了Stream,本系列文章將從以下幾方面來介紹相關內容:

  1. 流的基本類型,以及Stream模塊的基本使用方法
  2. 流式處理與back pressure的工作原理
  3. 如何開發流式程序,包括對Gulp與Browserify的剖析,以及一個實戰示例。

本文為系列文章的第一篇。

流的四種類型

Stream提供了以下四種類型的流:

var Stream = require('stream')var Readable = Stream.Readable
var Writable = Stream.Writable
var Duplex = Stream.Duplex
var Transform = Stream.Transform

使用Stream可實現數據的流式處理,如:

var fs = require('fs')
// `fs.createReadStream`創建一個`Readable`對象以讀取`bigFile`的內容,并輸出到標準輸出
// 如果使用`fs.readFile`則可能由于文件過大而失敗
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

創建可讀流。

實例:流式消耗迭代器中的數據。

'use strict'
const Readable = require('stream').Readableclass ToReadable extends Readable {constructor(iterable) {super()this.iterator = new function *() {yield * iterable}}// 子類需要實現該方法// 這是生產數據的邏輯_read() {const res = this.iterator.next()if (res.done) {// 數據源已枯竭,調用`push(null)`通知流this.push(null)} else {// 通過`push`方法將數據添加到流中this.push(res.value + '\n')}}
}module.exports = ToReadable

實際使用時,new ToReadable(iterable)會返回一個可讀流,下游可以流式的消耗迭代器中的數據。

const iterable = function *(limit) {while (limit--) {yield Math.random()}
}(1e10)const readable = new ToReadable(iterable)// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))// 所有數據均已讀完
readable.on('end', () => process.stdout.write('DONE'))

執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。

創建可讀流時,需要繼承Readable,并實現_read方法。

  • _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。
  • _read方法中,通過調用push(data)將數據放入可讀流中供下游消耗。
  • _read方法中,可以同步調用push(data),也可以異步調用。
  • 當全部數據都生產出來后,必須調用push(null)來結束可讀流。
  • 流一旦結束,便不能再調用push(data)添加數據。

可以通過監聽data事件的方式消耗可讀流。

  • 在首次監聽其data事件后,readable便會持續不斷地調用_read(),通過觸發data事件將數據輸出。
  • 第一次data事件會在下一個tick中觸發,所以,可以安全地將數據輸出前的邏輯放在事件監聽后(同一個tick中)。
  • 當數據全部被消耗時,會觸發end事件。

上面的例子中,process.stdout代表標準輸出流,實際是一個可寫流。下小節中介紹可寫流的用法。

Writable

創建可寫流。

前面通過繼承的方式去創建一類可讀流,這種方法也適用于創建一類可寫流,只是需要實現的是_write(data, enc, next)方法,而不是_read()方法。

有些簡單的情況下不需要創建一類流,而只是一個流對象,可以用如下方式去做:

const Writable = require('stream').Writableconst writable = Writable()
// 實現`_write`方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {// 將流中的數據寫入底層process.stdout.write(data.toString().toUpperCase())// 寫入完成時,調用`next()`方法通知流傳入下一個數據process.nextTick(next)
}// 所有數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')// 再無數據寫入流時,需要調用`end`方法
writable.end()
  • 上游通過調用writable.write(data)將數據寫入可寫流中。write()方法會調用_write()data寫入底層。
  • _write中,當數據成功寫入底層后,必須調用next(err)告訴流開始處理下一個數據。
  • next的調用既可以是同步的,也可以是異步的。
  • 上游必須調用writable.end(data)來結束可寫流,data是可選的。此后,不能再調用write新增數據。
  • end方法調用后,當所有底層的寫操作均完成時,會觸發finish事件。

Duplex

創建可讀可寫流。

Duplex實際上就是繼承了ReadableWritable的一類流。
所以,一個Duplex對象既可當成可讀流來使用(需要實現_read方法),也可當成可寫流來使用(需要實現_write方法)。

var Duplex = require('stream').Duplexvar duplex = Duplex()// 可讀端底層讀取邏輯
duplex._read = function () {this._readNum = this._readNum || 0if (this._readNum > 1) {this.push(null)} else {this.push('' + (this._readNum++))}
}// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {// a, bprocess.stdout.write('_write ' + buf.toString() + '\n')next()
}// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))duplex.write('a')
duplex.write('b')duplex.end()

上面的代碼中實現了_read方法,所以可以監聽data事件來消耗Duplex產生的數據。
同時,又實現了_write方法,可作為下游去消耗數據。

因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。
可寫端的接口與Writable一致,作為下游來使用;可讀端的接口與Readable一致,作為上游來使用。

Transform

在上面的例子中,可讀流中的數據(0, 1)與可寫流中的數據('a', 'b')是隔離開的,但在Transform中可寫端寫入的數據經變換后會自動添加到可讀端。
Tranform繼承自Duplex,并已經實現了_read_write方法,同時要求用戶實現一個_transform方法。

'use strict'const Transform = require('stream').Transformclass Rotate extends Transform {constructor(n) {super()// 將字母旋轉`n`個位置this.offset = (n || 13) % 26}// 將可寫端寫入的數據變換后添加到可讀端_transform(buf, enc, next) {var res = buf.toString().split('').map(c => {var code = c.charCodeAt(0)if (c >= 'a' && c <= 'z') {code += this.offsetif (code > 'z'.charCodeAt(0)) {code -= 26}} else if (c >= 'A' && c <= 'Z') {code += this.offsetif (code > 'Z'.charCodeAt(0)) {code -= 26}}return String.fromCharCode(code)}).join('')// 調用push方法將變換后的數據添加到可讀端this.push(res)// 調用next方法準備處理下一個next()}}var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()// khoor, zruog!

objectMode

前面幾節的例子中,經常看到調用data.toString()。這個toString()的調用是必需的嗎?
本節介紹完如何控制流中的數據類型后,自然就有了答案。

在shell中,用管道(|)連接上下游。上游輸出的是文本流(標準輸出流),下游輸入的也是文本流(標準輸入流)。在本文介紹的流中,默認也是如此。

對于可讀流來說,push(data)時,data只能是StringBuffer類型,而消耗時data事件輸出的數據都是Buffer類型。對于可寫流來說,write(data)時,data只能是StringBuffer類型,_write(data)調用時傳進來的data都是Buffer類型。

也就是說,流中的數據默認情況下都是Buffer類型。產生的數據一放入流中,便轉成Buffer被消耗;寫入的數據在傳給底層寫邏輯時,也被轉成Buffer類型。

但每個構造函數都接收一個配置對象,有一個objectMode的選項,一旦設置為true,就能出現“種瓜得瓜,種豆得豆”的效果。

Readable未設置objectMode時:

const Readable = require('stream').Readableconst readable = Readable()readable.push('a')
readable.push('b')
readable.push(null)readable.on('data', data => console.log(data))

輸出:

<Buffer 61>
<Buffer 62>

Readable設置objectMode后:

const Readable = require('stream').Readableconst readable = Readable({ objectMode: true })readable.push('a')
readable.push('b')
readable.push({})
readable.push(null)readable.on('data', data => console.log(data))

輸出:

a
b
{}

可見,設置objectMode后,push(data)的數據被原樣地輸出了。此時,可以生產任意類型的數據。

系列文章

  • 第一部分:《Node.js Stream - 基礎篇》,介紹Stream接口的基本使用。
  • 第二部分:《Node.js Stream - 進階篇》,重點剖析Stream底層如何支持流式數據處理,及其back pressure機制。
  • 第三部分:《Node.js Stream - 實戰篇》,介紹如何使用Stream進行程序設計。從BrowserifyGulp總結出兩種設計模式,并基于Stream構建一個為Git倉庫自動生成changelog的應用作為示例。

?

參考文獻

  • GitHub,substack/browserify-handbook
  • GitHub,zoubin/streamify-your-node-program

來自:http://tech.meituan.com/stream-basics.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/283917.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/283917.shtml
英文地址,請注明出處:http://en.pswp.cn/news/283917.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Axure RP使用攻略--動態面板的用途(8)

寫了幾個Axure教程之后發現&#xff0c;可能教程的起點有些高了&#xff0c;過分的去講效果的實現&#xff0c;而忽略了axure功能以及基礎元件的使用&#xff0c;那么從這個教程開始&#xff0c;把這些逐漸的展開講解。 關于動態面板 動態面板是axure原型制作中使用非常頻繁的一…

ABP 6.0.0-rc.1的新特性

2022-07-26官方發布ABP 6.0.0-rc.1版本&#xff0c;本文挑選了幾個新特性進行了介紹&#xff0c;主要包括LeptonX Lite默認主題、OpenIddict模塊&#xff0c;以及如何將Identity Server遷移到OpenIddict。據ABP官方公眾號介紹&#xff0c;ABP 6.0.0穩定版的計劃發布日期為2022-…

Java并發包--線程池框架

轉載請注明出處&#xff1a;http://www.cnblogs.com/skywang12345/p/3509903.html 線程池架構圖 線程池的架構圖如下&#xff1a; 1. Executor 它是"執行者"接口&#xff0c;它是來執行任務的。準確的說&#xff0c;Executor提供了execute()接口來執行已提交的 Runna…

c 試水解碼jpeg圖片比特流(已成功解碼)

找到一張采用霍夫曼通用DC,AC編碼表的圖片&#xff0c;提取出此圖片的比特流準備對它解碼&#xff0c;再反推怎樣編碼。 下圖是此圖片比特流前100個字節。解碼是每次讀一字節&#xff0c;對這8比特解碼&#xff0c;如8比特不能解碼&#xff0c;再讀入一字節。因為霍夫曼表最多…

Raft算法詳解

Raft算法屬于Multi-Paxos算法&#xff0c;它是在Multi-Paxos思想的基礎上&#xff0c;做了一些簡化和限制&#xff0c;比如增加了日志必須是連續的&#xff0c;只支持領導者、跟隨者和候選人三種狀態&#xff0c;在理解和算法實現上都相對容易許多 從本質上說&#xff0c;Raft算…

淘寶彈性布局方案lib-flexible研究

1. lib-flexible不能與響應式布局兼容 先說說響應式布局的一些基本認識&#xff1a; 響應式布局的表現是&#xff1a;網頁通過css媒介查詢判斷可視區域的寬度&#xff0c;在不同的范圍應用不同的樣式&#xff0c;以便在不同尺寸的設備上呈現最佳的界面效果。典型的例子是&#…

[No0000DB]C# FtpClientHelper Ftp客戶端上傳下載重命名 類封裝

using System; using System.Diagnostics; using System.IO; using System.Text; using Shared;namespace Helpers {public static class FileHelper{#region Methods/// <summary>/// 向文本文件的尾部追加內容/// </summary>/// <param name"filePa…

WPF效果第一百九十四篇之伸縮面板

前面一篇玩耍了一下登錄實現效果;今天在原來的基礎上來玩耍一下伸縮面板的效果;閑話不多扯直接看效果:1、關于前臺簡單布局:2、左側面板伸縮動畫&#xff1a;<Storyboard x:Key"ShowConfigSb"><ThicknessAnimationUsingKeyFrames Storyboard.TargetProperty…

你不知道的JavaScript(二)

第三章 原生函數 JS有很多原生函數&#xff0c;為基本的數據類型值提供了封裝對象&#xff0c;String&#xff0c;Number&#xff0c;Boolean等。我們可以通過{}.call.toString()來查看所有typeof返回object的對象的內置屬性[[class]],這個屬性無法直接訪問。我們基本類型調用的…

[轉]guava快速入門

Guava工程包含了若干被Google的 Java項目廣泛依賴 的核心庫&#xff0c;例如&#xff1a;集合 [collections] 、緩存 [caching] 、原生類型支持 [primitives support] 、并發庫 [concurrency libraries] 、通用注解 [common annotations] 、字符串處理 [string processing] 、I…

數據庫編程1 Oracle 過濾 函數 分組 外連接 自連接

【本文謝絕轉載原文來自http://990487026.blog.51cto.com】<大綱>數據庫編程1 Oracle 過濾 函數 分組 外連接 自連接本文實驗基于的數據表:winsows安裝好Oracle11g之后,開始實驗SQLplus 登陸 ORaclesqlplus 退出的方式查看用戶之下有什么表查看表的所有記錄&#xff0c;不…

【.NET 6】開發minimal api以及依賴注入的實現和代碼演示

前言&#xff1a;.net 6 LTS版本發布已經有一段時間了。此處做一個關于使用.net 6 開發精簡版webapi&#xff08;minimal api&#xff09;的入門教程演示。1、新建一個項目。此處就命名為 SomeExample:2、選擇 .net6版本&#xff0c;并且此處先去掉HTTPS配置以及去掉使用控制器…

(轉載)VS2010/MFC編程入門之四(MFC應用程序框架分析)

上一講雞啄米講的是VS2010應用程序工程中文件的組成結構&#xff0c;可能大家對工程的運行原理還是很模糊&#xff0c;理不出頭緒&#xff0c;畢竟跟C編程入門系列中的例程差別太大。這一節雞啄米就為大家分析下MFC應用程序框架的運行流程。 一.SDK應用程序與MFC應用程序運行過…

個人博客開發-開篇

邁出第一步&#xff1a; 很久以前就有這個想法&#xff0c;自己動手開發一套個人博客系統&#xff0c;終于&#xff0c;現在開始邁出了第一步。做這件事一點是做一個有個人風格的博客系統&#xff0c;第二點是對做這件事所使用的技術棧進行學習&#xff0c;所謂最好的學習就是實…

2022年中國中小學教育信息化行業研究報告

教育信息化丨研究報告 核心摘要&#xff1a; 背景篇 目前&#xff0c;我國中小學教育主要呈現信息時代教育的特征&#xff0c;智能時代教育特征初露端倪&#xff1b;中小學教育信息化正從量變邁向質變&#xff0c;創新引領與生態變革成為行業縱深的主旋律&#xff1b; 2021年…

使用curl指令發起websocket請求

昨日的文章沒指出websocket請求協商切換的精髓&#xff0c;刪除重發。前文相關&#xff1a;? .NET WebSockets 核心原理初體驗[1]? SignalR 從開發到生產部署避坑指南[2]tag&#xff1a;瀏覽器--->nginx--> server其中提到nginx默認不會為客戶端轉發Upgrade、Connectio…

Yii 2 的安裝 之 踩坑歷程

由于剛接觸yii2 ,決定先裝個試試&#xff1b;可是這一路安裝差點整吐血&#xff0c;可能還是水平有限吧&#xff0c; 但還是想把這個過程分享出來&#xff0c;讓遇到同樣問題的同學有個小小的參考&#xff0c;好了言歸正傳&#xff01;&#xff01; <(~.~)> 下面是安裝流…

設計模式之代理模式(上) 靜態代理與JDK動態代理

2019獨角獸企業重金招聘Python工程師標準>>> 代理模式 給某一個對象提供一個代理&#xff0c;并由代理對象控制對原對象的引用。靜態代理 靜態代理是由我們編寫好的類&#xff0c;在程序運行之前就已經編譯好的的類&#xff0c;此時就叫靜態代理。 說理論還是比較懵…

mysql 分頁查詢

使用limit函數 limit關鍵字的用法&#xff1a; LIMIT [offset,] rows offset指定要返回的第一行的偏移量&#xff0c;rows第二個指定返回行的最大數目。初始行的偏移量是0(不是1)。轉載于:https://www.cnblogs.com/xping/p/6703986.html