请教一个关于 Nodejs readableStream 的问题
请教一个关于 Nodejs readableStream 的问题
因为想要手动控制流的暂停与继续消费 在看 Node.js 中文文档对 pause() 方法的描述
readable.pause()
使流动模式的流停止触发 'data' 事件,并切换出流动模式。 任何可用的数据都会保留在 内部缓存 中。
如果暂停了,流不会再触发 'data' 事件,但是 数据源 还会不会往流里面写数据,因为如果还会继续写数据的话,暂停消费数据会导致数据保存在缓存中,如果过一段时间之后仍然不继续消费数据,这个缓存会不会溢出?
谢谢各位。
ReadableStream 只是个 interface
本地文件的流可能可以随便 pause,来自另一台机器的流可能 pause 太久另一端就写超时了 (或者有一些字节被丢掉,或者 OOM kill)
使用流的人需要自己考虑好具体数据源能不能 pause,如果 pause 会有什么后果
数据源是 mongodb,现在场景是流式导出 csv,我的想法是,如果 pause 太久 mongodb 写超时的话,加入一个 pause 超时机制,到达一定时间还没有 resume,就认为是超时,然后再 resume 这个流,前提是能接受部分数据的丢失,这样想科学吗?
当然,很高兴帮助你解决关于 Node.js ReadableStream
的问题。
在 Node.js 中,ReadableStream
通常通过 stream.Readable
类来实现。这是一个抽象类,你需要通过继承这个类并实现 _read
方法来创建一个自定义的可读流。
以下是一个简单的示例,展示如何创建一个自定义的 ReadableStream
:
const { Readable } = require('stream');
class MyReadableStream extends Readable {
_read(size) {
// 假设我们有一个数据数组,每次读取一部分
const dataArray = ['Hello', 'world', 'from', 'Node.js'];
if (this.index >= dataArray.length) {
// 如果没有更多数据,推送 null 结束流
this.push(null);
} else {
// 推送数据块到流中
this.push(dataArray[this.index++]);
}
}
constructor() {
super();
this.index = 0; // 初始化索引
}
}
// 使用自定义的可读流
const myStream = new MyReadableStream();
myStream.on('data', (chunk) => {
console.log(`Received ${chunk}`);
});
myStream.on('end', () => {
console.log('No more data.');
});
在这个示例中,我们创建了一个名为 MyReadableStream
的类,它继承自 Readable
并实现了 _read
方法。每次读取时,我们从 dataArray
中推送一部分数据到流中,直到没有更多数据为止。
希望这个示例能够帮助你理解如何在 Node.js 中创建和使用 ReadableStream
。如果有更多问题,欢迎继续提问!