Python中Excel异步上传并处理后导入数据库的设计思路及填坑
这是对前面一个帖子的补充及填坑。
之前一个帖子大概说了异步处理 Excel 导入数据库的问题,但留了一个坑。就是那个 celery 的任务注册偶现失败问题。
技术栈: python3.6.5 ,celery4.2.1,mysql 5.7.22 , sqlalchemy, xlrd,tornado,redis
设计思路再大概啰嗦一下,顺便解决问题:
-
准备材料一份 Excel 报表,利用 tornado.web 库中的 RequestHandler 类 request.files 属性获取到前端传来的文件内容(注:此时获取的数据类型 tornado.httputil.HTTPFile, 格式是 bytes)。
-
将前端传来的 bytes 数据直接放入 celery 进行异步处理,发现总是报错,原因是 delay()函数不接受非 JSON 序列化类型的参数(Object of type 'bytes' is not JSON serializable"),然后尝试了利用 encode(xx, 'utf-8')进行转码,又会提示 decoding to str: need a bytes-like object, HTTPFile found"( http 文件不是 bytes 类型对象),算了,退而求其次,先将文件上传然后保存至本地再说
-
利用 xlrd 库进行本地临时 Excel 文件读取,这样读取出来的格式为字符串格式,celery 处理毫无障碍,读取之后,可以进行数据重组,获取你想要的数据格式,可能是一个列表,字典,或着其他,同时进行去重,排序等操作。
-
数据导入,第一次 code,使用的是 sql 原生语句,包括创建临时表,建索引,去重,插入,这时 celery 有时候会不能正常注册任务,导致任务偶现执行失败。后来,一直找不到原因。先不管别的,保证功能正常,所以又把代码重新撸了遍,弃用原生 sql,改用 sqlalchemy orm 方式,然后偶现问题消失,功能正常支撑。然后回过头来分析日志,查看日志发现 redis 莫名崩溃,由于 redis 做了 celery 的队列,所以导致 Not registered 偶现问题,然后继续分析,发现内存过高,所以确定是内存暴涨引起的,还是自身代码问题。
-
出坑要点,在数据插入时,当数据量过大时要进行分批处理,且分次提交,以防内存用尽导致某组件崩溃。
-
源码就不贴了,有需要可以联系我
鄙人搞了个 Python 群,有兴趣的加一下,一起讨论问题,如果有好点子,可以在这里找到一起走的道友。 群号:902788038
Python中Excel异步上传并处理后导入数据库的设计思路及填坑
这个需求挺典型的,我最近刚做过类似的项目。核心思路是把上传、解析、处理这几个步骤解耦,用消息队列来做异步处理。
基本架构设计:
- 前端上传Excel到API,API只负责验证和存储文件,立即返回任务ID
- 后台用Celery或RQ启动异步任务处理
- 处理流程:读取Excel → 数据清洗 → 批量入库 → 更新任务状态
关键代码示例:
# tasks.py - Celery任务
from celery import Celery
import pandas as pd
from sqlalchemy import create_engine
from models import UploadTask, db
app = Celery('excel_processor', broker='redis://localhost:6379/0')
@app.task(bind=True)
def process_excel(self, file_path, task_id):
task = UploadTask.query.get(task_id)
task.status = 'processing'
db.session.commit()
try:
# 分块读取大文件
chunksize = 10000
engine = create_engine('postgresql://user:pass@localhost/db')
for chunk in pd.read_excel(file_path, chunksize=chunksize):
# 数据清洗
chunk = clean_data(chunk)
# 批量入库
chunk.to_sql('target_table', engine, if_exists='append', index=False)
task.status = 'completed'
except Exception as e:
task.status = 'failed'
task.error_message = str(e)
finally:
db.session.commit()
# 数据清洗函数示例
def clean_data(df):
# 处理空值
df = df.fillna({'column1': 'default'})
# 类型转换
df['date_column'] = pd.to_datetime(df['date_column'], errors='coerce')
# 去重
df = df.drop_duplicates(subset=['unique_column'])
return df
API接口:
# api.py
from flask import request, jsonify
import os
from tasks import process_excel
@app.route('/upload', methods=['POST'])
def upload_excel():
file = request.files['file']
task = UploadTask(filename=file.filename)
db.session.add(task)
db.session.commit()
# 保存文件
file_path = f'uploads/{task.id}_{file.filename}'
file.save(file_path)
# 异步处理
process_excel.delay(file_path, task.id)
return jsonify({'task_id': task.id, 'status': 'queued'})
几个实际遇到的坑:
-
内存问题:用
pd.read_excel()直接读大文件会爆内存。必须用chunksize参数分块读取,或者用openpyxl的只读模式。 -
数据库连接:每个chunk都新建连接很耗资源。要用连接池,或者像上面那样保持一个engine实例。
-
进度反馈:Celery默认不提供进度信息。可以通过更新数据库状态,或者用Celery的
update_state()方法来实现进度查询。 -
错误恢复:某一行数据格式错误不应该让整个任务失败。要在清洗阶段做好异常捕获,记录错误行继续处理。
-
文件锁:Windows下处理中的文件可能被锁定。处理完要及时删除或移动到其他目录。
-
数据类型推断:pandas自动推断类型有时会出错,特别是数字开头的字符串会被转成数字。要显式指定
dtype参数。
总结建议:分块处理加异步队列是核心。

