Pub/Sub 模型在Nodejs中如何处理ETIMEDOUT与EPIPE?

Pub/Sub 模型在Nodejs中如何处理ETIMEDOUT与EPIPE?

Node.js新手。

最近使用Node.js的TCP实现一个Pub/Sub模型的应用,逻辑很简单。

比如有三个客户端A, B, C连接服务器S。服务器S负责转发ABC的数据,和聊天非常相似,但是是二进制的数据。

但是服务端上线之后,发现数据经常有错误。表现在客户端收到服务端转发来的数据,却无法正常解析。

我在日志中输出了socket遇到的所有错误,其中发现了大量类似这样的输出:

{ [Error: write ETIMEDOUT] code: ‘ETIMEDOUT’, errno: ‘ETIMEDOUT’, syscall: ‘write’ } { [Error: read ETIMEDOUT] code: ‘ETIMEDOUT’, errno: ‘ETIMEDOUT’, syscall: ‘read’ } { [Error: write ETIMEDOUT] code: ‘ETIMEDOUT’, errno: ‘ETIMEDOUT’, syscall: ‘write’ } { [Error: write EPIPE] code: ‘EPIPE’, errno: ‘EPIPE’, syscall: ‘write’ } { [Error: write EPIPE] code: ‘EPIPE’, errno: ‘EPIPE’, syscall: ‘write’ } { [Error: write EPIPE] code: ‘EPIPE’, errno: ‘EPIPE’, syscall: ‘write’ } { [Error: write EPIPE] code: ‘EPIPE’, errno: ‘EPIPE’, syscall: ‘write’ } { [Error: write EPIPE] code: ‘EPIPE’, errno: ‘EPIPE’, syscall: ‘write’ }

socket我是设置了setKeepAlive的,而且也设置了setNoDelay。

因为客户端通过服务端进行通信,数据包非常多,服务端将所有的数据做解析、验证,非常耗资源,所以这里想问一下,类似这样的错误该如何处理?

关键代码:

// server继承于net.Server server.on(‘listening’, function() { var port = server.address().port; }).on(‘connection’, function(cli) { cli.socketBuf = new Buffers(); cli.commandStarted = false; cli.dataSize = 0; cli.setKeepAlive(true); cli.setNoDelay(true); cli.on(‘connect’, function() { server.clients.push(cli); }).on(‘close’, function() { var index = server.clients.indexOf(cli); server.clients.splice(index, 1); }).on(‘data’, function (buf) { server.emit(‘data’, cli, buf); if(op.autoBroadcast) { _.each(server.clients, function© { if(c != cli) c.write(buf); }); } }).on(‘error’, function(err) { console.log(err); //上面日志输出的错误,都来自这里 }); }).on(‘error’, function(err) { console.log(‘Error with server!’); console.log(err); });

// …

// room.dataSocket 是上面对象的实例 room.dataSocket.on(‘data’, function(cli, d) { // bf是带有缓冲区的文件类 bf.append(d); room.dataFileSize += d.length;

}).on(‘connection’, function(con){ bf.readAll(function(da) { con.write(da); }); });

完整代码:https://github.com/liuyanghejerry/painttyServer/blob/master/socket.js


2 回复

针对您的问题,我将解释如何在Node.js中处理ETIMEDOUTEPIPE 错误,并提供一些改进的代码示例。

1. 处理 ETIMEDOUT 错误

ETIMEDOUT 错误通常表示网络超时。这可能是因为客户端与服务器之间的连接长时间没有活动或者网络不稳定导致的。为了处理这种情况,可以设置TCP连接的keepAlive选项,并设置适当的超时时间。同时,可以增加重试机制以确保数据能够被发送。

const net = require('net');

const server = net.createServer((client) => {
    client.setKeepAlive(true, 5000); // 启用keepAlive,每隔5秒发送一次心跳
    client.setTimeout(30000); // 设置超时时间为30秒
    
    client.on('timeout', () => {
        console.log("Connection timeout");
        client.end(); // 关闭连接
    });

    client.on('data', (buf) => {
        // 处理接收到的数据
    });

    client.on('error', (err) => {
        if (err.code === 'ETIMEDOUT') {
            console.log("Connection timed out:", err);
            client.destroy(); // 强制关闭连接
        }
    });
});

server.listen(8080, () => {
    console.log('Server is listening on port 8080');
});

2. 处理 EPIPE 错误

EPIPE 错误通常发生在尝试向已经关闭或不可写的套接字写入数据时。这种情况下,可以捕获错误并优雅地处理它,例如通过记录错误信息或清理资源。

client.on('error', (err) => {
    if (err.code === 'EPIPE') {
        console.log("Broken pipe detected:", err);
        // 清理资源,例如从客户端列表中移除该客户端
        server.clients = server.clients.filter(c => c !== client);
    }
});

3. 优化数据处理逻辑

在您的代码中,当数据到达时,您尝试将数据广播给所有其他客户端。然而,如果其中一个客户端关闭了连接,可能会导致EPIPE错误。为了避免这种情况,可以在广播数据之前检查每个客户端的状态。

client.on('data', (buf) => {
    server.clients.forEach((c) => {
        if (c.writable) { // 确保客户端仍然可写
            c.write(buf);
        } else {
            console.log("Client is not writable, removing from list");
            server.clients = server.clients.filter(client => client !== c);
        }
    });
});

这些更改可以帮助您更有效地处理网络超时和管道错误,并提高应用的稳定性和可靠性。希望这些示例能帮助您解决遇到的问题。


在处理TCP的Pub/Sub模型时,ETIMEDOUTEPIPE 错误是比较常见的网络问题。这些错误通常表示客户端或服务器之间的连接存在问题。下面是针对这些问题的一些解决方案和建议:

处理 ETIMEDOUT

ETIMEDOUT 表示写操作超时。这可能是因为网络延迟高或者目标主机不可达。

解决方案

  1. 增加超时时间:可以尝试增加 socket.setTimeout 的时间。
  2. 重试机制:如果连接超时,可以实现重试机制,重连或重新发送数据。
  3. 心跳检测:保持连接活跃,可以定时发送心跳消息。
cli.setTimeout(5000); // 设置超时时间为5秒

处理 EPIPE

EPIPE 表示管道中断,通常是由于接收方关闭了连接。

解决方案

  1. 捕获并处理错误:在接收到数据之前检查socket的状态。
  2. 优雅地关闭连接:当捕获到错误时,优雅地关闭连接,并从客户端列表中移除。
cli.on('error', function(err) {
    if (err.code === 'EPIPE') {
        console.log('Client disconnected unexpectedly');
        cli.destroy(); // 销毁socket
    } else {
        console.log(err);
    }
});

完整示例

const net = require('net');

const server = net.createServer();

server.on('listening', function() {
    const port = server.address().port;
    console.log(`Server listening on port ${port}`);
});

server.on('connection', function(cli) {
    cli.setKeepAlive(true);
    cli.setNoDelay(true);
    cli.setTimeout(5000);

    cli.on('connect', function() {
        server.clients.push(cli);
    });

    cli.on('close', function() {
        const index = server.clients.indexOf(cli);
        server.clients.splice(index, 1);
    });

    cli.on('data', function(buf) {
        server.emit('data', cli, buf);
        if (op.autoBroadcast) {
            _.each(server.clients, function(c) {
                if (c !== cli) c.write(buf);
            });
        }
    });

    cli.on('error', function(err) {
        if (err.code === 'EPIPE') {
            console.log('Client disconnected unexpectedly');
            cli.destroy(); // 销毁socket
        } else {
            console.log(err);
        }
    });
});

server.on('error', function(err) {
    console.log('Error with server!');
    console.log(err);
});

server.listen(8080);

通过上述代码示例,我们可以更好地管理和处理客户端连接中的常见错误。

回到顶部