Nodejs关于net模块下的socket,接收数据存储数据库问题,请高手指点 数据接收不全,对Nodejs还是没有很好理解 我觉得是处理回调函数时的问题,刚上手,不太清楚,贴代码!

Nodejs关于net模块下的socket,接收数据存储数据库问题,请高手指点
数据接收不全,对Nodejs还是没有很好理解
我觉得是处理回调函数时的问题,刚上手,不太清楚,贴代码!

一个小程序,用来接收一些数字信息,并写入数据库。 var net = require(‘net’); var sql = require(‘msnodesql’); var useTrustedConnection = true; var conn_str = “Driver={” + driver + “};Server=” + server + “;” + (useTrustedConnection == false ? “Trusted_Connection={Yes};” : “UID=” + user + “;PWD=” + pwd + “;”) + “Database={” + database + “};”; var Time = new Array();
var Ip=new Array(); var index1 = 0; var index2 = 0; var index3 = 0;//接受数据条数 var flag = true; var length = 12 *100;//每条数据长度为12个字节 buf1 = new Buffer(length); buf2 = new Buffer(length);

net.createServer(function(sock) { sock.setEncoding(‘binary’); // 我们获得一个连接 - 该连接自动关联一个socket对象 console.log('CONNECTED: ’ + sock.remoteAddress + ‘:’ + sock.remotePort);

// 为这个socket实例添加一个"data"事件处理函数

sock.on(‘data’, function(data) {

//判断buf1是否已满,满了就存入数据库,并修改个标志位 if ((flag == true) && (index1 == length)) { sql.open(conn_str, function(err, conn) { if (err) { console.log(“err:” + err); } else { for (var i = 0; i < index3; i++) { conn.queryRaw(“INSERT INTO RoanCondition(speed,number,No,StartTime,Ip) VALUES(” + buf1.readFloatLE(8 + i * 12) + “,” + buf1.readInt32LE(4 + i * 12) + “,” + buf1.readInt32LE(0 + i * 12) + “,’” + Time[i] + “’,’” + Ip[i] + “’)”, function(err, results) { if (err) { console.log(“data1 err:” + err); }
}); } flag = false; index3 = 0; index1 = 0; } });

}
//判断buf2是否已满,满了就存入数据库,并修改个标志位
    if ((flag == false) && (index2 == length)) {
        sql.open(conn_str, function(err, conn) {
            if (err) {
                console.log("err:" + err);
            }
            else {
                for (var i = 0; i < index3; i++) {
                    conn.queryRaw("INSERT INTO RoanCondition(speed,number,No,StartTime,Ip) VALUES(" + buf2.readFloatLE(8 + i * 12) + "," + buf2.readInt32LE(4 + i * 12) + "," + buf2.readInt32LE(0 + i * 12) + ",'" + Time[i] + "','" + Ip[i] + "')", function(err, results) {
                        if (err) {
                            console.log("data2 err:" + err);
                        }
                    });
                }
                flag = true;
                index2 = 0;
                index3 = 0;   
            }
        });          
    }
    //接受客户端数据       
    var len = data.length;
    //若buf1未满,则存入
    if ((flag == true) && (index1 < length)) {
        for (var i = 0; i < len; i++) {
            buf1[index1] = data.charCodeAt(i);
            index1++;
        }
      //记录时间和ip
        Ip[index3] = sock.remoteAddress;
        var Datetime = new Date();
        var time = Datetime.toLocaleTimeString();
        var year = Datetime.getFullYear();
        var mouth = Datetime.getMonth() + 1;
        var day = Datetime.getDate();
        Time[index3] = year + "-" + mouth + "-" + day + " " + time;

    }
    ///若buf2未满,则存入
    else if ((flag == false) && (index2 < length)) {
        for (var i = 0; i < len; i++) {
            buf2[index2] = data.charCodeAt(i);
            index2++;
        }
        Ip[index3] = sock.remoteAddress;
        var Datetime = new Date();
        var time = Datetime.toLocaleTimeString();
        var year = Datetime.getFullYear();
        var mouth = Datetime.getMonth() + 1;
        var day = Datetime.getDate();
        Time[index3] = year + "-" + mouth + "-" + day + " " + time;
    }
    index3++;
});

// 为这个socket实例添加一个"close"事件处理函数
sock.on('close', function(data) {
    console.log('CLOSED: ' +
        sock.remoteAddress + ' ' + sock.remotePort);
});
//存储间隔时间

}).listen(PORT, HOST);

console.log('Server listening on ’ + HOST + ‘:’ + PORT);


3 回复

针对你的问题,你的代码中存在一些潜在的逻辑错误和性能问题。特别是在处理缓冲区(Buffer)和回调函数时。以下是一个优化后的示例代码,它应该能更好地解决你的问题。

const net = require('net');
const sql = require('msnodesqlv8'); // 假设你使用的是 msnodesqlv8
const PORT = 12345;
const HOST = 'localhost';

const conn_str = "Driver={SQL Server};Server=localhost;Database=testdb;Trusted_Connection=true;";
let clients = [];

net.createServer((sock) => {
    console.log(`CONNECTED: ${sock.remoteAddress}:${sock.remotePort}`);
    
    clients.push(sock);

    sock.on('data', (data) => {
        const ip = sock.remoteAddress;
        const datetime = new Date().toISOString();

        // 将数据解析并插入数据库
        processData(data, ip, datetime, (err) => {
            if (err) {
                console.error(err);
            }
        });
    });

    sock.on('end', () => {
        console.log(`CONNECTION CLOSED: ${sock.remoteAddress}:${sock.remotePort}`);
        clients = clients.filter(client => client !== sock);
    });
}).listen(PORT, HOST, () => {
    console.log(`Server listening on ${HOST}:${PORT}`);
});

function processData(data, ip, datetime, callback) {
    try {
        // 假设每个数据包包含多个记录,每条记录长度为12字节
        const recordLength = 12;
        const recordsCount = Math.floor(data.length / recordLength);

        let insertQueries = [];
        for (let i = 0; i < recordsCount; i++) {
            const offset = i * recordLength;
            const speed = data.readFloatLE(offset);
            const number = data.readInt32LE(offset + 4);
            const no = data.readInt32LE(offset + 8);

            insertQueries.push({
                speed,
                number,
                no,
                ip,
                datetime
            });
        }

        // 批量插入数据库
        insertQueries.forEach(query => {
            sql.query(conn_str, `INSERT INTO RoanCondition(speed, number, No, StartTime, Ip) VALUES(${query.speed}, ${query.number}, ${query.no}, '${query.datetime}', '${query.ip}')`, (err, res) => {
                if (err) {
                    callback(err);
                }
            });
        });

        callback(null);
    } catch (error) {
        callback(error);
    }
}

关键改进点:

  1. 简化缓冲区管理:不再需要两个缓冲区 buf1buf2,而是直接处理接收到的数据。
  2. 批量处理:将所有接收到的数据一次性解析并插入数据库,避免了多次调用数据库操作。
  3. 错误处理:在处理数据时添加了错误处理逻辑,确保出现问题时能够及时反馈。

希望这些改进能帮助你解决问题。如果有更多具体的需求或问题,欢迎继续提问。


只会动态语言的标记对手动操作 Buffer 好陌生…

从你的描述来看,问题可能出在如何正确处理Socket接收到的数据以及如何有效管理数据存储。下面是一个简化版的示例代码,展示了如何正确地接收数据并存储到数据库中。

示例代码

const net = require('net');
const sql = require('msnodesql');

// 数据库连接字符串
const conn_str = "..."; 

// 创建服务器
const server = net.createServer((socket) => {
    console.log(`Client connected: ${socket.remoteAddress}:${socket.remotePort}`);

    let buffer = Buffer.alloc(0);

    socket.on('data', (data) => {
        buffer = Buffer.concat([buffer, data]);

        // 检查是否已接收到完整数据块
        while (buffer.length >= 12) {
            const speed = buffer.readFloatLE(0);
            const number = buffer.readInt32LE(4);
            const no = buffer.readInt32LE(8);

            // 存储到数据库
            saveToDatabase(speed, number, no, socket.remoteAddress);

            // 移除已处理的数据块
            buffer = buffer.slice(12);
        }
    });

    socket.on('end', () => {
        console.log(`Client disconnected: ${socket.remoteAddress}:${socket.remotePort}`);
    });

    socket.on('error', (err) => {
        console.error(`Socket error: ${err.message}`);
    });
});

server.listen(PORT, HOST, () => {
    console.log(`Server listening on ${HOST}:${PORT}`);
});

// 将数据存储到数据库
function saveToDatabase(speed, number, no, ip) {
    const query = `INSERT INTO RoanCondition(speed, number, No, StartTime, Ip) VALUES(${speed}, ${number}, ${no}, GETDATE(), '${ip}')`;

    sql.open(conn_str, (err, conn) => {
        if (err) {
            console.error(`Error opening connection: ${err.message}`);
            return;
        }

        conn.queryRaw(query, (err, results) => {
            if (err) {
                console.error(`Error executing query: ${err.message}`);
            } else {
                console.log(`Data saved successfully`);
            }
        });

        conn.close();
    });
}

解释

  1. Buffer管理: 使用Buffer来累积从Socket接收到的数据。确保当数据量达到12个字节时,才将其拆分出来进行处理。
  2. 数据拆分: 每次接收到数据时,检查是否有足够的数据来完成一条完整的记录(12个字节)。如果有,立即处理并移除已处理的数据。
  3. 数据库存储: 使用saveToDatabase函数将数据插入到数据库中。这里假设数据库中有一个名为RoanCondition的表,并且使用了SQL Server的GETDATE()函数获取当前时间。
  4. 错误处理: 在每个可能出错的地方都添加了错误处理逻辑,以确保程序的健壮性。

通过这种方式,你可以更有效地管理数据接收和存储过程,避免因为数据不完整导致的数据丢失或重复存储的问题。

回到顶部