Node.js的pipe和unpipe问题,可能是个bug?

Node.js的pipe和unpipe问题,可能是个bug?

1、看代码

var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
	socket.setTimeout(10*1000);
	socket.pause();
	 //socket.pipe(file); //推荐仅调用一次pipe
	socket.on('timeout',function(){
		socket.resume();
		socket.pipe(file);  //结果是:每一次调用都会增加一遍数据写入
	});

	socket.on('data',function(data){
		socket.unpipe(file); //无法成功,????????????????
        socket.pause();
	});
});
server.listen('8431','localhost');

2、测试结果

第一次输入a,第二次输入b,第三次输入c,结果变为abbccc而不是abc 3、分析 猜测:每次调用pipe就生成一个全新的隧道,因此,每次写数据都要从递增的多个隧道进行写入——就变成N倍了。

4、用unpipe取消每次递增的隧道

var net=require('net'); 
var file=require('fs').createWriteStream('./message.txt'); 
var server=net.createServer(); 
server.on('connection',function(socket){ 
    socket.setTimeout(10*1000); 
    socket.pause(); 
     //socket.pipe(file); 
    socket.on('timeout',function(){ 
        socket.resume(); 
        socket.pipe(file); 
    }); 
socket.on('data',function(data){ 
    socket.unpipe(file); //无法成功,???????????????? 
           socket.pause(); 
}); 

}); server.listen(‘8431’,‘localhost’);

结果:失败。unpipe根本无法取消pipe创建的隧道。 原因:参考https://groups.google.com/forum/#!topic/nodejs/75R0dOZV1UI 反正没找到解决方案(除非调用socket.removeAllListeners(); 但socket就无法继续监听data了,除非另建监听器,在循环下基本不可能)。

5、根据原因,仅调用一次pipe

var net=require('net'); 
var file=require('fs').createWriteStream('./message.txt'); 
var server=net.createServer(); 
server.on('connection',function(socket){ 
    socket.setTimeout(10*1000); 
    socket.pause(); 
        socket.pipe(file); // 只调用一次pipe 
    socket.on('timeout',function(){ 
        socket.resume(); 
        //socket.pipe(file); //注释,避免多次调用pipe 
    }); 
    socket.on('data',function(data){ 
        socket.pause(); 
    }); 
}); 
server.listen('8431','localhost'); 

结果:OK

6、问题:依然无解。 还是无法unpipe,除非没有任何嵌套,例如:

var net=require('net');
var fs=require('fs');
var file=fs.createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
    socket.pipe(file,{end:false});       //当客户端关闭时,不会立即关闭file的写操作
    socket.on('data',function(data){         
         if(data.toString()=='q'){       //收到字符q之后,不会再向file写入新的数据(但字符q可以被写入)
           socket.unpipe(file);         //取消指定pipe隧道
         }                        
    });
});
server.listen(8431,'localhost');
server.on('error',
function(err) {
    console.log('出错了,err=' + err.code);
});
server.on('close',
function() {
    console.log('本服务器要关闭了。');
});

上面结果:可以unpipe成功。

但,有嵌套就无法成功,例如:

var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
    socket.pause();
    setTimeout(function(){ // 本例子,仅能在每个新socket开始的30秒内进行暂停数据写入
        socket.resume();
        socket.pipe(file);
    }, 30000);
    socket.on('data',function(data){ //resume方法可以触发data事件,但data事件无法阻止pipe写入文件
        socket.unpipe();
        console.log('客户端触发data事件,数据为:'+data.toString());
    });
});

server.listen(8431,‘localhost’);

结果:无法unpipe成功。因为有嵌套,例如setTimeout。

真的是这样么? 我不知道,仅仅通过测试,瞎猜的! 请高人指点!


4 回复

Node.js 的 pipeunpipe 问题,可能是个 bug?

1. 看代码

var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();

server.on('connection', function(socket) {
    socket.setTimeout(10 * 1000);
    socket.pause();

    socket.on('timeout', function() {
        socket.resume();
        socket.pipe(file);  // 每次调用都会增加一遍数据写入
    });

    socket.on('data', function(data) {
        socket.unpipe(file); // 无法成功取消pipe
        socket.pause();
    });
});

server.listen('8431', 'localhost');

2. 测试结果

第一次输入 a,第二次输入 b,第三次输入 c,结果变为 abbccc 而不是 abc

3. 分析

猜测:每次调用 pipe 就会生成一个新的隧道,因此每次写数据都要从递增的多个隧道进行写入——这就变成了 N 倍的数据写入。

4. 使用 unpipe 取消每次递增的隧道

var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();

server.on('connection', function(socket) {
    socket.setTimeout(10 * 1000);
    socket.pause();

    socket.on('timeout', function() {
        socket.resume();
        socket.pipe(file);
    });

    socket.on('data', function(data) {
        socket.unpipe(file); // 无法成功取消pipe
        socket.pause();
    });
});

server.listen('8431', 'localhost');

结果:失败。unpipe 根本无法取消 pipe 创建的隧道。

5. 根据原因,仅调用一次 pipe

var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();

server.on('connection', function(socket) {
    socket.setTimeout(10 * 1000);
    socket.pause();

    socket.pipe(file); // 只调用一次 pipe

    socket.on('timeout', function() {
        socket.resume();
        // socket.pipe(file); // 注释,避免多次调用 pipe
    });

    socket.on('data', function(data) {
        socket.pause();
    });
});

server.listen('8431', 'localhost');

结果:OK

6. 问题:依然无解

还是无法 unpipe,除非没有任何嵌套,例如:

var net = require('net');
var fs = require('fs');
var file = fs.createWriteStream('./message.txt');
var server = net.createServer();

server.on('connection', function(socket) {
    socket.pipe(file, { end: false }); // 当客户端关闭时,不会立即关闭 file 的写操作
    socket.on('data', function(data) {         
        if (data.toString() == 'q') { // 收到字符 q 之后,不会再向 file 写入新的数据(但字符 q 可以被写入)
            socket.unpipe(file); // 取消指定 pipe 隧道
        }
    });
});

server.listen(8431, 'localhost');
server.on('error', function(err) {
    console.log('出错了,err=' + err.code);
});
server.on('close', function() {
    console.log('本服务器要关闭了。');
});

上面结果:可以 unpipe 成功。

但是,如果有嵌套,例如:

var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();

server.on('connection', function(socket) {
    socket.pause();
    setTimeout(function() { // 本例子,仅能在每个新 socket 开始的 30 秒内进行暂停数据写入
        socket.resume();
        socket.pipe(file);
    }, 30000);

    socket.on('data', function(data) { // resume 方法可以触发 data 事件,但 data 事件无法阻止 pipe 写入文件
        socket.unpipe();
        console.log('客户端触发 data 事件,数据为:' + data.toString());
    });
});

server.listen(8431, 'localhost');

结果:无法 unpipe 成功。因为有嵌套,例如 setTimeout

结论

目前来看,unpipe 在有嵌套的情况下可能会失效。建议仅调用一次 pipe,并在必要时手动管理数据流。如果需要动态地停止写入,考虑使用条件逻辑来控制 pipeunpipe 的调用时机。


楼主,你的问题解决了么?我想在forever模块里data时间触发时判断是否创建当前日期的新日志,unpipe掉旧日志,pipe一个新日志。pipe有效,unpipe无效。具体可以看这里 https://cnodejs.org/topic/54dc0f5550194b3e191bc483

  socket.unpipe(file); //无法成功,????????????????

用的不对,自然无法unpipe。 不能在on(‘data’)里面unpipe,因为当前的pipe还有很多工作没执行完,需要把unpipe放在setImmediate / process.nextTick里。 打个比方吧,如果想要拆一个房子,肯定不能在房子里面拆,会砸到自己

你的观察是正确的。pipe 方法一旦建立后,如果想要取消它,需要确保没有嵌套或者异步调用导致 unpipepipe 之后再被调用。

以下是优化后的代码示例,展示了如何正确地使用 pipeunpipe

const net = require('net');
const fs = require('fs');

const file = fs.createWriteStream('./message.txt');
const server = net.createServer();

server.on('connection', function(socket) {
    socket.setTimeout(10 * 1000);
    socket.pause();

    socket.on('timeout', function() {
        socket.resume();
        socket.pipe(file, { end: false }); // 使用 end: false 防止自动关闭
    });

    socket.on('data', function(data) {
        socket.unpipe(file); // 取消管道连接
        socket.pause(); // 暂停数据接收

        if (data.toString().trim() === 'q') {
            // 如果接收到特定字符,停止写入文件
            file.end();
        } else {
            // 继续写入文件
            socket.pipe(file);
        }
    });
});

server.listen(8431, 'localhost');

解释:

  1. pipe(file, { end: false }):

    • 使用 { end: false } 参数防止在管道结束时自动关闭文件流。
  2. socket.unpipe(file):

    • 在接收到数据后,调用 unpipe 来取消管道连接。
  3. if (data.toString().trim() === 'q'):

    • 当接收到特定字符(例如 'q')时,终止文件写入并关闭文件流。
  4. socket.pipe(file):

    • 如果不希望停止写入文件,可以在 unpipe 后重新建立管道连接。

通过这种方式,可以有效地管理数据流的管道连接,并在需要时取消这些连接。

回到顶部