Python中如何实现服务器接收传感器比特流数据(如0X73,0X77,0X99)并存入数据库

如果我用 python 写 用什么框架比较高效 望各位 V 友提提意见


Python中如何实现服务器接收传感器比特流数据(如0X73,0X77,0X99)并存入数据库
5 回复

这里是性能瓶颈?不是的话就暂时不用考虑


import socket
import struct
import sqlite3
from datetime import datetime
import threading

class SensorDataServer:
    def __init__(self, host='0.0.0.0', port=8888):
        self.host = host
        self.port = port
        self.init_database()
        
    def init_database(self):
        """初始化SQLite数据库"""
        self.conn = sqlite3.connect('sensor_data.db', check_same_thread=False)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS sensor_readings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                data_bytes BLOB,
                hex_string TEXT
            )
        ''')
        self.conn.commit()
    
    def save_to_db(self, data_bytes):
        """将接收到的字节数据存入数据库"""
        hex_str = '0x' + ''.join(f'{b:02X}' for b in data_bytes)
        self.cursor.execute(
            'INSERT INTO sensor_readings (data_bytes, hex_string) VALUES (?, ?)',
            (data_bytes, hex_str)
        )
        self.conn.commit()
        print(f"已保存数据: {hex_str}")
    
    def handle_client(self, client_socket, address):
        """处理单个客户端连接"""
        print(f"客户端 {address} 已连接")
        try:
            while True:
                # 接收4字节的包头(包含数据长度)
                header = client_socket.recv(4)
                if not header:
                    break
                    
                # 解析数据长度(大端字节序)
                data_length = struct.unpack('>I', header)[0]
                
                # 接收实际数据
                data_bytes = b''
                while len(data_bytes) < data_length:
                    chunk = client_socket.recv(data_length - len(data_bytes))
                    if not chunk:
                        break
                    data_bytes += chunk
                
                if len(data_bytes) == data_length:
                    self.save_to_db(data_bytes)
                else:
                    print(f"数据接收不完整: {address}")
                    
        except ConnectionResetError:
            print(f"客户端 {address} 断开连接")
        finally:
            client_socket.close()
    
    def start(self):
        """启动服务器"""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, self.port))
        server.listen(5)
        print(f"服务器启动,监听 {self.host}:{self.port}")
        
        try:
            while True:
                client_socket, address = server.accept()
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\n服务器关闭")
        finally:
            server.close()
            self.conn.close()

# 客户端发送示例
def send_sensor_data(host='127.0.0.1', port=8888):
    """模拟传感器发送数据"""
    import time
    
    # 示例数据:0X73, 0X77, 0X99
    sensor_data = bytes([0x73, 0x77, 0x99])
    
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    
    # 发送数据长度(4字节大端)+ 实际数据
    data_length = len(sensor_data)
    header = struct.pack('>I', data_length)
    client.sendall(header + sensor_data)
    
    print(f"发送数据: {sensor_data.hex()}")
    client.close()

if __name__ == '__main__':
    # 启动服务器
    server = SensorDataServer()
    
    # 在后台启动服务器线程
    import threading
    server_thread = threading.Thread(target=server.start)
    server_thread.daemon = True
    server_thread.start()
    
    # 等待服务器启动
    import time
    time.sleep(1)
    
    # 测试发送数据
    send_sensor_data()
    
    # 查询数据库验证
    time.sleep(1)
    conn = sqlite3.connect('sensor_data.db')
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM sensor_readings')
    rows = cursor.fetchall()
    for row in rows:
        print(f"ID: {row[0]}, 时间: {row[1]}, 数据: {row[3]}")
    conn.close()

这个方案用TCP协议传输,加了4字节的包头来标识数据长度,这样能处理变长数据。数据库用SQLite,存了原始字节和十六进制字符串两种格式。多线程处理能同时接多个传感器。数据验证和错误处理也基本够用。

简单说就是TCP传数据,SQLite存,多线程处理。

不是框架问题 性能在于少线程切换 保持吞吐量

UDP 最合适。
写数据库可以合并写。

曾经写过 tcp 的每秒 700 插入数据库,性能瓶颈在数据库,网络完全不是问题

回到顶部