日常的学习笔记,包括 ES6、Promise、Node.js、Webpack、http 原理、Vue全家桶,后续可能还会继续更新 Typescript、Vue3 和 常见的面试题 等等。
在上一篇文章中,我们利用 分片读写(发布订阅模式) 的方式实现了 文件拷贝 的功能。
而 Node 中提供了几种原生解决方案,fs.createReadStream 和 fs.createWriteStream。
顾名思义,这两种方案都是以创建 流(Stream
) 的方式进行处理的。
这篇文章我们会着重说明一下 可读流(fs.createReadStream
) 的基本使用与它的实现原理。
可读流的使用
fs.createReadStream
的具体说明,可以参考官网 fs 文件系统 | Node.js API文档
fs.createReadStream
会创建一个 可读流 用来对文件内容进行读取。方法会返回一个 fs.ReadStream
类的实例作为回调,其父类是 stream.Readable ,属于 stream
类上的一个类。
其中 第一个参数 path
传入需要进行读取的 文件路径 。
第二个参数 option
包括
encoding
:编码格式,默认值为null
。autoClose
:读取完毕后自动关闭,默认值为true
。start
/end
:从文件中读取一定范围的字节,而不是整个文件。highWaterMark
:最高水位线,默认长度为64 * 1024
,也就是64kb
。
我们可以参考下面的例子,了解一下它的使用方法。
1 | // test.txt |
如果我们的文件中包括中文,并把 highWaterMark
改成2,并使用变量进行拼接,就会出现乱码的情况。
1 | // test.txt |
这个原因就是因为 Buffer
在拼接的时候,需要使用其特定方法 .concat
进行拼接。
1 | // 对代码进行修改 |
搞清楚了 ReadStream
的使用,我们就可以尝试自己手写一套 ReadStream
类了。
手写实现可读流
先来看一下完整的代码实现。
1 | // readStream.js 手写方法 |
然后我们再对这个方法进行引用。
1 | // test.txt |
然后我们来进行依次解析,看一下 fs.createReadStream
的实现原理究竟是什么样的。
实现方法解析
根据官网,我们可以知道 fs.createReadStream
会接受两个参数,分别是 path
和 option
(其中包括很多参数,具体可参考官网)。
创建一个类,并使它继承自
EventEmitter
,这样会让他具备 发布订阅模式 的特性。其原本的继承链是class ReadStream extends Readable extends EventEmitter
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15const EventEmitter = require("events");
class ReadStream {
constructor(path, options) {
super()
this.path = path;
this.flags = options.flags || "r";
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.start = options.start || 0;
this.end = options.end;
this.emitClose = options.emitClose || true;
this.encoding = options.encoding;
}
}
module.exports = ReadStream;定义一个
flowing
属性,并使用.on('newListener')
来监听 data事件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// ...
class ReadStream extends EventEmitter {
constructor(path, options) {
// ...
// 是否需要触发data事件
this.flowing = false; // 当用户监听data事件后,此属性变成true
// 每次绑定事件,都会触发newListener的回调
this.on("newListener", (type) => {
if (type === "data") {
this.flowing = true;
this.read();
}
});
}
read() {
console.log("用户监听了data"); // 打印出此方法
}
}引入
fs
模块,并定义出一个.open()
方法。其中包含一个destory
方法,用来处理错误事件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31const fs = require("fs");
// ...
class ReadStream extends EventEmitter {
constructor(path, options) {
// ...
// 即使只绑定了open事件,也可以进行触发(也就是只进行订阅)
this.open();
}
// 销毁方法
destory(err) {
if (err) {
this.emit("error");
}
if (this.fd) {
fs.close(this.fd, () => {
this.emit("close");
});
}
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
return this.destory(err);
}
this.fd = fd;
this.emit("open", fd); // 触发open事件
});
}
// ...
}这样我们只要在需要引入的文件中,定义一个监听的
error
事件就可以了。1
2
3
4// createReadStream.js
rs.on("error", (err) => {
console.log(err);
});
当然,也可以不进行 error事件 绑定。
在文件打开后,再进行内容读取。
我们都清楚,
fs.open
是一个异步操作,所以可能会存在data
事件触发时,文件还没有读取完的情况。这时我们就需要进行一个 轮询处理 ,等待
this.emit('open')
触发后,再进行read
方法。接下来,我们来对
read
方法进行改造。1
2
3
4
5
6
7
8
9
10
11
12
13
14// ...
class ReadStream extends EventEmitter {
constructor(path, options) {
// ...
}
// ...
read() {
if (typeof this.fd !== "number") {
return this.once("open", () => this.read());
}
// 此处直接读取 this.fd 是读取不到的,所以我们需要先进行判断
fs.read(this.fd);
}
}这样就可以保证,
read
事件是在文件打开后进行触发的。现在来对
fs.read
的剩余参数进行处理。创建一个
Buffer
来对内容进行存储,长度为highWaterMark
字节。并且计算start
和end
这里需要注意,我们在创建
Buffer
的时候,需要每次都声明一个新的内存空间来对内容进行存储。不可以在属性中定义一个
buffer
属性,这样会导致每次指向的都是同一个内存空间。1
2
3
4
5
6
7// ...
read() {
// 每次都创建一个新的内存空间
const buffer = Buffer.alloc(this.highWaterMark);
// ...
}然后我们需要对
start
和end
进行判断。我们先定义一个
offset
属性,来记录上一轮计算的偏移量。并且还需要判断,用户当前是否传入了
start
和end
属性1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// ...
class ReadStream extends EventEmitter {
constructor(path, options) {
// ...
this.offset = this.start; // 偏移量
}
// ...
read() {
// ...
const howManyToRead = this.end
? Math.min(this.highWaterMark, this.end - this.offset + 1)
: this.highWaterMark;
const buffer = Buffer.alloc(howManyToRead);
fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => {
if (err) return this.destory(err);
this.offset += bytesRead;
// 可能存在最后一次的buffer大小 大于 实际数据大小的情况,所以使用slice来进行截取
this.emit("data", buffer.slice(0, bytesRead)); // 将结果抛给data事件的回调
});
}
}这样我们在前面的 data 事件中,就可以监听到读取的数据了。
1
2
3
4// createReadStream.js
rs.on("data", (chunk) => {
console.log(chunk); // <Buffer e8 8e>
});进行递归,循环输出结果
这里不用做过多的解释,通过上一步我们不难发现,每次都只输出了一次结果。
所以我们需要进行递归处理,将数据进行完全输出,并触发
end
事件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// ...
class ReadStream extends EventEmitter {
constructor(path, options) {
// ...
}
// ...
read() {
// ...
fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => {
if (err) return this.destory(err);
// 如果读取不到数据了,就进行销毁
if (bytesRead == 0) {
this.emit('end'); // 触发end事件
return this.destory(); // 进行销毁
};
this.offset += bytesRead;
this.emit("data", buffer.slice(0, bytesRead));
// 进行递归,循环输出结果
if (this.flowing) {
this.read();
}
});
}
}添加一个
pause
和resume
来进行流量控制。pause
和resume
可以控制当前数据流的 停止 和 继续。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// ...
class ReadStream extends EventEmitter {
constructor(path, options) {
// ...
}
// ...
pause() {
// 判断当前是否读取完毕了
if (this.flowing) {
this.flowing = false;
}
}
resume() {
// 判断当前是否读取完毕了
if (!this.flowing) {
this.flowing = true;
this.read();
}
}
}
这样我们就手写实现了 fs.createReadStream
方法。
fs.createReadStream
在工作中比较常见,可以用来进行大型文件的处理,所以学好用好此方法还是比较重要的。
本篇文章由 莫小尚 创作,文章中如有任何问题和纰漏,欢迎您的指正与交流。
您也可以关注我的 个人站点、博客园 和 掘金,我会在文章产出后同步上传到这些平台上。
最后感谢您的支持!