怎么看 "A New Streaming API for Nodejs v0.10"

怎么看 "A New Streaming API for Nodejs v0.10"

貌似有新动作了, Stream API 会发生改变…

A New Streaming API for Node v0.10 http://blog.nodejs.org/2012/12/21/streams2/ Fri, 21 Dec 2012 00:45:13 UTC - Isaac Z. Schlueter

不知道会涉及哪些方面, 求解读…

6 回复

看怎么 “A New Streaming API for Node.js v0.10”

背景

Node.js v0.10 引入了一种新的流(Stream)API,称为 Streams2。这是一次重大的改进,旨在解决早期版本中流处理的一些限制,并提供更强大、更灵活的流操作能力。

改进点

Streams2 的主要改进包括:

  • 背压(Backpressure)管理:自动处理数据过载问题,避免消费者跟不上生产者的速度。
  • 内部缓冲区优化:减少内存使用,提高性能。
  • 可读流(Readable Streams)和可写流(Writable Streams)的新方法:简化流的操作。
  • 流的管道(Piping)更加灵活:支持更复杂的流组合。

示例代码

让我们通过一个简单的例子来理解这些改进。

示例 1:基本的读取和写入流
const fs = require('fs');

// 创建一个可读流
const readStream = fs.createReadStream('input.txt', { encoding: 'utf8' });

// 创建一个可写流
const writeStream = fs.createWriteStream('output.txt');

// 将可读流的数据传输到可写流
readStream.pipe(writeStream);

// 监听错误事件
readStream.on('error', (err) => {
    console.error('Read error:', err);
});

writeStream.on('error', (err) => {
    console.error('Write error:', err);
});

在这个例子中,pipe() 方法用于将 readStream 的数据直接传递给 writeStream。Streams2 会自动处理背压,确保不会因为写入速度慢而导致内存溢出。

示例 2:处理数据

我们可以进一步处理读取的数据,在写入之前对其进行修改。

const fs = require('fs');

const readStream = fs.createReadStream('input.txt', { encoding: 'utf8' });
const writeStream = fs.createWriteStream('output.txt');

// 使用 transform 流处理数据
const transformStream = new (require('stream').Transform)({
    transform(chunk, encoding, callback) {
        // 处理每个数据块
        const processedChunk = chunk.toUpperCase();
        callback(null, processedChunk);
    }
});

// 链接流
readStream.pipe(transformStream).pipe(writeStream);

readStream.on('error', (err) => {
    console.error('Read error:', err);
});

writeStream.on('error', (err) => {
    console.error('Write error:', err);
});

在这个例子中,我们创建了一个 Transform 流,它将每个数据块转换为大写形式,然后再写入文件。

结论

Streams2 提供了一种更高效、更灵活的方式来处理数据流。通过自动的背压管理和更丰富的流操作方法,开发者可以更容易地构建复杂的数据处理管道。希望这些示例能够帮助你更好地理解和应用 Streams2。


正在阅读ing~,感谢分享

解决了几个之前的遗留问题(pause不保证完全停止data事件,create之后会立刻开始读数据)。可以不通过事件,主动调用read方法来获取的数据。可以自定义底层的_read,_write等方法来定制stream,还封装了高一层的stream对象。

现有的stream就像一个水龙头,每次流出来的量是不能控制的,你需要盯着水桶,快满了就关掉水龙头,或者倒掉桶里的水。
新的stream2升级了,可以直接告诉它需要多少量的水,按需去取。
同时现有的stream只能出一种水,之后倒到水桶之后你再选择加热还是制冷,新的stream2可以给这个水龙头加装加热、制冷装置,让他直接出热水或者冰水。

让楼上说完了。。。

不过新设计里,如果同样的数据 ,多个处理器感兴趣怎么办?

A读了n个字节,B只能读n之后的字节流了

除非继续用老的data侦听器咯

Node.js 在版本 0.10 中引入了新的流(Stream)API,即 Streams2,旨在解决旧版流系统中的一些问题,并提供更强大和灵活的功能。新版本的流API包括了一些重要的改进,例如自动背压支持、数据管道优化以及更直观的错误处理机制。

主要改进点

  1. 自动背压支持:在Streams2中,当消费者处理数据的速度跟不上生产者生成数据的速度时,流会自动暂停生产者的操作,直到消费者准备好接收更多数据。
  2. 数据管道优化:通过内部队列管理来优化多个流之间的数据传输。
  3. 更直观的错误处理:错误处理机制得到了增强,使得开发者可以更容易地捕捉和处理流中的错误。

示例代码

创建一个可读流

const { Readable } = require('stream');

class MyReadableStream extends Readable {
    constructor(options) {
        super(options);
        this.index = 0;
    }

    _read() {
        setTimeout(() => {
            if (this.index > 6) {
                this.push(null); // 结束流
            } else {
                this.push(this.index + '\n');
                this.index++;
            }
        }, 1000);
    }
}

const readable = new MyReadableStream();
readable.on('data', (chunk) => {
    console.log(`Received data: ${chunk}`);
});

readable.on('end', () => {
    console.log('End of stream.');
});

创建一个可写流

const { Writable } = require('stream');

class MyWritableStream extends Writable {
    _write(chunk, encoding, callback) {
        console.log(`Writing data: ${chunk.toString().trim()}`);
        callback();
    }
}

const writable = new MyWritableStream();
writable.write('Hello\n');
writable.write('World\n');
writable.end(); // 结束写入

使用管道连接流

const { Transform } = require('stream');

class MyTransformStream extends Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
}

const readable = new MyReadableStream();
const transform = new MyTransformStream();
const writable = new MyWritableStream();

readable.pipe(transform).pipe(writable);

以上代码展示了如何使用Streams2创建可读流、可写流以及转换流,并将它们连接在一起形成数据管道。这些改进使得流的操作更加高效和可靠,尤其适合处理大量数据或文件传输等场景。

回到顶部