Python中如何使用socketserver.server_forever()结合多线程处理数据并存入数据库

我现在在用 socketserver.server_forever()处理请求,在 handle ()里面接受数据存入 queue,现在想用另外个线程取出 queue 数据并存入数据库,想问这个新线程在哪里开启呢?发现 server_forever 一直运行,新线程启动不了 if name == "main": HOST, PORT = "0.0.0.0", 9999 # Linux server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) # 线程 server.serve_forever() connect_sql() init_db() for i in range(multiprocessing.cpu_count()): t = threading.Thread(target=getsensor_que) t.start()


Python中如何使用socketserver.server_forever()结合多线程处理数据并存入数据库

2 回复
import socketserver
import threading
import sqlite3
from datetime import datetime

# 数据库操作类
class DatabaseHandler:
    def __init__(self, db_path='data.db'):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """初始化数据库表"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS socket_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                data TEXT NOT NULL,
                client_address TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
        conn.close()
    
    def save_data(self, data, client_address):
        """保存数据到数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO socket_data (data, client_address) VALUES (?, ?)",
            (data, str(client_address))
        )
        conn.commit()
        conn.close()

# 自定义请求处理类
class ThreadedTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        """处理客户端连接"""
        try:
            # 接收数据
            data = self.request.recv(1024).decode('utf-8').strip()
            if data:
                print(f"收到来自 {self.client_address} 的数据: {data}")
                
                # 创建数据库处理器实例
                db_handler = DatabaseHandler()
                
                # 在新线程中保存数据(避免阻塞主线程)
                save_thread = threading.Thread(
                    target=db_handler.save_data,
                    args=(data, self.client_address)
                )
                save_thread.start()
                save_thread.join()  # 等待保存完成
                
                # 发送响应
                response = f"数据已接收并保存: {data}"
                self.request.sendall(response.encode('utf-8'))
        except Exception as e:
            print(f"处理错误: {e}")

# 线程池TCP服务器
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True  # 设置线程为守护线程
    allow_reuse_address = True  # 允许地址重用

def main():
    HOST = 'localhost'
    PORT = 9999
    
    # 创建服务器实例
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPHandler)
    
    print(f"服务器启动在 {HOST}:{PORT}")
    print("按 Ctrl+C 停止服务器")
    
    try:
        # 启动服务器,持续运行
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n服务器正在关闭...")
        server.shutdown()
        server.server_close()

if __name__ == "__main__":
    main()

这个实现的核心是:

  1. ThreadedTCPServer 继承 ThreadingMixInTCPServer,自动为每个客户端连接创建新线程
  2. ThreadedTCPHandler 处理具体的数据接收逻辑
  3. DatabaseHandler 封装数据库操作,在独立线程中执行保存操作
  4. serve_forever() 保持服务器持续运行,直到收到中断信号

使用示例:

# 启动服务器
python server.py

# 客户端测试(另开终端)
echo "测试数据" | nc localhost 9999

数据库查询:

import sqlite3
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM socket_data")
print(cursor.fetchall())

总结:用ThreadingMixIn实现多线程,数据库操作放独立线程避免阻塞。


if name == “main”:
sensor_que = queue.Queue()
sensor_data = {‘DeviceId’: ‘’, ‘AirPressure’: 0, ‘Humidity’: 0, ‘Noise’: 0, ‘Pm25’: 0, ‘Temperature’: 0,
‘WindDirection’: 0, ‘WindSpeed’: 0}
# HOST, PORT = “localhost”, 9999 #windows
HOST, PORT = “0.0.0.0”, 9999 # Linux
server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) # 线程
server.serve_forever()
print(‘before init_db!’)
connect_sql()
init_db()
print(‘after init_db!’)
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=getsensor_que)
t.start()

回到顶部