Python中Excel异步上传并处理后导入数据库的设计思路及填坑

这是对前面一个帖子的补充及填坑。

之前一个帖子大概说了异步处理 Excel 导入数据库的问题,但留了一个坑。就是那个 celery 的任务注册偶现失败问题。

技术栈: python3.6.5 ,celery4.2.1,mysql 5.7.22 , sqlalchemy, xlrd,tornado,redis

设计思路再大概啰嗦一下,顺便解决问题:

  1. 准备材料一份 Excel 报表,利用 tornado.web 库中的 RequestHandler 类 request.files 属性获取到前端传来的文件内容(注:此时获取的数据类型 tornado.httputil.HTTPFile, 格式是 bytes)。

  2. 将前端传来的 bytes 数据直接放入 celery 进行异步处理,发现总是报错,原因是 delay()函数不接受非 JSON 序列化类型的参数(Object of type 'bytes' is not JSON serializable"),然后尝试了利用 encode(xx, 'utf-8')进行转码,又会提示 decoding to str: need a bytes-like object, HTTPFile found"( http 文件不是 bytes 类型对象),算了,退而求其次,先将文件上传然后保存至本地再说

  3. 利用 xlrd 库进行本地临时 Excel 文件读取,这样读取出来的格式为字符串格式,celery 处理毫无障碍,读取之后,可以进行数据重组,获取你想要的数据格式,可能是一个列表,字典,或着其他,同时进行去重,排序等操作。

  4. 数据导入,第一次 code,使用的是 sql 原生语句,包括创建临时表,建索引,去重,插入,这时 celery 有时候会不能正常注册任务,导致任务偶现执行失败。后来,一直找不到原因。先不管别的,保证功能正常,所以又把代码重新撸了遍,弃用原生 sql,改用 sqlalchemy orm 方式,然后偶现问题消失,功能正常支撑。然后回过头来分析日志,查看日志发现 redis 莫名崩溃,由于 redis 做了 celery 的队列,所以导致 Not registered 偶现问题,然后继续分析,发现内存过高,所以确定是内存暴涨引起的,还是自身代码问题。

  5. 出坑要点,在数据插入时,当数据量过大时要进行分批处理,且分次提交,以防内存用尽导致某组件崩溃。

  6. 源码就不贴了,有需要可以联系我

鄙人搞了个 Python 群,有兴趣的加一下,一起讨论问题,如果有好点子,可以在这里找到一起走的道友。 群号:902788038


Python中Excel异步上传并处理后导入数据库的设计思路及填坑

1 回复

这个需求挺典型的,我最近刚做过类似的项目。核心思路是把上传、解析、处理这几个步骤解耦,用消息队列来做异步处理。

基本架构设计:

  1. 前端上传Excel到API,API只负责验证和存储文件,立即返回任务ID
  2. 后台用Celery或RQ启动异步任务处理
  3. 处理流程:读取Excel → 数据清洗 → 批量入库 → 更新任务状态

关键代码示例:

# tasks.py - Celery任务
from celery import Celery
import pandas as pd
from sqlalchemy import create_engine
from models import UploadTask, db

app = Celery('excel_processor', broker='redis://localhost:6379/0')

@app.task(bind=True)
def process_excel(self, file_path, task_id):
    task = UploadTask.query.get(task_id)
    task.status = 'processing'
    db.session.commit()
    
    try:
        # 分块读取大文件
        chunksize = 10000
        engine = create_engine('postgresql://user:pass@localhost/db')
        
        for chunk in pd.read_excel(file_path, chunksize=chunksize):
            # 数据清洗
            chunk = clean_data(chunk)
            
            # 批量入库
            chunk.to_sql('target_table', engine, if_exists='append', index=False)
            
        task.status = 'completed'
        
    except Exception as e:
        task.status = 'failed'
        task.error_message = str(e)
    
    finally:
        db.session.commit()

# 数据清洗函数示例
def clean_data(df):
    # 处理空值
    df = df.fillna({'column1': 'default'})
    
    # 类型转换
    df['date_column'] = pd.to_datetime(df['date_column'], errors='coerce')
    
    # 去重
    df = df.drop_duplicates(subset=['unique_column'])
    
    return df

API接口:

# api.py
from flask import request, jsonify
import os
from tasks import process_excel

@app.route('/upload', methods=['POST'])
def upload_excel():
    file = request.files['file']
    task = UploadTask(filename=file.filename)
    db.session.add(task)
    db.session.commit()
    
    # 保存文件
    file_path = f'uploads/{task.id}_{file.filename}'
    file.save(file_path)
    
    # 异步处理
    process_excel.delay(file_path, task.id)
    
    return jsonify({'task_id': task.id, 'status': 'queued'})

几个实际遇到的坑:

  1. 内存问题:用pd.read_excel()直接读大文件会爆内存。必须用chunksize参数分块读取,或者用openpyxl的只读模式。

  2. 数据库连接:每个chunk都新建连接很耗资源。要用连接池,或者像上面那样保持一个engine实例。

  3. 进度反馈:Celery默认不提供进度信息。可以通过更新数据库状态,或者用Celery的update_state()方法来实现进度查询。

  4. 错误恢复:某一行数据格式错误不应该让整个任务失败。要在清洗阶段做好异常捕获,记录错误行继续处理。

  5. 文件锁:Windows下处理中的文件可能被锁定。处理完要及时删除或移动到其他目录。

  6. 数据类型推断:pandas自动推断类型有时会出错,特别是数字开头的字符串会被转成数字。要显式指定dtype参数。

总结建议:分块处理加异步队列是核心。

回到顶部