本篇文章帶大家解讀一下Node.js流源碼,深入了解下Node可讀流,看看其基本原理、使用方法與工作機(jī)制,希望對(duì)大家有所幫助!
1. 基本概念
1.1. 流的歷史演變
流不是 Nodejs 特有的概念。 它們是幾十年前在 Unix 操作系統(tǒng)中引入的,程序可以通過(guò)管道運(yùn)算符(|)對(duì)流進(jìn)行相互交互。
在基于Unix系統(tǒng)的MacOS以及Linux中都可以使用管道運(yùn)算符(|),他可以將運(yùn)算符左側(cè)進(jìn)程的輸出轉(zhuǎn)換成右側(cè)的輸入。
在Node中,我們使用傳統(tǒng)的readFile去讀取文件的話,會(huì)將文件從頭到尾都讀到內(nèi)存中,當(dāng)所有內(nèi)容都被讀取完畢之后才會(huì)對(duì)加載到內(nèi)存中的文件內(nèi)容進(jìn)行統(tǒng)一處理。
這樣做會(huì)有兩個(gè)缺點(diǎn):
-
內(nèi)存方面:占用大量?jī)?nèi)存
-
時(shí)間方面:需要等待數(shù)據(jù)的整個(gè)有效負(fù)載都加載完才會(huì)開(kāi)始處理數(shù)據(jù)
為了解決上述問(wèn)題,Node.js效仿并實(shí)現(xiàn)了流的概念,在Node.js流中,一共有四種類(lèi)型的流,他們都是Node.js中EventEmitter的實(shí)例:
-
可讀流(Readable Stream)
-
可寫(xiě)流(Writable Stream)
-
可讀可寫(xiě)全雙工流(Duplex Stream)
-
轉(zhuǎn)換流(Transform Stream)
為了深入學(xué)習(xí)這部分的內(nèi)容,循序漸進(jìn)的理解Node.js中流的概念,并且由于源碼部分較為復(fù)雜,本人決定先從可讀流開(kāi)始學(xué)習(xí)這部分內(nèi)容。
1.2. 什么是流(Stream)
流是一種抽象的數(shù)據(jù)結(jié)構(gòu),是數(shù)據(jù)的集合,其中存儲(chǔ)的數(shù)據(jù)類(lèi)型只能為以下類(lèi)型(僅針對(duì)objectMode === false的情況):
- string
- Buffer
我們可以把流看作這些數(shù)據(jù)的集合,就像液體一樣,我們先把這些液體保存在一個(gè)容器里(流的內(nèi)部緩沖區(qū)BufferList),等到相應(yīng)的事件觸發(fā)的時(shí)候,我們?cè)侔牙锩娴囊后w倒進(jìn)管道里,并通知其他人在管道的另一側(cè)拿自己的容器來(lái)接里面的液體進(jìn)行處理。
1.3. 什么是可讀流(Readable Stream)
可讀流是流的一種類(lèi)型,他有兩種模式三種狀態(tài)
兩種讀取模式:
-
流動(dòng)模式:數(shù)據(jù)會(huì)從底層系統(tǒng)讀取,并通過(guò)EventEmitter盡快的將數(shù)據(jù)傳遞給所注冊(cè)的事件處理程序中
-
暫停模式:在這種模式下將不會(huì)讀取數(shù)據(jù),必須顯示的調(diào)用Stream.read()方法來(lái)從流中讀取數(shù)據(jù)
三種狀態(tài):
-
readableFlowing === null:不會(huì)產(chǎn)生數(shù)據(jù),調(diào)用Stream.pipe()、Stream.resume會(huì)使其狀態(tài)變?yōu)閠rue,開(kāi)始產(chǎn)生數(shù)據(jù)并主動(dòng)觸發(fā)事件
-
readableFlowing === false:此時(shí)會(huì)暫停數(shù)據(jù)的流動(dòng),但不會(huì)暫停數(shù)據(jù)的生成,因此會(huì)產(chǎn)生數(shù)據(jù)積壓
-
readableFlowing === true:正常產(chǎn)生和消耗數(shù)據(jù)
2. 基本原理
2.1. 內(nèi)部狀態(tài)定義(ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // 操作除了string、Buffer、null之外的其他類(lèi)型的數(shù)據(jù)需要把這個(gè)模式打開(kāi) highWaterMark: 16384, // 水位限制,1024 * 16,默認(rèn)16kb,超過(guò)這個(gè)限制則會(huì)停止調(diào)用_read()讀數(shù)據(jù)到buffer中 buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer鏈表,用于保存數(shù)據(jù) length: 0, // 整個(gè)可讀流數(shù)據(jù)的大小,如果是objectMode則與buffer.length相等 pipes: [], // 保存監(jiān)聽(tīng)了該可讀流的所有管道隊(duì)列 flowing: null, // 可獨(dú)流的狀態(tài) null、false、true ended: false, // 所有數(shù)據(jù)消費(fèi)完畢 endEmitted: false, // 結(jié)束事件收否已發(fā)送 reading: false, // 是否正在讀取數(shù)據(jù) constructed: true, // 流在構(gòu)造好之前或者失敗之前,不能被銷(xiāo)毀 sync: true, // 是否同步觸發(fā)'readable'/'data'事件,或是等到下一個(gè)tick needReadable: false, // 是否需要發(fā)送readable事件 emittedReadable: false, // readable事件發(fā)送完畢 readableListening: false, // 是否有readable監(jiān)聽(tīng)事件 resumeScheduled: false, // 是否調(diào)用過(guò)resume方法 errorEmitted: false, // 錯(cuò)誤事件已發(fā)送 emitClose: true, // 流銷(xiāo)毀時(shí),是否發(fā)送close事件 autoDestroy: true, // 自動(dòng)銷(xiāo)毀,在'end'事件觸發(fā)后被調(diào)用 destroyed: false, // 流是否已經(jīng)被銷(xiāo)毀 errored: null, // 標(biāo)識(shí)流是否報(bào)錯(cuò) closed: false, // 流是否已經(jīng)關(guān)閉 closeEmitted: false, // close事件是否已發(fā)送 defaultEncoding: 'utf8', // 默認(rèn)字符編碼格式 awaitDrainWriters: null, // 指向監(jiān)聽(tīng)了'drain'事件的writer引用,類(lèi)型為null、Writable、Set<Writable> multiAwaitDrain: false, // 是否有多個(gè)writer等待drain事件 readingMore: false, // 是否可以讀取