Node.js 中的流(Stream)是出了名的難用甚至是難以理解?!疽曨l教程推薦:nodejs視頻教程 】
用 Dominic Tarr 的話來說:“流是 Node 中最好的,也是最容易被誤解的想法。”即使是 Redux 的創(chuàng)建者和 React.js 的核心團隊成員 Dan Abramov 也害怕 Node 流。
本文將幫助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!
什么是流(Stream)?
流(Stream)是為 Node.js 應(yīng)用提供動力的基本概念之一。它們是數(shù)據(jù)處理方法,用于將輸入的數(shù)據(jù)順序讀取或把數(shù)據(jù)寫入輸出。
流是一種以有效方式處理讀寫文件、網(wǎng)絡(luò)通信或任何類型的端到端信息交換的方式。
流的處理方式非常獨特,流不是像傳統(tǒng)方式那樣將文件一次全部讀取到存儲器中,而是逐段讀取數(shù)據(jù)塊并處理數(shù)據(jù)的內(nèi)容,不將其全部保留在內(nèi)存中。
這種方式使流在處理大量數(shù)據(jù)時非常強大,例如,文件的大小可能大于可用的內(nèi)存空間,從而無法將整個文件讀入內(nèi)存進行處理。那是流的用武之地!
既能用流來處理較小的數(shù)據(jù)塊,也可以讀取較大的文件。
以 YouTube 或 Netflix 之類的“流媒體”服務(wù)為例:這些服務(wù)不會讓你你立即下載視頻和音頻文件。取而代之的是,你的瀏覽器以連續(xù)的塊流形式接收視頻,從而使接收者幾乎可以立即開始觀看和收聽。
但是,流不僅涉及處理媒體和大數(shù)據(jù)。它們還在代碼中賦予了我們“可組合性”的力量??紤]可組合性的設(shè)計意味著能夠以某種方式組合多個組件以產(chǎn)生相同類型的結(jié)果。在 Node.js 中,可以通過流在其他較小的代碼段中傳遞數(shù)據(jù),從而組成功能強大的代碼段。
為什么使用流?
與其他數(shù)據(jù)處理方法相比,流基本上具有兩個主要優(yōu)點:
- 內(nèi)存效率:你無需事先把大量數(shù)據(jù)加載到內(nèi)存中即可進行處理
- 時間效率:得到數(shù)據(jù)后立即開始處所需的時間大大減少,不必等到整個有效數(shù)據(jù)全部發(fā)送完畢才開始處理
Node.js 中有 4 種流:
- 可寫流:可以向其中寫入數(shù)據(jù)的流。例如,
fs.createWriteStream()
使我們可以使用流將數(shù)據(jù)寫入文件。 - 可讀流:可從中讀取數(shù)據(jù)的流。例如:
fs.createReadStream()
讓我們讀取文件的內(nèi)容。 - 雙工流(可讀寫的流):可讀和可寫的流。例如,
net.Socket
- Transform:可在寫入和讀取時修改或轉(zhuǎn)換數(shù)據(jù)。例如在文件壓縮的情況下,你可以在文件中寫入壓縮數(shù)據(jù),也可以從文件中讀取解壓縮的數(shù)據(jù)。
如果你已經(jīng)使用過 Node.js,則可能遇到過流。例如在基于 Node.js 的 HTTP 服務(wù)器中,request
是可讀流,而 response
是可寫流。你可能用過 fs
模塊,該模塊可讓你用可讀和可寫文件流。每當(dāng)使用 Express 時,你都在使用流與客戶端進行交互,而且由于 TCP 套接字、TLS棧和其他連接都基于 Node.js,所以在每個可以使用的數(shù)據(jù)庫連接驅(qū)動的程序中使用流。
實例
如何創(chuàng)建可讀流?
首先需要可讀性流,然后將其初始化。
const Stream = require('stream') const readableStream = new Stream.Readable()
現(xiàn)在,流已初始化,可以向其發(fā)送數(shù)據(jù)了:
readableStream.push('ping!') readableStream.push('pong!')
異步迭代器
強烈建議在使用流時配合異步迭代器(async iterator)。根據(jù) Axel Rauschmayer 博士的說法,異步迭代是一種用于異步檢索數(shù)據(jù)容器內(nèi)容的協(xié)議(這意味著當(dāng)前“任務(wù)”可以在檢索項目之前被暫停)。另外必須提及的是,流異步迭代器實現(xiàn)使用內(nèi)部的 readable
事件。
從可讀流中讀取時,可以使用異步迭代器:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!n'
也可以用字符串收集可讀流的內(nèi)容:
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
注意,在這種情況下必須使用異步函數(shù),因為我們想返回 Promise。
請切記不要將異步功能與 EventEmitter
混合使用,因為當(dāng)前在事件處理程序中發(fā)出拒絕時,無法捕獲拒絕,從而導(dǎo)致難以跟蹤錯誤和內(nèi)存泄漏。目前的最佳實踐是始終將異步函數(shù)的內(nèi)容包裝在 try/catch 塊中并處理錯誤,但這很容易出錯。 這個 pull request 旨在解決一旦其落在 Node 核心上產(chǎn)生的問題。
要了解有關(guān)異步迭代的 Node.js 流的