久久久久久久视色,久久电影免费精品,中文亚洲欧美乱码在线观看,在线免费播放AV片

<center id="vfaef"><input id="vfaef"><table id="vfaef"></table></input></center>

    <p id="vfaef"><kbd id="vfaef"></kbd></p>

    
    
    <pre id="vfaef"><u id="vfaef"></u></pre>

      <thead id="vfaef"><input id="vfaef"></input></thead>

    1. 站長(zhǎng)資訊網(wǎng)
      最全最豐富的資訊網(wǎng)站

      什么是流(Stream)?如何理解Nodejs中的流

      什么是流?如何理解流?下面本篇文章就來帶大家深入了解一下Node中的流(Stream),希望對(duì)大家有所幫助!

      什么是流(Stream)?如何理解Nodejs中的流

      作者最近在開發(fā)中經(jīng)常使用 pipe 函數(shù),只知道這是流的管道,卻不知道他是如何工作的,所以抱著一探究竟的心理干脆就從流開始學(xué)起,隨便將看過的知識(shí)和源碼整理成一篇文章分享給大家。

      流(Stream)在 Nodejs 中是個(gè)十分基礎(chǔ)的概念,很多基礎(chǔ)模塊都是基于流實(shí)現(xiàn)的,扮演著十分重要的角色。同時(shí)流也是是一個(gè)十分難以理解的概念,這主要是相關(guān)的文檔比較缺少,對(duì)于 NodeJs 初學(xué)者來說,理解流往往需要花很多時(shí)間理解,才能真正掌握這個(gè)概念,所幸的是,對(duì)于大部分 NodeJs 使用者來說,僅僅是用來開發(fā) Web 應(yīng)用,對(duì)流的不充分認(rèn)識(shí)并不影響使用。但是,理解流能夠?qū)?NodeJs 中的其他模塊有更好的理解,同時(shí)在某些情況下,使用流來處理數(shù)據(jù)會(huì)有更好的效果?!鞠嚓P(guān)教程推薦:nodejs視頻教程】

      如何理解流

      • 對(duì)于流的使用者來說,可以將流看作一個(gè)數(shù)組,我們只需要關(guān)注從中獲取(消費(fèi))和寫入(生產(chǎn))就可以了。

      • 對(duì)于流的開發(fā)者(使用stream模塊創(chuàng)建一個(gè)新實(shí)例),關(guān)注的是如何實(shí)現(xiàn)流中的一些方法,通常關(guān)注兩點(diǎn),目標(biāo)資源是誰(shuí)和如何操作目標(biāo)資源,確定了后就需要根據(jù)流的不同狀態(tài)和事件來對(duì)目標(biāo)資源進(jìn)行操作

      緩存池

      NodeJs 中所有的流都有緩沖池,緩沖池存在的目的是增加流的效率,當(dāng)數(shù)據(jù)的生產(chǎn)和消費(fèi)都需要時(shí)間時(shí),我們可以在下一次消費(fèi)前提前生產(chǎn)數(shù)據(jù)存放到緩沖池。但是緩沖池并不是時(shí)刻都處于使用狀態(tài),例如緩存池為空時(shí),數(shù)據(jù)生產(chǎn)后就不會(huì)放入緩存池而是直接消費(fèi)。 。

      如果數(shù)據(jù)生產(chǎn)的速度大于數(shù)據(jù)的消費(fèi)速度,多余的數(shù)據(jù)會(huì)在某個(gè)地方等待。如果數(shù)據(jù)的生產(chǎn)速度小于進(jìn)程數(shù)據(jù)的消費(fèi)速度,那么數(shù)據(jù)會(huì)在某個(gè)地方累計(jì)到一定的數(shù)量,然后在進(jìn)行消費(fèi)。(開發(fā)者無(wú)法控制數(shù)據(jù)的生產(chǎn)和消費(fèi)速度,只能盡量在何時(shí)的時(shí)機(jī)生產(chǎn)數(shù)據(jù)或者消費(fèi)數(shù)據(jù))

      那個(gè)數(shù)據(jù)等待,累計(jì)數(shù)據(jù),然后發(fā)生出去的地方。就是緩沖池。緩沖池通常位于電腦的RAM(內(nèi)存)中。

      舉一個(gè)常見的緩沖區(qū)的例子,我們?cè)谟^看在線視頻的時(shí)候,如果你的網(wǎng)速很快,緩沖區(qū)總是會(huì)被立即填充,然后發(fā)送給系統(tǒng)播放,然后立即緩沖下一段視頻。觀看的過程中,不會(huì)有卡頓。如果網(wǎng)速很慢,則會(huì)看到loading,表示緩沖區(qū)正在被填充,當(dāng)填充完成后數(shù)據(jù)被發(fā)送給系統(tǒng),才能看到這段視頻。

      NodeJs 流的緩存池是一個(gè) Buffer 鏈表,每一次想緩存池中加入數(shù)據(jù)都會(huì)重新創(chuàng)建一個(gè) Buffer 節(jié)點(diǎn)插入到鏈表尾部。

      EventEmitter

      NodeJs 中對(duì) Stream 是一個(gè)實(shí)現(xiàn)了 EventEmitter 的抽象接口,所以我會(huì)先簡(jiǎn)單的介紹一下 EventEmitter。

      EventEmitter 是一個(gè)實(shí)現(xiàn)事件發(fā)布訂閱功能的類,其中常用的幾個(gè)方法(on, once, off, emit)相信大家都耳熟能詳了,就不一一介紹了。

      const { EventEmitter } = require('events')  const eventEmitter = new EventEmitter()  // 為 eventA 事件綁定處理函數(shù) eventEmitter.on('eventA', () => {     console.log('eventA active 1'); });  // 為 eventB 事件綁定處理函數(shù) eventEmitter.on('eventB', () => {     console.log('eventB active 1'); });  eventEmitter.once('eventA', () => {     console.log('eventA active 2'); });  // 觸發(fā) eventA eventEmitter.emit('eventA') // eventA active 1 // eventA active 2

      值得注意的是, EventEmitter 有兩個(gè)叫做 newListenerremoveListener 的事件,當(dāng)你向一個(gè)事件對(duì)象中添加任何事件監(jiān)聽函數(shù)后,都會(huì)觸發(fā) newListener(eventEmitter.emit('newListener')),當(dāng)一個(gè)處理函數(shù)被移除時(shí)同理會(huì)觸發(fā) removeListener。

      還需要注意的是, once 綁定的處理函數(shù)只會(huì)執(zhí)行一次,removeListener 將在其執(zhí)行前被觸發(fā),這意味著 once 綁定的監(jiān)聽函數(shù)是先被移除才被觸發(fā)的。

      const { EventEmitter } = require('events')  const eventEmitter = new EventEmitter()  eventEmitter.on('newListener', (event, listener)=>{     console.log('newListener', event, listener) })  eventEmitter.on('removeListener', (event, listener) => {     console.log('removeListener', event, listener) }) //newListener removeListener[Function(anonymous)]   eventEmitter.on('eventA', () => {     console.log('eventA active 1'); }); //newListener eventA [Function (anonymous)]  function listenerB() { console.log('eventB active 1'); } eventEmitter.on('eventB', listenerB); // newListener eventB [Function (anonymous)]  eventEmitter.once('eventA', () => {     console.log('eventA active 2'); }); // newListener eventA [Function (anonymous)]  eventEmitter.emit('eventA') // eventA active 1 // removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] } // eventA active 2  eventEmitter.off('eventB', listenerB) // removeListener eventB[Function: listenerB]

      不過這對(duì)于我們后面的內(nèi)容來說并不重要。

      Stream

      Stream 是在 Node.js 中處理流數(shù)據(jù)的抽象接口。Stream 并不是一個(gè)實(shí)際的接口,而是對(duì)所有流的一種統(tǒng)稱。實(shí)際的接口有 ReadableStream、 WritableStream、ReadWriteStream 這幾個(gè)。

      interface ReadableStream extends EventEmitter {     readable: boolean;     read(size?: number): string | Buffer;     setEncoding(encoding: BufferEncoding): this;     pause(): this;     resume(): this;     isPaused(): boolean;     pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;     unpipe(destination?: WritableStream): this;     unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;     wrap(oldStream: ReadableStream): this;     [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>; }  interface WritableStream extends EventEmitter {     writable: boolean;     write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;     write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;     end(cb?: () => void): this;     end(data: string | Uint8Array, cb?: () => void): this;     end(str: string, encoding?: BufferEncoding, cb?: () => void): this; }  interface ReadWriteStream extends ReadableStream, WritableStream { }

      可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類的接口(ts中接口是可以繼承類的,因?yàn)樗麄冎皇窃谶M(jìn)行類型的合并)。

      上面這些接口對(duì)應(yīng)的實(shí)現(xiàn)類分別是 Readable、Writable 和 Duplex

      NodeJs中的流有4種:

      • Readable 可讀流(實(shí)現(xiàn)ReadableStream)
      • Writable 可寫流(實(shí)現(xiàn)WritableStream)
      • Duplex 可讀可寫流(繼承Readable后實(shí)現(xiàn)WritableStream)
      • Transform 轉(zhuǎn)換流(繼承Duplex)

      背壓?jiǎn)栴}

      磁盤寫入數(shù)據(jù)的速度是遠(yuǎn)低于內(nèi)存的,我們想象內(nèi)存和磁盤之間有一個(gè)“管道”,“管道”中是“流”,內(nèi)存的數(shù)據(jù)流入管道是非??斓?,當(dāng)管道塞滿時(shí),內(nèi)存中就會(huì)產(chǎn)生數(shù)據(jù)背壓,數(shù)據(jù)積壓在內(nèi)存中,占用資源。

      什么是流(Stream)?如何理解Nodejs中的流

      NodeJs Stream 的解決辦法是為每一個(gè)流的 緩存池(就是圖中寫入隊(duì)列)設(shè)置一個(gè)浮標(biāo)值,當(dāng)其中數(shù)據(jù)量達(dá)到這個(gè)浮標(biāo)值后,往緩存池再次 push 數(shù)據(jù)時(shí)就會(huì)返回 false,表示當(dāng)前流中緩存池內(nèi)容已經(jīng)達(dá)到浮標(biāo)值,不希望再有數(shù)據(jù)寫入了,這時(shí)我們應(yīng)該立即停止數(shù)據(jù)的生產(chǎn),防止緩存池過大產(chǎn)生背壓。

      Readable

      可讀流(Readable)是流的一種類型,他有兩種模式三種狀態(tài)

      兩種讀取模式:

      • 流動(dòng)模式:數(shù)據(jù)會(huì)從底層系統(tǒng)讀取寫入到緩沖區(qū),當(dāng)緩沖區(qū)被寫滿后自動(dòng)通過 EventEmitter 盡快的將數(shù)據(jù)傳遞給所注冊(cè)的事件處理程序中

      • 暫停模式:在這種模式下將不會(huì)主動(dòng)觸發(fā) EventEmitter 傳輸數(shù)據(jù),必須顯示的調(diào)用 Readable.read() 方法來從緩沖區(qū)中讀取數(shù)據(jù),read 會(huì)觸發(fā)響應(yīng)到 EventEmitter 事件。

      三種狀態(tài):

      • readableFlowing === null(初始狀態(tài))

      • readableFlowing === false(暫停模式)

      • readableFlowing === true(流動(dòng)模式)

      初始時(shí)流的 readable.readableFlowingnull

      添加data事件后變?yōu)?true 。調(diào)用 pause()、unpipe()、或接收到背壓或者添加 readable 事件,則 readableFlowing 會(huì)被設(shè)為 false ,在這個(gè)狀態(tài)下,為 data 事件綁定監(jiān)聽器不會(huì)使 readableFlowing 切換到 true。

      調(diào)用 resume() 可以讓可讀流的 readableFlowing 切換到 true

      移除所有的 readable 事件是使 readableFlowing 變?yōu)?null 的唯一方法。

      事件名 說明
      readable 當(dāng)緩沖區(qū)有新的可讀取數(shù)據(jù)時(shí)觸發(fā)(每一個(gè)想緩存池插入節(jié)點(diǎn)都會(huì)觸發(fā))
      data 每一次消費(fèi)數(shù)據(jù)后都會(huì)觸發(fā),參數(shù)是本次消費(fèi)的數(shù)據(jù)
      close 流關(guān)閉時(shí)觸發(fā)
      error 流發(fā)生錯(cuò)誤時(shí)觸發(fā)
      方法名 說明
      read(size) 消費(fèi)長(zhǎng)度為size的數(shù)據(jù),返回null表示當(dāng)前數(shù)據(jù)不足size,否則返回本次消費(fèi)的數(shù)據(jù)。size不傳遞時(shí)表示消費(fèi)緩存池中所有數(shù)據(jù)
      const fs = require('fs');  const readStreams = fs.createReadStream('./EventEmitter.js', {     highWaterMark: 100// 緩存池浮標(biāo)值 })  readStreams.on('readable', () => {     console.log('緩沖區(qū)滿了')     readStreams.read()// 消費(fèi)緩存池的所有數(shù)據(jù),返回結(jié)果并且觸發(fā)data事件 })   readStreams.on('data', (data) => {     console.log('data') })

      https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

      當(dāng) size 為 0 會(huì)觸發(fā) readable 事件。

      當(dāng)緩存池中的數(shù)據(jù)長(zhǎng)度達(dá)到浮標(biāo)值 highWaterMark 后,就不會(huì)在主動(dòng)請(qǐng)求生產(chǎn)數(shù)據(jù),而是等待數(shù)據(jù)被消費(fèi)后在生產(chǎn)數(shù)據(jù)

      暫停狀態(tài)的流如果不調(diào)用 read 來消費(fèi)數(shù)據(jù)時(shí),后續(xù)也不會(huì)在觸發(fā) datareadable,當(dāng)調(diào)用 read 消費(fèi)時(shí)會(huì)先判斷本次消費(fèi)后剩余的數(shù)據(jù)長(zhǎng)度是否低于 浮標(biāo)值,如果低于 浮標(biāo)值 就會(huì)在消費(fèi)前請(qǐng)求生產(chǎn)數(shù)據(jù)。這樣在 read 后的邏輯執(zhí)行完成后新的數(shù)據(jù)大概率也已經(jīng)生產(chǎn)完成,然后再次觸發(fā) readable,這種提前生產(chǎn)下一次消費(fèi)的數(shù)據(jù)存放在緩存池的機(jī)制也是緩存流為什么快的原因

      流動(dòng)狀態(tài)下的流有兩種情況

      • 生產(chǎn)速度慢于消費(fèi)速度時(shí):這種情況下每一個(gè)生產(chǎn)數(shù)據(jù)后一般緩存池中都不會(huì)有剩余數(shù)據(jù),直接將本次生產(chǎn)的數(shù)據(jù)傳遞給 data 事件即可(因?yàn)闆]有進(jìn)入緩存池,所以也不用調(diào)用 read 來消費(fèi)),然后立即開始生產(chǎn)新數(shù)據(jù),待上一次數(shù)據(jù)消費(fèi)完后新數(shù)據(jù)才生產(chǎn)好,再次觸發(fā) data ,一只到流結(jié)束。
      • 生產(chǎn)速度快于消費(fèi)速度時(shí):此時(shí)每一次生產(chǎn)完數(shù)據(jù)后一般緩存池都還存在未消費(fèi)的數(shù)據(jù),這種情況一般會(huì)在消費(fèi)數(shù)據(jù)時(shí)開始生產(chǎn)下一次消費(fèi)的數(shù)據(jù),待舊數(shù)據(jù)消費(fèi)完后新數(shù)據(jù)已經(jīng)生產(chǎn)完并且放入緩存池

      他們的區(qū)別僅僅在于數(shù)據(jù)生產(chǎn)后緩存池是否還存在數(shù)據(jù),如果存在數(shù)據(jù)則將生產(chǎn)的數(shù)據(jù) push 到緩存池等待消費(fèi),如果不存在則直接將數(shù)據(jù)交給 data 而不加入緩存池。

      值得注意的是當(dāng)一個(gè)緩存池中存在數(shù)據(jù)的流從暫停模式進(jìn)入的流動(dòng)模式時(shí),會(huì)先循環(huán)調(diào)用 read 來消費(fèi)數(shù)據(jù)只到返回 null

      暫停模式

      什么是流(Stream)?如何理解Nodejs中的流

      暫停模式下,一個(gè)可讀流讀創(chuàng)建時(shí),模式是暫停模式,創(chuàng)建后會(huì)自動(dòng)調(diào)用 _read 方法,把數(shù)據(jù)從數(shù)據(jù)源 push 到緩沖池中,直到緩沖池中的數(shù)據(jù)達(dá)到了浮標(biāo)值。每當(dāng)數(shù)據(jù)到達(dá)浮標(biāo)值時(shí),可讀流會(huì)觸發(fā)一個(gè) " readable " 事件,告訴消費(fèi)者有數(shù)據(jù)已經(jīng)準(zhǔn)備好了,可以繼續(xù)消費(fèi)。

      一般來說, 'readable' 事件表明流有新的動(dòng)態(tài):要么有新的數(shù)據(jù),要么到達(dá)流的盡頭。所以,數(shù)據(jù)源的數(shù)據(jù)被讀完前,也會(huì)觸發(fā)一次 'readable' 事件;

      消費(fèi)者 " readable " 事件的處理函數(shù)中,通過 stream.read(size) 主動(dòng)消費(fèi)緩沖池中的數(shù)據(jù)。

      const { Readable } = require('stream')  let count = 1000 const myReadable = new Readable({     highWaterMark: 300,     // 參數(shù)的 read 方法會(huì)作為流的 _read 方法,用于獲取源數(shù)據(jù)     read(size) {         // 假設(shè)我們的源數(shù)據(jù)上 1000 個(gè)1         let chunk = null         // 讀取數(shù)據(jù)的過程一般是異步的,例如IO操作         setTimeout(() => {             if (count > 0) {                 let chunkLength = Math.min(count, size)                 chunk = '1'.repeat(chunkLength)                 count -= chunkLength             }             this.push(chunk)         }, 500)     } }) // 每一次成功 push 數(shù)據(jù)到緩存池后都會(huì)觸發(fā) readable myReadable.on('readable', () => {     const chunk = myReadable.read()//消費(fèi)當(dāng)前緩存池中所有數(shù)據(jù)     console.log(chunk.toString()) })

      值得注意的是, 如果 read(size) 的 size 大于浮標(biāo)值,會(huì)重新計(jì)算新的浮標(biāo)值,新浮標(biāo)值是size的下一個(gè)二次冪(size <= 2^n,n取最小值)

      //  hwm 不會(huì)大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) {   if (n >= MAX_HWM) {     // 1GB限制     n = MAX_HWM;   } else {     //取下一個(gè)2最高冪,以防止過度增加hwm     n--;     n |= n >>> 1;     n |= n >>> 2;     n |= n >>> 4;     n |= n >>> 8;     n |= n >>> 16;     n++;   }   return n; }

      流動(dòng)模式

      什么是流(Stream)?如何理解Nodejs中的流

      所有可讀流開始的時(shí)候都是暫停模式,可以通過以下方法可以切換至流動(dòng)模式:

      • 添加 " data " 事件句柄;
      • 調(diào)用 “ resume ”方法;
      • 使用 " pipe " 方法把數(shù)據(jù)發(fā)送到可寫流

      流動(dòng)模式下,緩沖池里面的數(shù)據(jù)會(huì)自動(dòng)輸出到消費(fèi)端進(jìn)行消費(fèi),同時(shí),每次輸出數(shù)據(jù)后,會(huì)自動(dòng)回調(diào) _read 方法,把數(shù)據(jù)源的數(shù)據(jù)放到緩沖池中,如果此時(shí)緩存池中不存在數(shù)據(jù)則會(huì)直接吧數(shù)據(jù)傳遞給 data 事件,不會(huì)經(jīng)過緩存池;直到流動(dòng)模式切換至其他暫停模式,或者數(shù)據(jù)源的數(shù)據(jù)被讀取完了( push(null) );

      可讀流可以通過以下方式切換回暫停模式:

      • 如果沒有管道目標(biāo),則調(diào)用 stream.pause() 。
      • 如果有管道目標(biāo),則移除所有管道目標(biāo)。調(diào)用 stream.unpipe() 可以移除多個(gè)管道目標(biāo)。
      const { Readable } = require('stream')  let count = 1000 const myReadable = new Readable({     highWaterMark: 300,     read(size) {         let chunk = null         setTimeout(() => {             if (count > 0) {                 let chunkLength = Math.min(count, size)                 chunk = '1'.repeat(chunkLength)                 count -= chunkLength             }             this.push(chunk)         }, 500)     } })  myReadable.on('data', data => {     console.log(data.toString()) })

      Writable

      相對(duì)可讀流來說,可寫流要簡(jiǎn)單一些。

      什么是流(Stream)?如何理解Nodejs中的流

      當(dāng)生產(chǎn)者調(diào)用 write(chunk) 時(shí),內(nèi)部會(huì)根據(jù)一些狀態(tài)(corked,writing等)選擇是否緩存到緩沖隊(duì)列中或者調(diào)用 _write,每次寫完數(shù)據(jù)后,會(huì)嘗試清空緩存隊(duì)列中的數(shù)據(jù)。如果緩沖隊(duì)列中的數(shù)據(jù)大小超出了浮標(biāo)值(highWaterMark),消費(fèi)者調(diào)用 write(chunk) 后會(huì)返回 false,這時(shí)候生產(chǎn)者應(yīng)該停止繼續(xù)寫入。

      那么什么時(shí)候可以繼續(xù)寫入呢?當(dāng)緩沖中的數(shù)據(jù)都被成功 _write 之后,清空了緩沖隊(duì)列后會(huì)觸發(fā) drain 事件,這時(shí)候生產(chǎn)者可以繼續(xù)寫入數(shù)據(jù)。

      當(dāng)生產(chǎn)者需要結(jié)束寫入數(shù)據(jù)時(shí),需要調(diào)用 stream.end 方法通知可寫流結(jié)束。

      const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({     highWaterMark: 10,     write(chunk, encoding, callback) {// 會(huì)作為_write方法         setTimeout(()=>{             fileContent += chunk             callback()// 寫入結(jié)束后調(diào)用         }, 500)     } })  myWritable.on('close', ()=>{     console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()

      注意,在緩存池中數(shù)據(jù)到達(dá)浮標(biāo)值后,此時(shí)緩存池中可能存在多個(gè)節(jié)點(diǎn),在清空緩存池的過程中(循環(huán)調(diào)用_read),并不會(huì)向可讀流一樣盡量一次消費(fèi)長(zhǎng)度為浮標(biāo)值的數(shù)據(jù),而是每次消費(fèi)一個(gè)緩沖區(qū)節(jié)點(diǎn),即使這個(gè)緩沖區(qū)長(zhǎng)度于浮標(biāo)值不一致也是如此

      const { Writable } = require('stream')   let fileContent = '' const myWritable = new Writable({     highWaterMark: 10,     write(chunk, encoding, callback) {         setTimeout(()=>{             fileContent += chunk             console.log('消費(fèi)', chunk.toString())             callback()// 寫入結(jié)束后調(diào)用         }, 100)     } })  myWritable.on('close', ()=>{     console.log('close', fileContent) })  let count = 0 function productionData(){     let flag = true     while (count <= 20 && flag){         flag = myWritable.write(count.toString())         count++     }     if(count > 20){         myWritable.end()     } } productionData() myWritable.on('drain', productionData)

      上述是一個(gè)浮標(biāo)值為 10 的可寫流,現(xiàn)在數(shù)據(jù)源是一個(gè) 0——20 到連續(xù)的數(shù)字字符串,productionData 用于寫入數(shù)據(jù)。

      • 首先第一次調(diào)用 myWritable.write("0") 時(shí),因?yàn)榫彺娉夭淮嬖跀?shù)據(jù),所以 "0" 不進(jìn)入緩存池,而是直接交給 _wirte,myWritable.write("0") 返回值為 true

      • 當(dāng)執(zhí)行 myWritable.write("1") 時(shí),因?yàn)?_wirtecallback 還未調(diào)用,表明上一次數(shù)據(jù)還未寫入完,位置保證數(shù)據(jù)寫入的有序性,只能創(chuàng)建一個(gè)緩沖區(qū)將 "1" 加入緩存池中。后面 2-9 都是如此

      • 當(dāng)執(zhí)行 myWritable.write("10") 時(shí),此時(shí)緩沖區(qū)長(zhǎng)度為 9(1-9),還未到達(dá)浮標(biāo)值, "10" 繼續(xù)作為一個(gè)緩沖區(qū)加入緩存池中,此時(shí)緩存池長(zhǎng)度變?yōu)?11,所以 myWritable.write("1") 返回 false,這意味著緩沖區(qū)的數(shù)據(jù)已經(jīng)足夠,我們需要等待 drain 事件通知時(shí)再生產(chǎn)數(shù)據(jù)。

      • 100ms過后,_write("0", encoding, callback)callback 被調(diào)用,表明 "0" 已經(jīng)寫入完成。然后會(huì)檢查緩存池中是否存在數(shù)據(jù),如果存在則會(huì)先調(diào)用 _read 消費(fèi)緩存池的頭節(jié)點(diǎn)("1"),然后繼續(xù)重復(fù)這個(gè)過程直到緩存池為空后觸發(fā) drain 事件,再次執(zhí)行 productionData

      • 調(diào)用 myWritable.write("11"),觸發(fā)第1步開始的過程,直到流結(jié)束。

      Duplex

      在理解了可讀流與可寫流后,雙工流就好理解了,雙工流事實(shí)上是繼承了可讀流然后實(shí)現(xiàn)了可寫流(源碼是這么寫的,但是應(yīng)該說是同時(shí)實(shí)現(xiàn)了可讀流和可寫流更加好)。

      什么是流(Stream)?如何理解Nodejs中的流

      Duplex 流需要同時(shí)實(shí)現(xiàn)下面兩個(gè)方法

      • 實(shí)現(xiàn) _read() 方法,為可讀流生產(chǎn)數(shù)據(jù)

      • 實(shí)現(xiàn) _write() 方法,為可寫流消費(fèi)數(shù)據(jù)

      上面兩個(gè)方法如何實(shí)現(xiàn)在上面可寫流可讀流的部分已經(jīng)介紹過了,這里需要注意的是,雙工流是存在兩個(gè)獨(dú)立的緩存池分別提供給兩個(gè)流,他們的數(shù)據(jù)源也不一樣

      以 NodeJs 的標(biāo)準(zhǔn)輸入輸出流為例:

      • 當(dāng)我們?cè)诳刂婆_(tái)輸入數(shù)據(jù)時(shí)會(huì)觸發(fā)其 data 事件,這證明他有可讀流的功能,每一次用戶鍵入回車相當(dāng)于調(diào)用可讀的 push 方法推送生產(chǎn)的數(shù)據(jù)。
      • 當(dāng)我們調(diào)用其 write 方法時(shí)也可以向控制臺(tái)輸出內(nèi)容,但是不會(huì)觸發(fā) data 事件,這說明他有可寫流的功能,而且有獨(dú)立的緩沖區(qū),_write 方法的實(shí)現(xiàn)內(nèi)容就是讓控制臺(tái)展示文字。
      // 每當(dāng)用戶在控制臺(tái)輸入數(shù)據(jù)(_read),就會(huì)觸發(fā)data事件,這是可讀流的特性 process.stdin.on('data', data=>{     process.stdin.write(data); })  // 每隔一秒向標(biāo)準(zhǔn)輸入流生產(chǎn)數(shù)據(jù)(這是可寫流的特性,會(huì)直接輸出到控制臺(tái)上),不會(huì)觸發(fā)data setInterval(()=>{     process.stdin.write('不是用戶控制臺(tái)輸入的數(shù)據(jù)') }, 1000)

      Transform

      什么是流(Stream)?如何理解Nodejs中的流

      可以將 Duplex 流視為具有可寫流的可讀流。兩者都是獨(dú)立的,每個(gè)都有獨(dú)立的內(nèi)部緩沖區(qū)。讀寫事件獨(dú)立發(fā)生。

                                   Duplex Stream                           ------------------|                     Read  <-----               External Source             You           ------------------|                       Write ----->               External Sink                           ------------------|

      Transform 流是雙工的,其中讀寫以因果關(guān)系進(jìn)行。雙工流的端點(diǎn)通過某種轉(zhuǎn)換鏈接。讀取要求發(fā)生寫入。

                                       Transform Stream                            --------------|--------------             You     Write  ---->                   ---->  Read  You                            --------------|--------------

      對(duì)于創(chuàng)建 Transform 流,最重要的是要實(shí)現(xiàn) _transform 方法而不是 _write 或者 _read。 _transform 中對(duì)可寫流寫入的數(shù)據(jù)做處理(消費(fèi))然后為可讀流生產(chǎn)數(shù)據(jù)。

      轉(zhuǎn)換流還經(jīng)常會(huì)實(shí)現(xiàn)一個(gè) `_flush` 方法,他會(huì)在流結(jié)束前被調(diào)用,一般用于對(duì)流的末尾追加一些東西,例如壓縮文件時(shí)的一些壓縮信息就是在這里加上的
      const { write } = require('fs') const { Transform, PassThrough } = require('stream')  const reurce = '1312123213124341234213423428354816273513461891468186499126412'  const transform = new Transform({     highWaterMark: 10,     transform(chunk ,encoding, callback){// 轉(zhuǎn)換數(shù)據(jù),調(diào)用push將轉(zhuǎn)換結(jié)果加入緩存池         this.push(chunk.toString().replace('1', '@'))         callback()     },     flush(callback){// end觸發(fā)前執(zhí)行         this.push('<<<')         callback()     } })   // write 不斷寫入數(shù)據(jù) let count = 0 transform.write('>>>') function productionData() {     let flag = true     while (count <= 20 && flag) {         flag = transform.write(count.toString())         count++     }     if (count > 20) {         transform.end()     } } productionData() transform.on('drain', productionData)   let result = '' transform.on('data', data=>{     result += data.toString() }) transform.on('end', ()=>{     console.log(result)     // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })

      Pipe

      管道是將上一個(gè)程序的輸出作為下一個(gè)程序的輸入,這是管道在 Linux 中管道的作用。NodeJs 中的管道其實(shí)也類似,它管道用于連接兩個(gè)流,上游的流的輸出會(huì)作為下游的流的輸入。

      什么是流(Stream)?如何理解Nodejs中的流

      管道 sourec.pipe(dest, options) 要求 sourec 是可讀的,dest是可寫的。其返回值是 dest。

      對(duì)于處于管道中間的流既是下一個(gè)流的上游也是上一個(gè)流的下游,所以其需要時(shí)一個(gè)可讀可寫的雙工流,一般我們會(huì)使用轉(zhuǎn)換流來作為管道中間的流。

      https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33

      Stream.prototype.pipe = function(dest, options) {   const source = this;    function ondata(chunk) {     if (dest.writable && dest.write(chunk) === false && source.pause) {       source.pause();     }   }    source.on('data', ondata);    function ondrain() {     if (source.readable && source.resume) {       source.resume();     }   }    dest.on('drain', ondrain);   // ...后面的代碼省略 }

      pipe 的實(shí)現(xiàn)非常清晰,當(dāng)上游的流發(fā)出 data 事件時(shí)會(huì)調(diào)用下游流的 write 方法寫入數(shù)據(jù),然后立即調(diào)用 source.pause() 使得上游變?yōu)闀和顟B(tài),這主要是為了防止背壓。

      當(dāng)下游的流將數(shù)據(jù)消費(fèi)完成后會(huì)調(diào)用 source.resume() 使上游再次變?yōu)榱鲃?dòng)狀態(tài)。

      我們實(shí)現(xiàn)一個(gè)將 data 文件中所有 1 替換為 @ 然后輸出到 result 文件到管道。

      const { Transform } = require('stream') const { createReadStream, createWriteStream } = require('fs')  // 一個(gè)位于管道中的轉(zhuǎn)換流 function createTransformStream(){     return new Transform({         transform(chunk, encoding, callback){             this.push(chunk.toString().replace(/1/g, '@'))             callback()         }     }) } createReadStream('./data') .pipe(createTransformStream()) .pipe(createWriteStream('./result'))

      在管道中只存在兩個(gè)流時(shí),其功能和轉(zhuǎn)換流有點(diǎn)類似,都是將一個(gè)可讀流與一個(gè)可寫流串聯(lián)起來,但是管道可以串聯(lián)多個(gè)流。

      原文地址:https://juejin.cn/post/7077511716564631566

      作者:月夕

      贊(0)
      分享到: 更多 (0)
      網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)