本篇文章帶大家解讀一下Node.js流源碼,深入了解下Node可讀流,看看其基本原理、使用方法與工作機制,希望對大家有所幫助!
1. 基本概念
1.1. 流的歷史演變
流不是 Nodejs 特有的概念。 它們是幾十年前在 Unix 操作系統(tǒng)中引入的,程序可以通過管道運算符(|)對流進行相互交互。
在基于Unix系統(tǒng)的MacOS以及Linux中都可以使用管道運算符(|),他可以將運算符左側(cè)進程的輸出轉(zhuǎn)換成右側(cè)的輸入。
在Node中,我們使用傳統(tǒng)的readFile去讀取文件的話,會將文件從頭到尾都讀到內(nèi)存中,當所有內(nèi)容都被讀取完畢之后才會對加載到內(nèi)存中的文件內(nèi)容進行統(tǒng)一處理。
這樣做會有兩個缺點:
-
內(nèi)存方面:占用大量內(nèi)存
-
時間方面:需要等待數(shù)據(jù)的整個有效負載都加載完才會開始處理數(shù)據(jù)
為了解決上述問題,Node.js效仿并實現(xiàn)了流的概念,在Node.js流中,一共有四種類型的流,他們都是Node.js中EventEmitter的實例:
-
可讀流(Readable Stream)
-
可寫流(Writable Stream)
-
可讀可寫全雙工流(Duplex Stream)
-
轉(zhuǎn)換流(Transform Stream)
為了深入學習這部分的內(nèi)容,循序漸進的理解Node.js中流的概念,并且由于源碼部分較為復雜,本人決定先從可讀流開始學習這部分內(nèi)容。
1.2. 什么是流(Stream)
流是一種抽象的數(shù)據(jù)結(jié)構(gòu),是數(shù)據(jù)的集合,其中存儲的數(shù)據(jù)類型只能為以下類型(僅針對objectMode === false的情況):
- string
- Buffer
我們可以把流看作這些數(shù)據(jù)的集合,就像液體一樣,我們先把這些液體保存在一個容器里(流的內(nèi)部緩沖區(qū)BufferList),等到相應的事件觸發(fā)的時候,我們再把里面的液體倒進管道里,并通知其他人在管道的另一側(cè)拿自己的容器來接里面的液體進行處理。
1.3. 什么是可讀流(Readable Stream)
可讀流是流的一種類型,他有兩種模式三種狀態(tài)
兩種讀取模式:
-
流動模式:數(shù)據(jù)會從底層系統(tǒng)讀取,并通過EventEmitter盡快的將數(shù)據(jù)傳遞給所注冊的事件處理程序中
-
暫停模式:在這種模式下將不會讀取數(shù)據(jù),必須顯示的調(diào)用Stream.read()方法來從流中讀取數(shù)據(jù)
三種狀態(tài):
-
readableFlowing === null:不會產(chǎn)生數(shù)據(jù),調(diào)用Stream.pipe()、Stream.resume會使其狀態(tài)變?yōu)閠rue,開始產(chǎn)生數(shù)據(jù)并主動觸發(fā)事件
-
readableFlowing === false:此時會暫停數(shù)據(jù)的流動,但不會暫停數(shù)據(jù)的生成,因此會產(chǎn)生數(shù)據(jù)積壓
-
readableFlowing === true:正常產(chǎn)生和消耗數(shù)據(jù)
2. 基本原理
2.1. 內(nèi)部狀態(tài)定義(ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // 操作除了string、Buffer、null之外的其他類型的數(shù)據(jù)需要把這個模式打開 highWaterMark: 16384, // 水位限制,1024 * 16,默認16kb,超過這個限制則會停止調(diào)用_read()讀數(shù)據(jù)到buffer中 buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer鏈表,用于保存數(shù)據(jù) length: 0, // 整個可讀流數(shù)據(jù)的大小,如果是objectMode則與buffer.length相等 pipes: [], // 保存監(jiān)聽了該可讀流的所有管道隊列 flowing: null, // 可獨流的狀態(tài) null、false、true ended: false, // 所有數(shù)據(jù)消費完畢 endEmitted: false, // 結(jié)束事件收否已發(fā)送 reading: false, // 是否正在讀取數(shù)據(jù) constructed: true, // 流在構(gòu)造好之前或者失敗之前,不能被銷毀 sync: true, // 是否同步觸發(fā)'readable'/'data'事件,或是等到下一個tick needReadable: false, // 是否需要發(fā)送readable事件 emittedReadable: false, // readable事件發(fā)送完畢 readableListening: false, // 是否有readable監(jiān)聽事件 resumeScheduled: false, // 是否調(diào)用過resume方法 errorEmitted: false, // 錯誤事件已發(fā)送 emitClose: true, // 流銷毀時,是否發(fā)送close事件 autoDestroy: true, // 自動銷毀,在'end'事件觸發(fā)后被調(diào)用 destroyed: false, // 流是否已經(jīng)被銷毀 errored: null, // 標識流是否報錯 closed: false, // 流是否已經(jīng)關閉 closeEmitted: false, // close事件是否已發(fā)送 defaultEncoding: 'utf8', // 默認字符編碼格式 awaitDrainWriters: null, // 指向監(jiān)聽了'drain'事件的writer引用,類型為null、Writable、Set<Writable> multiAwaitDrain: false, // 是否有多個writer等待drain事件 readingMore: false, // 是否可以讀取