Nodejs 中一个 stream 的性能问题

发布于 1周前 作者 sinazl 来自 nodejs/Nestjs

Nodejs 中一个 stream 的性能问题

原文地址: https://zhuanlan.zhihu.com/p/37601326

之前用 neovim 的 node-client 模块发现,传输大数据时会非常的慢,一个 7M 的文件内容解析需要 30s 才能完成。 经常一些尝试,终于找到了问题的原因。 事实上做为解码的 msgpack-lite 模块并不慢,7M 的内容解析不会超过 200ms。所以做为数据源的 socket 传输数据慢了?其实也不慢,如果你传输数据会发现传输也不会超过 100ms,示例代码:

const fs = require('fs')
const net = require('net')
const socketPath = '/tmp/test'
try {
  fs.unlinkSync(socketPath)
} catch (e) {}
// 你需要下载一个 sqlite3.c 做为测试文件
const stream = fs.createReadStream('/Users/chemzqm/sqlite3.c', {
  highWaterMark: 1024*1024
})

const server = net.createServer(conn => { console.log(‘client connected, send data’) stream.pipe(conn) })

server.on(‘error’, err => { throw err })

server.listen({path:socketPath}, () => { console.log(‘server bound’) })

const client = net.createConnection({ path:socketPath })

let l = 0 let ts = Date.now() client.on(‘data’, chunk => { l = l + chunk.length server.close() }) client.on(‘end’, () => { console.log(${Date.now() - ts}) })

然鹅,你 socket 的 stream 接到 msgpack 的 stream 就会非常慢了。示例:

const msgpack = require('msgpack-lite')
const fs = require('fs')
const net = require('net')
const socketPath = '/tmp/test'
try {
  fs.unlinkSync(socketPath)
} catch (e) {}

// 使用预先导出的 msgpack 格式 data 数据 const readStream = fs.createReadStream(‘data.msp’, { highWaterMark: 1024*1024 });

const server = net.createServer(conn => { console.log(‘client connected, send data’) readStream.pipe(conn) })

server.on(‘error’, err => { throw err })

server.listen({path:socketPath}, () => { console.log(‘server bound’) })

const client = net.createConnection({ path:socketPath, highWaterMark: 1024*1024 })

const decodeStream = msgpack.createDecodeStream({ objectMode: true })

let ts = Date.now() client.pipe(decodeStream) client.on(‘end’, () => { console.log(‘client end’) }) decodeStream.on(‘data’, obj => { console.log(Date.now() - ts) console.log(obj.length) })

最终我们发现,socket 只会以 8kb 每次来 emit 数据,而 msgpack stream 对于这种方式的数据反应非常迟钝,然后就导致了解析非常缓慢的结果。 起初以为设置 socket 的 highWaterMark 到一个大的数值就能解决这个问题,然而发现这个值在 javascript 里面根本起不了效果,它 emit 的 data 永远最大 8kb。 不得已,写了个 Transform stream 做为中介:

import { Transform } from 'stream';

const MIN_SIZE = 8 * 1024;

export default class Buffered extends Transform { private chunks: Buffer[] | null; constructor() { super({ readableHighWaterMark: 10 * 1024 * 1024, writableHighWaterMark: 10 * 1024 * 1024, } as any); this.chunks = null; }

_transform(chunk: Buffer, encoding: any, callback: any) { let { chunks } = this; if (chunk.length < MIN_SIZE) { if (!chunks) return callback(null, chunk); chunks.push(chunk); this.chunks = null; let buf = Buffer.concat(chunks); callback(null, buf); return; } if (!chunks) { chunks = this.chunks = [chunk]; } else { chunks.push(chunk); } setTimeout(() => { let { chunks } = this; if (chunks) { this.chunks = null; let buf = Buffer.concat(chunks); this.push(buf); } }, 100); callback(); }

_flush(callback: any) { let { chunks } = this; if (chunks) { this.chunks = null; let buf = Buffer.concat(chunks); callback(null, buf); } else { callback(); } } }

测试结果:

❯ node index.js
Testing msgpack-lite stream with 2.2M data
msgpack-lite time costed: 4207ms
Testing msgpack5 stream with 2.2M data
msgpack5 time costed: 11189ms
Testing msgpack-lite with buffered stream with 2.2M data
msgpack-lite with buffered time costed: 67ms

对比非常明显。 猜测可能原因是 msgpack 的 stream 是工作在 object mode 下面的,而我们的数据源给的都是 buffer,它每次 emit data 都尝试解析 object 所以导致了耗时。 测试代码在此:chemzqm/stream-issue


6 回复

你的测试数据有点问题呀,整个文件就只有一个 pack,msgpack 必须要全部接收到才能 decode 出来,但你的文件又太大了,导致在网络上无法一次性传过来(记得 socket 的 buffer size 不会超过 64k 的)。这样接收端会出现:不断地接收 -> decode -> decode 失败 -> 再接收,这一过程。最后只有全部接收到才能 decode 出来,然而 decode 需要耗时的。。。


看成 steam 的性能问题

数据问题? neovim 就是这样传送整个 buffer 的,人家 python-client 处理起来一点压力没有。
说到底就是接收的 stream 没有好的判定数据完整性方法导致的问题。

你 msgpack 包的数据结构本身就没有包含长度在里面,让别人怎么去检测,也就只能不断接收,不断重试解码了。
PS:看了 python-client 里的 msgpack 也是这样处理的。至于为什么比 python 版的慢,你可以测试下他们两个的 msgpack 解码速度。

这个只能说是传输格式局限性的问题,除非 neovim 像 tsserver 那样多传一个长度头部,但是那样处理上会麻烦一些。我觉得检测完整性肯定是有办法去优化的,传递长度只是最容易的方法。
慢的原因就是过多尝试解析流的问题,具体谁 python 和 node 谁快我是没兴趣,因为现在的解析的速度已经足够快了,7M 的数据不会超过 100ms

在Node.js中,处理流(stream)的性能问题通常涉及多个方面,包括流的类型、数据处理逻辑、系统资源限制等。以下是一些常见的性能优化建议及示例代码:

  1. 选择合适的流类型

    • 使用Readable流读取数据。
    • 使用Writable流写入数据。
    • 使用DuplexTransform流进行数据的双向处理或转换。
  2. 使用管道(pipe)进行流连接: 管道操作可以自动管理流的流动,避免手动调用readwrite方法,从而提高性能。

    const fs = require('fs');
    const source = fs.createReadStream('source.txt');
    const dest = fs.createWriteStream('dest.txt');
    source.pipe(dest);
    
  3. 处理背压(Backpressure): 当写入流的处理速度跟不上读取流的速度时,应处理背压问题,避免内存溢出。

    source.on('data', (chunk) => {
      if (!dest.write(chunk)) {
        source.pause();
        dest.once('drain', () => {
          source.resume();
        });
      }
    });
    
  4. 优化数据处理逻辑

    • 尽量减少同步操作,使用异步操作提高性能。
    • 避免在流中执行CPU密集型任务,考虑使用worker_threads模块进行并行处理。
  5. 调整系统资源限制

    • 增加Node.js进程的文件描述符限制。
    • 根据需要调整Node.js的内存限制。

通过上述方法,可以有效提升Node.js中流的性能。具体优化策略还需根据实际应用场景进行调整。

回到顶部