Python中如何实现服务器接收传感器比特流数据(如0X73,0X77,0X99)并存入数据库
如果我用 python 写 用什么框架比较高效 望各位 V 友提提意见
Python中如何实现服务器接收传感器比特流数据(如0X73,0X77,0X99)并存入数据库
5 回复
这里是性能瓶颈?不是的话就暂时不用考虑
import socket
import struct
import sqlite3
from datetime import datetime
import threading
class SensorDataServer:
def __init__(self, host='0.0.0.0', port=8888):
self.host = host
self.port = port
self.init_database()
def init_database(self):
"""初始化SQLite数据库"""
self.conn = sqlite3.connect('sensor_data.db', check_same_thread=False)
self.cursor = self.conn.cursor()
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
data_bytes BLOB,
hex_string TEXT
)
''')
self.conn.commit()
def save_to_db(self, data_bytes):
"""将接收到的字节数据存入数据库"""
hex_str = '0x' + ''.join(f'{b:02X}' for b in data_bytes)
self.cursor.execute(
'INSERT INTO sensor_readings (data_bytes, hex_string) VALUES (?, ?)',
(data_bytes, hex_str)
)
self.conn.commit()
print(f"已保存数据: {hex_str}")
def handle_client(self, client_socket, address):
"""处理单个客户端连接"""
print(f"客户端 {address} 已连接")
try:
while True:
# 接收4字节的包头(包含数据长度)
header = client_socket.recv(4)
if not header:
break
# 解析数据长度(大端字节序)
data_length = struct.unpack('>I', header)[0]
# 接收实际数据
data_bytes = b''
while len(data_bytes) < data_length:
chunk = client_socket.recv(data_length - len(data_bytes))
if not chunk:
break
data_bytes += chunk
if len(data_bytes) == data_length:
self.save_to_db(data_bytes)
else:
print(f"数据接收不完整: {address}")
except ConnectionResetError:
print(f"客户端 {address} 断开连接")
finally:
client_socket.close()
def start(self):
"""启动服务器"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.host, self.port))
server.listen(5)
print(f"服务器启动,监听 {self.host}:{self.port}")
try:
while True:
client_socket, address = server.accept()
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\n服务器关闭")
finally:
server.close()
self.conn.close()
# 客户端发送示例
def send_sensor_data(host='127.0.0.1', port=8888):
"""模拟传感器发送数据"""
import time
# 示例数据:0X73, 0X77, 0X99
sensor_data = bytes([0x73, 0x77, 0x99])
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
# 发送数据长度(4字节大端)+ 实际数据
data_length = len(sensor_data)
header = struct.pack('>I', data_length)
client.sendall(header + sensor_data)
print(f"发送数据: {sensor_data.hex()}")
client.close()
if __name__ == '__main__':
# 启动服务器
server = SensorDataServer()
# 在后台启动服务器线程
import threading
server_thread = threading.Thread(target=server.start)
server_thread.daemon = True
server_thread.start()
# 等待服务器启动
import time
time.sleep(1)
# 测试发送数据
send_sensor_data()
# 查询数据库验证
time.sleep(1)
conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM sensor_readings')
rows = cursor.fetchall()
for row in rows:
print(f"ID: {row[0]}, 时间: {row[1]}, 数据: {row[3]}")
conn.close()
这个方案用TCP协议传输,加了4字节的包头来标识数据长度,这样能处理变长数据。数据库用SQLite,存了原始字节和十六进制字符串两种格式。多线程处理能同时接多个传感器。数据验证和错误处理也基本够用。
简单说就是TCP传数据,SQLite存,多线程处理。
不是框架问题 性能在于少线程切换 保持吞吐量
UDP 最合适。
写数据库可以合并写。
曾经写过 tcp 的每秒 700 插入数据库,性能瓶颈在数据库,网络完全不是问题

