学习笔记—Node之读取流的使用与实现

日常的学习笔记,包括 ES6、Promise、Node.js、Webpack、http 原理、Vue全家桶,后续可能还会继续更新 Typescript、Vue3 和 常见的面试题 等等。


在上一篇文章中,我们利用 分片读写(发布订阅模式) 的方式实现了 文件拷贝 的功能。

Node 中提供了几种原生解决方案,fs.createReadStreamfs.createWriteStream

顾名思义,这两种方案都是以创建 流(Stream 的方式进行处理的。

这篇文章我们会着重说明一下 可读流(fs.createReadStream 的基本使用与它的实现原理。

可读流的使用

fs.createReadStream 的具体说明,可以参考官网 fs 文件系统 | Node.js API文档

fs.createReadStream 会创建一个 可读流 用来对文件内容进行读取。方法会返回一个 fs.ReadStream 类的实例作为回调,其父类是 stream.Readable ,属于 stream 类上的一个类。

其中 第一个参数 path 传入需要进行读取的 文件路径

第二个参数 option 包括

  • encoding:编码格式,默认值为 null
  • autoClose:读取完毕后自动关闭,默认值为 true
  • start / end:从文件中读取一定范围的字节,而不是整个文件。
  • highWaterMark:最高水位线,默认长度为 64 * 1024 ,也就是 64kb

我们可以参考下面的例子,了解一下它的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// test.txt
123456789

// createReadStream.js
const fs = require("fs");
const path = require("path");
// 返回一个 Readable 类的实例
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
highWaterMark: 3 // 最高水位线为3,也就是每次读取 3kb
});
rs.on("open", (fd) => { // 此方法是 fs 模块中自己实现的
console.log(fd); // 3
});
rs.on("data", (chunk) => {
console.log(chunk); // 返回Buffer格式 <Buffer 31 32 33> <Buffer 34 35 36> <Buffer 37 38 39>
});
rs.on("end", () => {
console.log("end"); // 结束事件
});

如果我们的文件中包括中文,并把 highWaterMark 改成2,并使用变量进行拼接,就会出现乱码的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// test.txt
莫小尚1234567890

// createReadStream.js
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
highWaterMark: 2
});
let result = '';
rs.on("data", (chunk) => {
result += chunk;
});
rs.on("end", () => {
console.log(result); // �������1234567890
});

这个原因就是因为 Buffer 在拼接的时候,需要使用其特定方法 .concat 进行拼接。

1
2
3
4
5
6
7
8
// 对代码进行修改
const arr = [];
rs.on("data", (chunk) => {
arr.push(chunk);
});
rs.on("end", () => {
console.log(Buffer.concat(arr).toString()); // 莫小尚1234567890
});

搞清楚了 ReadStream 的使用,我们就可以尝试自己手写一套 ReadStream 类了。

手写实现可读流

先来看一下完整的代码实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// readStream.js 手写方法
const EventEmitter = require("events");
const fs = require("fs");
class ReadStream extends EventEmitter {
constructor(path, options) {
super();
this.path = path;
this.flags = options.flags || "r";
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.start = options.start || 0;
this.end = options.end;
this.emitClose = options.emitClose || true;
this.encoding = options.encoding;
this.offset = this.start; // 偏移量
// 是否需要触发data事件
this.flowing = false; // 当用户监听data事件后,此属性变成true
// 即使只绑定了open事件,也可以进行触发(也就是只进行订阅)
this.open();
// 每次绑定事件,都会触发newListener的回调
this.on("newListener", (type) => {
if (type === "data") {
this.flowing = true;
this.read();
}
});
}
// 销毁方法
destory(err) {
if (err) {
this.emit("error");
}
if (this.fd) {
fs.close(this.fd, () => {
this.emit("close");
});
}
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
return this.destory(err);
}
this.fd = fd;
this.emit("open", fd); // 触发open事件
});
}
read() {
if (typeof this.fd !== "number") {
return this.once("open", () => this.read());
}
// 此处直接读取 this.fd 是读取不到的,所以我们需要先进行判断
const howManyToRead = this.end
? Math.min(this.highWaterMark, this.end - this.offset + 1)
: this.highWaterMark;
const buffer = Buffer.alloc(howManyToRead);
fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => {
if (err) return this.destory(err);
if (bytesRead == 0) {
this.emit("end"); // 触发end事件
return this.destory(); // 进行销毁
}
this.offset += bytesRead;
this.emit("data", buffer.slice(0, bytesRead));
// 进行递归,循环输出结果
if (this.flowing) {
this.read();
}
}
);
}
pause() {
if (this.flowing) {
this.flowing = false;
}
}
resume() {
if (!this.flowing) {
this.flowing = true;
this.read();
}
}
}
module.exports = ReadStream;

然后我们再对这个方法进行引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// test.txt
莫小尚1234567890

// createReadStream.js
const path = require("path");
const ReadStream = require("./readStream");

const rs = new ReadStream(path.resolve(__dirname, "test.txt"), {
highWaterMark: 2,
});

rs.on("open", (fd) => {
console.log(fd);
});

const arr = [];
rs.on("data", (chunk) => {
console.log(chunk);
arr.push(chunk);
});

rs.on("end", () => {
console.log(Buffer.concat(arr).toString());
});

然后我们来进行依次解析,看一下 fs.createReadStream 的实现原理究竟是什么样的。

实现方法解析

根据官网,我们可以知道 fs.createReadStream 会接受两个参数,分别是 pathoption(其中包括很多参数,具体可参考官网)

  1. 创建一个类,并使它继承自 EventEmitter,这样会让他具备 发布订阅模式 的特性。其原本的继承链是 class ReadStream extends Readable extends EventEmitter

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    const EventEmitter = require("events");
    class ReadStream {
    constructor(path, options) {
    super()
    this.path = path;
    this.flags = options.flags || "r";
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.start = options.start || 0;
    this.end = options.end;
    this.emitClose = options.emitClose || true;
    this.encoding = options.encoding;
    }
    }

    module.exports = ReadStream;
  2. 定义一个 flowing 属性,并使用 .on('newListener') 来监听 data事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // ...
    class ReadStream extends EventEmitter {
    constructor(path, options) {
    // ...

    // 是否需要触发data事件
    this.flowing = false; // 当用户监听data事件后,此属性变成true

    // 每次绑定事件,都会触发newListener的回调
    this.on("newListener", (type) => {
    if (type === "data") {
    this.flowing = true;
    this.read();
    }
    });
    }
    read() {
    console.log("用户监听了data"); // 打印出此方法
    }
    }
  3. 引入 fs 模块,并定义出一个 .open() 方法。其中包含一个 destory 方法,用来处理错误事件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    const fs = require("fs");
    // ...
    class ReadStream extends EventEmitter {
    constructor(path, options) {
    // ...

    // 即使只绑定了open事件,也可以进行触发(也就是只进行订阅)
    this.open();
    }
    // 销毁方法
    destory(err) {
    if (err) {
    this.emit("error");
    }
    if (this.fd) {
    fs.close(this.fd, () => {
    this.emit("close");
    });
    }
    }
    open() {
    fs.open(this.path, this.flags, (err, fd) => {
    if (err) {
    return this.destory(err);
    }
    this.fd = fd;
    this.emit("open", fd); // 触发open事件
    });
    }
    // ...
    }

    这样我们只要在需要引入的文件中,定义一个监听的 error 事件就可以了。

    1
    2
    3
    4
    // createReadStream.js
    rs.on("error", (err) => {
    console.log(err);
    });

​ 当然,也可以不进行 error事件 绑定。

  1. 在文件打开后,再进行内容读取。

    我们都清楚, fs.open 是一个异步操作,所以可能会存在 data 事件触发时,文件还没有读取完的情况。

    这时我们就需要进行一个 轮询处理 ,等待 this.emit('open') 触发后,再进行 read 方法。

    接下来,我们来对 read 方法进行改造。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // ...
    class ReadStream extends EventEmitter {
    constructor(path, options) {
    // ...
    }
    // ...
    read() {
    if (typeof this.fd !== "number") {
    return this.once("open", () => this.read());
    }
    // 此处直接读取 this.fd 是读取不到的,所以我们需要先进行判断
    fs.read(this.fd);
    }
    }

    这样就可以保证,read 事件是在文件打开后进行触发的。

  2. 现在来对 fs.read 的剩余参数进行处理。

    创建一个 Buffer 来对内容进行存储,长度为 highWaterMark 字节。并且计算 startend

    这里需要注意,我们在创建 Buffer 的时候,需要每次都声明一个新的内存空间来对内容进行存储。

    不可以在属性中定义一个 buffer 属性,这样会导致每次指向的都是同一个内存空间。

    1
    2
    3
    4
    5
    6
    7
    // ...
    read() {
    // 每次都创建一个新的内存空间
    const buffer = Buffer.alloc(this.highWaterMark);

    // ...
    }

    然后我们需要对 startend 进行判断。

    我们先定义一个 offset 属性,来记录上一轮计算的偏移量。

    并且还需要判断,用户当前是否传入了 startend 属性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // ...
    class ReadStream extends EventEmitter {
    constructor(path, options) {
    // ...
    this.offset = this.start; // 偏移量
    }
    // ...
    read() {
    // ...
    const howManyToRead = this.end
    ? Math.min(this.highWaterMark, this.end - this.offset + 1)
    : this.highWaterMark;

    const buffer = Buffer.alloc(howManyToRead);

    fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => {
    if (err) return this.destory(err);
    this.offset += bytesRead;
    // 可能存在最后一次的buffer大小 大于 实际数据大小的情况,所以使用slice来进行截取
    this.emit("data", buffer.slice(0, bytesRead)); // 将结果抛给data事件的回调
    });
    }
    }

    这样我们在前面的 data 事件中,就可以监听到读取的数据了。

    1
    2
    3
    4
    // createReadStream.js
    rs.on("data", (chunk) => {
    console.log(chunk); // <Buffer e8 8e>
    });
  3. 进行递归,循环输出结果

    这里不用做过多的解释,通过上一步我们不难发现,每次都只输出了一次结果。

    所以我们需要进行递归处理,将数据进行完全输出,并触发 end 事件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // ...
    class ReadStream extends EventEmitter {
    constructor(path, options) {
    // ...
    }
    // ...
    read() {
    // ...
    fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => {
    if (err) return this.destory(err);
    // 如果读取不到数据了,就进行销毁
    if (bytesRead == 0) {
    this.emit('end'); // 触发end事件
    return this.destory(); // 进行销毁
    };
    this.offset += bytesRead;
    this.emit("data", buffer.slice(0, bytesRead));
    // 进行递归,循环输出结果
    if (this.flowing) {
    this.read();
    }
    });
    }
    }
  4. 添加一个 pauseresume 来进行流量控制。

    pauseresume 可以控制当前数据流的 停止继续

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // ...
    class ReadStream extends EventEmitter {
    constructor(path, options) {
    // ...
    }
    // ...
    pause() {
    // 判断当前是否读取完毕了
    if (this.flowing) {
    this.flowing = false;
    }
    }
    resume() {
    // 判断当前是否读取完毕了
    if (!this.flowing) {
    this.flowing = true;
    this.read();
    }
    }
    }

这样我们就手写实现了 fs.createReadStream 方法。

fs.createReadStream 在工作中比较常见,可以用来进行大型文件的处理,所以学好用好此方法还是比较重要的。

本篇文章由 莫小尚 创作,文章中如有任何问题和纰漏,欢迎您的指正与交流。
您也可以关注我的 个人站点博客园掘金,我会在文章产出后同步上传到这些平台上。
最后感谢您的支持!

请打赏并支持一下作者吧~

欢迎关注我的微信公众号