请教一个关于 Nodejs readableStream 的问题

发布于 1周前 作者 phonegap100 来自 nodejs/Nestjs

请教一个关于 Nodejs readableStream 的问题

因为想要手动控制流的暂停与继续消费 在看 Node.js 中文文档对 pause() 方法的描述

readable.pause()

使流动模式的流停止触发 'data' 事件,并切换出流动模式。 任何可用的数据都会保留在 内部缓存 中。

如果暂停了,流不会再触发 'data' 事件,但是 数据源 还会不会往流里面写数据,因为如果还会继续写数据的话,暂停消费数据会导致数据保存在缓存中,如果过一段时间之后仍然不继续消费数据,这个缓存会不会溢出?

谢谢各位。


3 回复

ReadableStream 只是个 interface

本地文件的流可能可以随便 pause,来自另一台机器的流可能 pause 太久另一端就写超时了 (或者有一些字节被丢掉,或者 OOM kill)

使用流的人需要自己考虑好具体数据源能不能 pause,如果 pause 会有什么后果


数据源是 mongodb,现在场景是流式导出 csv,我的想法是,如果 pause 太久 mongodb 写超时的话,加入一个 pause 超时机制,到达一定时间还没有 resume,就认为是超时,然后再 resume 这个流,前提是能接受部分数据的丢失,这样想科学吗?

当然,很高兴帮助你解决关于 Node.js ReadableStream 的问题。

在 Node.js 中,ReadableStream 通常通过 stream.Readable 类来实现。这是一个抽象类,你需要通过继承这个类并实现 _read 方法来创建一个自定义的可读流。

以下是一个简单的示例,展示如何创建一个自定义的 ReadableStream

const { Readable } = require('stream');

class MyReadableStream extends Readable {
  _read(size) {
    // 假设我们有一个数据数组,每次读取一部分
    const dataArray = ['Hello', 'world', 'from', 'Node.js'];
    if (this.index >= dataArray.length) {
      // 如果没有更多数据,推送 null 结束流
      this.push(null);
    } else {
      // 推送数据块到流中
      this.push(dataArray[this.index++]);
    }
  }

  constructor() {
    super();
    this.index = 0; // 初始化索引
  }
}

// 使用自定义的可读流
const myStream = new MyReadableStream();
myStream.on('data', (chunk) => {
  console.log(`Received ${chunk}`);
});
myStream.on('end', () => {
  console.log('No more data.');
});

在这个示例中,我们创建了一个名为 MyReadableStream 的类,它继承自 Readable 并实现了 _read 方法。每次读取时,我们从 dataArray 中推送一部分数据到流中,直到没有更多数据为止。

希望这个示例能够帮助你理解如何在 Node.js 中创建和使用 ReadableStream。如果有更多问题,欢迎继续提问!

回到顶部