Python中如何使用socketserver.server_forever()结合多线程处理数据并存入数据库
我现在在用 socketserver.server_forever()处理请求,在 handle ()里面接受数据存入 queue,现在想用另外个线程取出 queue 数据并存入数据库,想问这个新线程在哪里开启呢?发现 server_forever 一直运行,新线程启动不了 if name == "main": HOST, PORT = "0.0.0.0", 9999 # Linux server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) # 线程 server.serve_forever() connect_sql() init_db() for i in range(multiprocessing.cpu_count()): t = threading.Thread(target=getsensor_que) t.start()
Python中如何使用socketserver.server_forever()结合多线程处理数据并存入数据库
import socketserver
import threading
import sqlite3
from datetime import datetime
# 数据库操作类
class DatabaseHandler:
def __init__(self, db_path='data.db'):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化数据库表"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS socket_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data TEXT NOT NULL,
client_address TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def save_data(self, data, client_address):
"""保存数据到数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO socket_data (data, client_address) VALUES (?, ?)",
(data, str(client_address))
)
conn.commit()
conn.close()
# 自定义请求处理类
class ThreadedTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
"""处理客户端连接"""
try:
# 接收数据
data = self.request.recv(1024).decode('utf-8').strip()
if data:
print(f"收到来自 {self.client_address} 的数据: {data}")
# 创建数据库处理器实例
db_handler = DatabaseHandler()
# 在新线程中保存数据(避免阻塞主线程)
save_thread = threading.Thread(
target=db_handler.save_data,
args=(data, self.client_address)
)
save_thread.start()
save_thread.join() # 等待保存完成
# 发送响应
response = f"数据已接收并保存: {data}"
self.request.sendall(response.encode('utf-8'))
except Exception as e:
print(f"处理错误: {e}")
# 线程池TCP服务器
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
daemon_threads = True # 设置线程为守护线程
allow_reuse_address = True # 允许地址重用
def main():
HOST = 'localhost'
PORT = 9999
# 创建服务器实例
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPHandler)
print(f"服务器启动在 {HOST}:{PORT}")
print("按 Ctrl+C 停止服务器")
try:
# 启动服务器,持续运行
server.serve_forever()
except KeyboardInterrupt:
print("\n服务器正在关闭...")
server.shutdown()
server.server_close()
if __name__ == "__main__":
main()
这个实现的核心是:
- ThreadedTCPServer 继承
ThreadingMixIn和TCPServer,自动为每个客户端连接创建新线程 - ThreadedTCPHandler 处理具体的数据接收逻辑
- DatabaseHandler 封装数据库操作,在独立线程中执行保存操作
serve_forever()保持服务器持续运行,直到收到中断信号
使用示例:
# 启动服务器
python server.py
# 客户端测试(另开终端)
echo "测试数据" | nc localhost 9999
数据库查询:
import sqlite3
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM socket_data")
print(cursor.fetchall())
总结:用ThreadingMixIn实现多线程,数据库操作放独立线程避免阻塞。
if name == “main”:
sensor_que = queue.Queue()
sensor_data = {‘DeviceId’: ‘’, ‘AirPressure’: 0, ‘Humidity’: 0, ‘Noise’: 0, ‘Pm25’: 0, ‘Temperature’: 0,
‘WindDirection’: 0, ‘WindSpeed’: 0}
# HOST, PORT = “localhost”, 9999 #windows
HOST, PORT = “0.0.0.0”, 9999 # Linux
server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) # 线程
server.serve_forever()
print(‘before init_db!’)
connect_sql()
init_db()
print(‘after init_db!’)
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=getsensor_que)
t.start()

