Node.js的pipe和unpipe问题,可能是个bug?
Node.js的pipe和unpipe问题,可能是个bug?
1、看代码
var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
socket.setTimeout(10*1000);
socket.pause();
//socket.pipe(file); //推荐仅调用一次pipe
socket.on('timeout',function(){
socket.resume();
socket.pipe(file); //结果是:每一次调用都会增加一遍数据写入
});
socket.on('data',function(data){
socket.unpipe(file); //无法成功,????????????????
socket.pause();
});
});
server.listen('8431','localhost');
2、测试结果
第一次输入a,第二次输入b,第三次输入c,结果变为abbccc而不是abc 3、分析 猜测:每次调用pipe就生成一个全新的隧道,因此,每次写数据都要从递增的多个隧道进行写入——就变成N倍了。
4、用unpipe取消每次递增的隧道
var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
socket.setTimeout(10*1000);
socket.pause();
//socket.pipe(file);
socket.on('timeout',function(){
socket.resume();
socket.pipe(file);
});
socket.on('data',function(data){
socket.unpipe(file); //无法成功,????????????????
socket.pause();
});
});
server.listen(‘8431’,‘localhost’);
结果:失败。unpipe根本无法取消pipe创建的隧道。 原因:参考https://groups.google.com/forum/#!topic/nodejs/75R0dOZV1UI 反正没找到解决方案(除非调用socket.removeAllListeners(); 但socket就无法继续监听data了,除非另建监听器,在循环下基本不可能)。
5、根据原因,仅调用一次pipe
var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
socket.setTimeout(10*1000);
socket.pause();
socket.pipe(file); // 只调用一次pipe
socket.on('timeout',function(){
socket.resume();
//socket.pipe(file); //注释,避免多次调用pipe
});
socket.on('data',function(data){
socket.pause();
});
});
server.listen('8431','localhost');
结果:OK
6、问题:依然无解。 还是无法unpipe,除非没有任何嵌套,例如:
var net=require('net');
var fs=require('fs');
var file=fs.createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
socket.pipe(file,{end:false}); //当客户端关闭时,不会立即关闭file的写操作
socket.on('data',function(data){
if(data.toString()=='q'){ //收到字符q之后,不会再向file写入新的数据(但字符q可以被写入)
socket.unpipe(file); //取消指定pipe隧道
}
});
});
server.listen(8431,'localhost');
server.on('error',
function(err) {
console.log('出错了,err=' + err.code);
});
server.on('close',
function() {
console.log('本服务器要关闭了。');
});
上面结果:可以unpipe成功。
但,有嵌套就无法成功,例如:
var net=require('net');
var file=require('fs').createWriteStream('./message.txt');
var server=net.createServer();
server.on('connection',function(socket){
socket.pause();
setTimeout(function(){ // 本例子,仅能在每个新socket开始的30秒内进行暂停数据写入
socket.resume();
socket.pipe(file);
}, 30000);
socket.on('data',function(data){ //resume方法可以触发data事件,但data事件无法阻止pipe写入文件
socket.unpipe();
console.log('客户端触发data事件,数据为:'+data.toString());
});
});
server.listen(8431,‘localhost’);
结果:无法unpipe成功。因为有嵌套,例如setTimeout。
真的是这样么? 我不知道,仅仅通过测试,瞎猜的! 请高人指点!
Node.js 的 pipe
和 unpipe
问题,可能是个 bug?
1. 看代码
var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();
server.on('connection', function(socket) {
socket.setTimeout(10 * 1000);
socket.pause();
socket.on('timeout', function() {
socket.resume();
socket.pipe(file); // 每次调用都会增加一遍数据写入
});
socket.on('data', function(data) {
socket.unpipe(file); // 无法成功取消pipe
socket.pause();
});
});
server.listen('8431', 'localhost');
2. 测试结果
第一次输入 a
,第二次输入 b
,第三次输入 c
,结果变为 abbccc
而不是 abc
。
3. 分析
猜测:每次调用 pipe
就会生成一个新的隧道,因此每次写数据都要从递增的多个隧道进行写入——这就变成了 N 倍的数据写入。
4. 使用 unpipe
取消每次递增的隧道
var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();
server.on('connection', function(socket) {
socket.setTimeout(10 * 1000);
socket.pause();
socket.on('timeout', function() {
socket.resume();
socket.pipe(file);
});
socket.on('data', function(data) {
socket.unpipe(file); // 无法成功取消pipe
socket.pause();
});
});
server.listen('8431', 'localhost');
结果:失败。unpipe
根本无法取消 pipe
创建的隧道。
5. 根据原因,仅调用一次 pipe
var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();
server.on('connection', function(socket) {
socket.setTimeout(10 * 1000);
socket.pause();
socket.pipe(file); // 只调用一次 pipe
socket.on('timeout', function() {
socket.resume();
// socket.pipe(file); // 注释,避免多次调用 pipe
});
socket.on('data', function(data) {
socket.pause();
});
});
server.listen('8431', 'localhost');
结果:OK
6. 问题:依然无解
还是无法 unpipe
,除非没有任何嵌套,例如:
var net = require('net');
var fs = require('fs');
var file = fs.createWriteStream('./message.txt');
var server = net.createServer();
server.on('connection', function(socket) {
socket.pipe(file, { end: false }); // 当客户端关闭时,不会立即关闭 file 的写操作
socket.on('data', function(data) {
if (data.toString() == 'q') { // 收到字符 q 之后,不会再向 file 写入新的数据(但字符 q 可以被写入)
socket.unpipe(file); // 取消指定 pipe 隧道
}
});
});
server.listen(8431, 'localhost');
server.on('error', function(err) {
console.log('出错了,err=' + err.code);
});
server.on('close', function() {
console.log('本服务器要关闭了。');
});
上面结果:可以 unpipe
成功。
但是,如果有嵌套,例如:
var net = require('net');
var file = require('fs').createWriteStream('./message.txt');
var server = net.createServer();
server.on('connection', function(socket) {
socket.pause();
setTimeout(function() { // 本例子,仅能在每个新 socket 开始的 30 秒内进行暂停数据写入
socket.resume();
socket.pipe(file);
}, 30000);
socket.on('data', function(data) { // resume 方法可以触发 data 事件,但 data 事件无法阻止 pipe 写入文件
socket.unpipe();
console.log('客户端触发 data 事件,数据为:' + data.toString());
});
});
server.listen(8431, 'localhost');
结果:无法 unpipe
成功。因为有嵌套,例如 setTimeout
。
结论
目前来看,unpipe
在有嵌套的情况下可能会失效。建议仅调用一次 pipe
,并在必要时手动管理数据流。如果需要动态地停止写入,考虑使用条件逻辑来控制 pipe
和 unpipe
的调用时机。
楼主,你的问题解决了么?我想在forever模块里data时间触发时判断是否创建当前日期的新日志,unpipe掉旧日志,pipe一个新日志。pipe有效,unpipe无效。具体可以看这里 https://cnodejs.org/topic/54dc0f5550194b3e191bc483
socket.unpipe(file); //无法成功,????????????????
用的不对,自然无法unpipe。 不能在on(‘data’)里面unpipe,因为当前的pipe还有很多工作没执行完,需要把unpipe放在setImmediate / process.nextTick里。 打个比方吧,如果想要拆一个房子,肯定不能在房子里面拆,会砸到自己
你的观察是正确的。pipe
方法一旦建立后,如果想要取消它,需要确保没有嵌套或者异步调用导致 unpipe
在 pipe
之后再被调用。
以下是优化后的代码示例,展示了如何正确地使用 pipe
和 unpipe
:
const net = require('net');
const fs = require('fs');
const file = fs.createWriteStream('./message.txt');
const server = net.createServer();
server.on('connection', function(socket) {
socket.setTimeout(10 * 1000);
socket.pause();
socket.on('timeout', function() {
socket.resume();
socket.pipe(file, { end: false }); // 使用 end: false 防止自动关闭
});
socket.on('data', function(data) {
socket.unpipe(file); // 取消管道连接
socket.pause(); // 暂停数据接收
if (data.toString().trim() === 'q') {
// 如果接收到特定字符,停止写入文件
file.end();
} else {
// 继续写入文件
socket.pipe(file);
}
});
});
server.listen(8431, 'localhost');
解释:
-
pipe(file, { end: false })
:- 使用
{ end: false }
参数防止在管道结束时自动关闭文件流。
- 使用
-
socket.unpipe(file)
:- 在接收到数据后,调用
unpipe
来取消管道连接。
- 在接收到数据后,调用
-
if (data.toString().trim() === 'q')
:- 当接收到特定字符(例如
'q'
)时,终止文件写入并关闭文件流。
- 当接收到特定字符(例如
-
socket.pipe(file)
:- 如果不希望停止写入文件,可以在
unpipe
后重新建立管道连接。
- 如果不希望停止写入文件,可以在
通过这种方式,可以有效地管理数据流的管道连接,并在需要时取消这些连接。