Nodejs 使用Transform的问题

Nodejs 使用Transform的问题

使用Transform时,第一次可以从socket接收到客户端数据(HTTP请求),等处理并写入socket一些响应数据后,想再次处理客户端数据时,但获取不到任何数据,使用模式: var Transform = require(“stream”).Transform; var inherits = require(“util”).inherits; var Request = function() { Transform.call(this); this._transform = function(data) { console.log("recv data length " + data.length); // 这一句只处理了一次 }; }; inherits(Request, Transform);

net.createServer(function(socket) { var req = new Request(); socket.pipe(req).pipe(socket); }); 在浏览器中检查socket使用状态,在发送HTTP请求后,后续数据是发送出去的(数据发送在onopen中),客户端已经发送成功,连接也还在,说明服务器socket已经收到数据了,从Readable的角度看,是监听src的’data’事件,如果Request没有收到数据,是不是可以说src没有触发’data’事件?或者说监听src失效?


6 回复

在Node.js中使用Transform流处理数据时,可能会遇到你描述的问题。这个问题通常与流的状态管理有关。当你从一个Socket读取数据,并且希望多次处理这些数据时,需要确保流的状态正确地被维护。

问题分析

你的代码中,Transform流在接收数据后并没有继续处理新的数据,这可能是因为Transform流的内部状态没有正确更新,导致后续的数据没有触发_transform方法。

示例代码及解释

为了更好地理解这个问题,我们可以添加更多的调试信息,并确保Transform流能够正确处理多次数据传输。以下是一个改进后的示例代码:

const { Transform } = require('stream');
const util = require('util');

// 创建自定义Transform类
function Request() {
    Transform.call(this);
    this.dataBuffer = [];
}
util.inherits(Request, Transform);

Request.prototype._transform = function(chunk, encoding, callback) {
    console.log(`Received chunk: ${chunk.length} bytes`);
    
    // 将新数据追加到缓冲区
    this.dataBuffer.push(chunk);
    
    // 处理数据
    const processedData = Buffer.concat(this.dataBuffer);
    console.log(`Processed data: ${processedData.length} bytes`);
    
    // 清空缓冲区
    this.dataBuffer = [];
    
    // 调用回调函数,表示数据已处理完毕
    callback(null, processedData);
};

// 创建服务器
const net = require('net');
net.createServer((socket) => {
    console.log('Client connected');
    
    const req = new Request();
    socket.pipe(req).pipe(socket);
    
    // 监听数据结束事件
    socket.on('end', () => {
        console.log('Client disconnected');
    });
}).listen(12345, () => {
    console.log('Server listening on port 12345');
});

关键点解释

  1. 数据缓冲:我们使用了一个缓冲区dataBuffer来存储每次接收到的数据块。
  2. 处理数据:在_transform方法中,我们将所有缓冲的数据合并成一个大的Buffer对象进行处理。
  3. 清空缓冲区:处理完数据后,清空缓冲区,以便下次可以继续接收新的数据。
  4. 回调函数:调用callback函数通知流数据已经被处理。

通过这种方式,即使数据分多次到达,Transform流也能正确处理每一块数据,并在处理完毕后将结果传递给下一个流或Socket。

这样,你的服务器应该能够正确处理多次来自客户端的数据传输。


代码标记一下吧

额,最近的人都不标记代码的了。

标记代码?网页自带的编辑器没有提供代码高亮啊! 代码是临时手敲的,不多也很简单,凑合着看吧~~~ 如果我不用这种方式,换一种思路,将: var req = new Request(); socket.pipe(req).pipe(socket); 换成是: Request.prototype.on(“request”, function(data) { … }); var req = new Request(); req.pipe(socket); socket.on(“readable”, function(data) { var data = socket.read(); req.emit(“request”, data); }); 之后测试代码,运行正常! 'readable’事件以被动方式取数据,就更加证实了上面的推测!

var data = socket.read(); req.emit(“request”, data); 应该是: var data = socket.read(); data && req.emit(“request”, data);

当你使用 Transform 流来处理从客户端通过 socket 接收到的数据时,可能会遇到多次调用 _transform 方法的问题。这是因为 Transform 流默认情况下会消耗输入数据,并且可能需要特殊处理才能多次接收数据。

以下是一个示例代码,展示了如何正确使用 Transform 流来处理从客户端接收到的数据,并能够多次接收这些数据:

const { Transform } = require('stream');
const net = require('net');

class CustomTransform extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.pushedData = [];
  }

  _transform(chunk, encoding, callback) {
    console.log(`Received chunk: ${chunk.length} bytes`);
    
    // 将接收到的数据缓存起来
    this.pushedData.push(chunk);

    // 处理完数据后调用回调函数
    callback();
  }

  // 实现_flush方法来处理剩余的数据
  _flush(callback) {
    if (this.pushedData.length > 0) {
      console.log(`Flushed ${this.pushedData.length} chunks of data`);
      // 将所有缓存的数据合并并推送出去
      this.push(Buffer.concat(this.pushedData));
    }
    callback();
  }
}

const server = net.createServer((socket) => {
  const transformStream = new CustomTransform();

  socket.pipe(transformStream).pipe(socket);
});

server.listen(3000, () => {
  console.log('Server is listening on port 3000');
});

解释

  1. CustomTransform 类:继承自 Transform,重写了 _transform_flush 方法。

    • _transform 方法用于处理每个传入的 chunk,并将数据缓存到 pushedData 数组中。
    • _flush 方法用于在流结束时处理缓存的数据。如果 pushedData 数组中有数据,则将其合并并推送出去。
  2. 创建服务器:使用 net.createServer 创建一个 TCP 服务器,并将客户端的 socket 管道到 CustomTransform 流,然后再管道回 socket

  3. 启动服务器:在端口 3000 上启动服务器。

这样处理之后,即使客户端多次发送数据,_transform 方法也能被多次调用,从而能够多次处理数据。

回到顶部