本篇文章帶大家了解一下Nodejs中的可寫流write,介紹一下Node可寫流write的實現(xiàn)。有一定的參考價值,有需要的朋友可以參考一下,希望對大家有所幫助。
【推薦學習:《nodejs 教程》】
可寫流-Writable
fs.createWriteStream調用例子
- 首次讀取的數(shù)據會真實寫入目標文件
- 其余次讀取的數(shù)據要根據讀取數(shù)據是否超出highWaterMark ,是的話存入緩存區(qū)等待寫入目標文件中
const fs = require("fs"); const path = require("path"); const bPath = path.join(__dirname, "b.txt"); let ws = fs.createWriteStream(bPath, { flags: "w", encoding: "utf-8", autoClose: true, start: 0, highWaterMark: 3, }); ws.on("open", function (fd) { console.log("open", fd); }); ws.on("close", function () { console.log("close"); }); //string 或者buffer,ws.write 還有一個boolea的返回值 ws.write("1"); //flag 表示 當前要寫的值是直接是否直接寫入文件,不能超出了單次最大寫入值highWaterMark let flag = ws.write("1"); console.log({ flag });//true flag = ws.write("1"); console.log({ flag });//false flag = ws.write("1"); console.log({ flag });//false flag = ws.write("14444444"); console.log({ flag });//false ws.end(); //write+close,沒有調用 end 是不會調用 觸發(fā)close的,看到這里的小伙伴可以嘗試注釋end() 看看close的console是否有打印
- 效果
自定義可寫流initWriteStream
繼承EventEmitter發(fā)布訂閱
const EventEmitter = require("events"); const fs = require("fs"); class WriteStream extends EventEmitter {} module.exports = WriteStream;
鏈表生成隊列做文件讀取的緩存
鏈表&隊列的實現(xiàn)
https://juejin.cn/post/6973847774752145445
// 用鏈表 生成隊列 對 文件緩存區(qū)的讀取 進行優(yōu)化 const Queue = require("./queue");
初始化實例默認數(shù)據constructor()
constructor(path, options = {}) { super(); this.path = path; this.flags = options.flags || "w"; this.encoding = options.encoding || "utf8"; this.mode = options.mode || 0o666; //默認8進制 ,6 6 6 三組分別的權限是 可讀可寫 this.autoClose = options.start || 0; this.highWaterMark = options.highWaterMark || 16 * 1024; //默認一次讀取16個字節(jié)的數(shù)據 this.len = 0; //用于維持有多少數(shù)據還沒有被寫入文件中 //是否根據等待當前讀取的最大文數(shù)據 排空后再寫入 this.needDrain = false; // // 緩存隊列 用于存放 非第一次的文件讀取 到的數(shù)據,因為第一次讀取 直接塞入目標文件中 // 除第一次 的文件讀取數(shù)據的都存放再緩存中 // this.cache = []; // 隊列做緩存 this.cache = new Queue(); // 標記是否是第一次寫入目標文件的標識 this.writing = false; this.start = options.start || 0; this.offset = this.start; //偏移量 this.open(); }
-
this.mode 文件操作權限 默認0o666(0o表示8進制)
-
3個6所占位置分別對應:文件所屬用戶對它的權限 ;文件所屬用戶組用戶對它的權限;表示其他用戶對它的權限
-
權限由:r–可讀(對應數(shù)值4),w–可寫(對應數(shù)值2),x–可執(zhí)行(對應數(shù)值1,例如文件夾下有 .exe 這樣的標識 說明點擊可以直接執(zhí)行)組成
-
所以默認情況下3組用戶對文件的操作權限都是可讀可寫
-
open()
- 調用fs.open()
- 回調emit實例open方法,fs.open的返回值fd做參數(shù)傳入
open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { this.fd = fd; this.emit("open", fd); }); }
write()
- 轉化實例傳入的需要寫入的文件數(shù)據格式為buffer
- 判斷寫入數(shù)據長度是否大于highWaterMark,如果達到預期后,文件讀取到的數(shù)據存放再緩存里 不直接寫入目標文件(這里要排除是否是第一次讀取文件)
- 執(zhí)行實例write 傳入的cb 并調用clearBuffer 清空緩存
- 判斷 是否是第一次讀取,第一次讀取 直接寫入調用 _write(待實現(xiàn))
- 緩存隊列尾部offer 當前讀取到的數(shù)據等待寫入目標文件
write(chunk, encoding = this.encoding, cb = () => {}) { // 將數(shù)據全部轉換成buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); this.len += chunk.length; // console.log({chunk},this.len ) let returnValue = this.len < this.highWaterMark; //當數(shù)據寫入后,需要在手動的將this.len-- this.needDrain = !returnValue; //如果達到預期 后 的文件讀取 到數(shù)據存放再緩存里 不直接寫入目標文件 //清空緩存 對用戶傳入的回調 進行二次包裝 let userCb = cb; cb = () => { userCb(); //清空buffer this.clearBuffer();//馬上實現(xiàn) }; //此時需要判斷 是否是第一次讀取,第一次讀取 直接寫入調用 _write if (!this.writing) { // 第一次||緩存隊列已清空完畢 this.writing = true; // console.log("first write"); this._write(chunk, encoding, cb);//馬上實現(xiàn) } else { //緩存隊列尾部offer 當前讀取到的數(shù)據等待寫入目標文件 this.cache.offer({ chunk, encoding, cb, }); } return returnValue; }
clearBuffer()依次清空緩存隊列
- 隊列執(zhí)行順序,先進先出原則
- this.cache.poll() 依次拿取頭部數(shù)據執(zhí)行this._write寫入目標文件
- 緩存隊列poll出來的data如果不存在,則說明是第一次寫入的行為||緩存隊列已清空。this.writing = false; 下次的文件讀取可以直接寫入目標文件
- 如果this.needDrain又達到預期,文件讀取到數(shù)據存放再緩存里 不直接寫入目標文件
clearBuffer() { //寫入成功后 調用 clearBuffer--》寫入緩存第一個,第一個完成后,再繼續(xù) 第二個 let data = this.cache.poll(); // console.log('this.cache',this.cache) if (data) { //有值 寫入文件 this._write(data.chunk, data.encoding, data.cb); } else { this.writing = false; if (this.needDrain) { // 如果是緩存,觸發(fā)drain this.emit("drain"); } } }
_write()
- fs.open()是異步的,成功讀取后fd會是一個number類型
- 根據fd的type 決定是否訂閱一次open,并回調自己(直到fd類型為number)
- fd類型為number:調用fs.write,寫入當前的chunk,
_write(chunk, encoding, cb) { if (typeof this.fd !== "number") { return this.once("open", () => this._write(chunk, encoding, cb)); } fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => { this.offset += written; //維護偏移量 this.len -= written; //把緩存的個數(shù)減少 cb(); //寫入成功 // console.log(this.cache); }); }
測試自定義的Writable
const WriteStream = require("./initWriteStream"); let ws = new WriteStream(bPath, { highWaterMark: 3, }); let i = 0; function write() { //寫入0-9個 let flag = true; while (i < 10 && flag) { flag = ws.write(i++ + ""); console.log(flag); } } ws.on("drain", function () { // 只有當我們寫入的數(shù)據達到預期,并且數(shù)據被清空后才會觸發(fā)drain ⌚️ console.log("寫完了"); write(); }); write();
- 10個數(shù)字,依次寫入,3次達到最大預期值,然后依次清空了3次緩存結果符合預期
- 目標文件中查看是否正確寫入了我們預期的數(shù)值