Nodejs关于net模块下的socket,接收数据存储数据库问题,请高手指点 数据接收不全,对Nodejs还是没有很好理解 我觉得是处理回调函数时的问题,刚上手,不太清楚,贴代码!
Nodejs关于net模块下的socket,接收数据存储数据库问题,请高手指点
数据接收不全,对Nodejs还是没有很好理解
我觉得是处理回调函数时的问题,刚上手,不太清楚,贴代码!
一个小程序,用来接收一些数字信息,并写入数据库。
var net = require(‘net’);
var sql = require(‘msnodesql’);
var useTrustedConnection = true;
var conn_str = “Driver={” + driver + “};Server=” + server + “;” + (useTrustedConnection == false ? “Trusted_Connection={Yes};” : “UID=” + user + “;PWD=” + pwd + “;”) + “Database={” + database + “};”;
var Time = new Array();
var Ip=new Array();
var index1 = 0;
var index2 = 0;
var index3 = 0;//接受数据条数
var flag = true;
var length = 12 *100;//每条数据长度为12个字节
buf1 = new Buffer(length);
buf2 = new Buffer(length);
net.createServer(function(sock) { sock.setEncoding(‘binary’); // 我们获得一个连接 - 该连接自动关联一个socket对象 console.log('CONNECTED: ’ + sock.remoteAddress + ‘:’ + sock.remotePort);
// 为这个socket实例添加一个"data"事件处理函数
sock.on(‘data’, function(data) {
//判断buf1是否已满,满了就存入数据库,并修改个标志位
if ((flag == true) && (index1 == length)) {
sql.open(conn_str, function(err, conn) {
if (err) {
console.log(“err:” + err);
}
else {
for (var i = 0; i < index3; i++) {
conn.queryRaw(“INSERT INTO RoanCondition(speed,number,No,StartTime,Ip) VALUES(” + buf1.readFloatLE(8 + i * 12) + “,” + buf1.readInt32LE(4 + i * 12) + “,” + buf1.readInt32LE(0 + i * 12) + “,’” + Time[i] + “’,’” + Ip[i] + “’)”, function(err, results) {
if (err) {
console.log(“data1 err:” + err);
}
});
}
flag = false;
index3 = 0;
index1 = 0;
}
});
}
//判断buf2是否已满,满了就存入数据库,并修改个标志位
if ((flag == false) && (index2 == length)) {
sql.open(conn_str, function(err, conn) {
if (err) {
console.log("err:" + err);
}
else {
for (var i = 0; i < index3; i++) {
conn.queryRaw("INSERT INTO RoanCondition(speed,number,No,StartTime,Ip) VALUES(" + buf2.readFloatLE(8 + i * 12) + "," + buf2.readInt32LE(4 + i * 12) + "," + buf2.readInt32LE(0 + i * 12) + ",'" + Time[i] + "','" + Ip[i] + "')", function(err, results) {
if (err) {
console.log("data2 err:" + err);
}
});
}
flag = true;
index2 = 0;
index3 = 0;
}
});
}
//接受客户端数据
var len = data.length;
//若buf1未满,则存入
if ((flag == true) && (index1 < length)) {
for (var i = 0; i < len; i++) {
buf1[index1] = data.charCodeAt(i);
index1++;
}
//记录时间和ip
Ip[index3] = sock.remoteAddress;
var Datetime = new Date();
var time = Datetime.toLocaleTimeString();
var year = Datetime.getFullYear();
var mouth = Datetime.getMonth() + 1;
var day = Datetime.getDate();
Time[index3] = year + "-" + mouth + "-" + day + " " + time;
}
///若buf2未满,则存入
else if ((flag == false) && (index2 < length)) {
for (var i = 0; i < len; i++) {
buf2[index2] = data.charCodeAt(i);
index2++;
}
Ip[index3] = sock.remoteAddress;
var Datetime = new Date();
var time = Datetime.toLocaleTimeString();
var year = Datetime.getFullYear();
var mouth = Datetime.getMonth() + 1;
var day = Datetime.getDate();
Time[index3] = year + "-" + mouth + "-" + day + " " + time;
}
index3++;
});
// 为这个socket实例添加一个"close"事件处理函数
sock.on('close', function(data) {
console.log('CLOSED: ' +
sock.remoteAddress + ' ' + sock.remotePort);
});
//存储间隔时间
}).listen(PORT, HOST);
console.log('Server listening on ’ + HOST + ‘:’ + PORT);
针对你的问题,你的代码中存在一些潜在的逻辑错误和性能问题。特别是在处理缓冲区(Buffer)和回调函数时。以下是一个优化后的示例代码,它应该能更好地解决你的问题。
const net = require('net');
const sql = require('msnodesqlv8'); // 假设你使用的是 msnodesqlv8
const PORT = 12345;
const HOST = 'localhost';
const conn_str = "Driver={SQL Server};Server=localhost;Database=testdb;Trusted_Connection=true;";
let clients = [];
net.createServer((sock) => {
console.log(`CONNECTED: ${sock.remoteAddress}:${sock.remotePort}`);
clients.push(sock);
sock.on('data', (data) => {
const ip = sock.remoteAddress;
const datetime = new Date().toISOString();
// 将数据解析并插入数据库
processData(data, ip, datetime, (err) => {
if (err) {
console.error(err);
}
});
});
sock.on('end', () => {
console.log(`CONNECTION CLOSED: ${sock.remoteAddress}:${sock.remotePort}`);
clients = clients.filter(client => client !== sock);
});
}).listen(PORT, HOST, () => {
console.log(`Server listening on ${HOST}:${PORT}`);
});
function processData(data, ip, datetime, callback) {
try {
// 假设每个数据包包含多个记录,每条记录长度为12字节
const recordLength = 12;
const recordsCount = Math.floor(data.length / recordLength);
let insertQueries = [];
for (let i = 0; i < recordsCount; i++) {
const offset = i * recordLength;
const speed = data.readFloatLE(offset);
const number = data.readInt32LE(offset + 4);
const no = data.readInt32LE(offset + 8);
insertQueries.push({
speed,
number,
no,
ip,
datetime
});
}
// 批量插入数据库
insertQueries.forEach(query => {
sql.query(conn_str, `INSERT INTO RoanCondition(speed, number, No, StartTime, Ip) VALUES(${query.speed}, ${query.number}, ${query.no}, '${query.datetime}', '${query.ip}')`, (err, res) => {
if (err) {
callback(err);
}
});
});
callback(null);
} catch (error) {
callback(error);
}
}
关键改进点:
- 简化缓冲区管理:不再需要两个缓冲区
buf1
和buf2
,而是直接处理接收到的数据。 - 批量处理:将所有接收到的数据一次性解析并插入数据库,避免了多次调用数据库操作。
- 错误处理:在处理数据时添加了错误处理逻辑,确保出现问题时能够及时反馈。
希望这些改进能帮助你解决问题。如果有更多具体的需求或问题,欢迎继续提问。
只会动态语言的标记对手动操作 Buffer 好陌生…
从你的描述来看,问题可能出在如何正确处理Socket接收到的数据以及如何有效管理数据存储。下面是一个简化版的示例代码,展示了如何正确地接收数据并存储到数据库中。
示例代码
const net = require('net');
const sql = require('msnodesql');
// 数据库连接字符串
const conn_str = "...";
// 创建服务器
const server = net.createServer((socket) => {
console.log(`Client connected: ${socket.remoteAddress}:${socket.remotePort}`);
let buffer = Buffer.alloc(0);
socket.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
// 检查是否已接收到完整数据块
while (buffer.length >= 12) {
const speed = buffer.readFloatLE(0);
const number = buffer.readInt32LE(4);
const no = buffer.readInt32LE(8);
// 存储到数据库
saveToDatabase(speed, number, no, socket.remoteAddress);
// 移除已处理的数据块
buffer = buffer.slice(12);
}
});
socket.on('end', () => {
console.log(`Client disconnected: ${socket.remoteAddress}:${socket.remotePort}`);
});
socket.on('error', (err) => {
console.error(`Socket error: ${err.message}`);
});
});
server.listen(PORT, HOST, () => {
console.log(`Server listening on ${HOST}:${PORT}`);
});
// 将数据存储到数据库
function saveToDatabase(speed, number, no, ip) {
const query = `INSERT INTO RoanCondition(speed, number, No, StartTime, Ip) VALUES(${speed}, ${number}, ${no}, GETDATE(), '${ip}')`;
sql.open(conn_str, (err, conn) => {
if (err) {
console.error(`Error opening connection: ${err.message}`);
return;
}
conn.queryRaw(query, (err, results) => {
if (err) {
console.error(`Error executing query: ${err.message}`);
} else {
console.log(`Data saved successfully`);
}
});
conn.close();
});
}
解释
- Buffer管理: 使用
Buffer
来累积从Socket接收到的数据。确保当数据量达到12个字节时,才将其拆分出来进行处理。 - 数据拆分: 每次接收到数据时,检查是否有足够的数据来完成一条完整的记录(12个字节)。如果有,立即处理并移除已处理的数据。
- 数据库存储: 使用
saveToDatabase
函数将数据插入到数据库中。这里假设数据库中有一个名为RoanCondition
的表,并且使用了SQL Server的GETDATE()
函数获取当前时间。 - 错误处理: 在每个可能出错的地方都添加了错误处理逻辑,以确保程序的健壮性。
通过这种方式,你可以更有效地管理数据接收和存储过程,避免因为数据不完整导致的数据丢失或重复存储的问题。