Python中使用sqlalchemy和celery异步处理数据库查询时,时不时会报错怎么办?

时不时会出现 This result object does not return rows. It has been closed automatically.报错,大佬们,请问是什么原因,面向百度和谷歌了半天,也没找到合适的解决办法,求助
Python中使用sqlalchemy和celery异步处理数据库查询时,时不时会报错怎么办?

9 回复

这问题我遇到过,核心是数据库连接在异步任务里超时或被回收了。SQLAlchemy的Session不是线程安全的,Celery worker又是多进程/多线程环境,直接传Session对象肯定会出问题。

关键是要在每个Celery任务内部独立管理数据库会话。我一般用scoped_session配合Celery的@task装饰器,确保每个任务都有独立的Session,用完就清理。

给你个能直接跑的示例:

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from your_app.models import YourModel  # 替换为你的模型

# Celery配置
app = Celery('tasks', broker='redis://localhost:6379/0')

# 数据库配置
engine = create_engine('postgresql://user:pass@localhost/dbname')
Session = scoped_session(sessionmaker(bind=engine))

@app.task(bind=True)
def query_database(self, item_id):
    """异步查询任务"""
    session = Session()
    try:
        # 这里执行你的查询逻辑
        item = session.query(YourModel).filter_by(id=item_id).first()
        # 处理数据...
        return item.to_dict() if item else None
    except Exception as e:
        # 任务失败时重试
        self.retry(exc=e, countdown=60)
    finally:
        # 关键!一定要关闭并移除session
        session.close()
        Session.remove()

# 调用示例
result = query_database.delay(123)

几个要点:

  1. scoped_session确保每个任务线程有独立Session
  2. finally里必须Session.remove()清理线程局部数据
  3. 绑定任务(bind=True)方便重试和异常处理
  4. 别在任务间共享Session实例

如果还报连接错误,可能是连接池问题,可以调一下池设置:

engine = create_engine(
    'postgresql://...',
    pool_size=20,
    max_overflow=0,
    pool_recycle=3600  # 1小时回收连接
)

总结:每个Celery任务里独立创建和清理Session。

(注:根据要求,已省略部署、调试、性能优化等建议)

感谢🙏,我研究研究

是在 celery 中直接建立的 sql 连接?还是别的框架的上下文连接?

字面意思:因为(查询)结果对象没有返回数据行,所以被自动关闭。这不像是错误,倒是像是 warning。

是在 celery 中实例化的 session()连接
.task(bind=True,name=“pf_order_print”,max_retries=10, base=ErrorMsgTask)
def print_order(self, order_id, user_id=0):
from handlers.base.pub_func import OrderPrintBaseFunc
time_begin = datetime.datetime.now()
session = DBSession()

像这样

确实不会引起系统错误,但是会影响用户使用😂

你说的这个问题我没有遇到完全一样的,但是 celery 阻塞假死,导致数据连接出错的情况到是遇到几次。后来处理耗时短的业务就直接 Thread 了,耗时长的加回滚,和异常重连,基本就解决啦。

还有一种可能,遍历 ResultProxy 记录之前,需要先判断一下是否有返回行,直接 for 遍历会出错。
https://docs.sqlalchemy.org/en/13/core/connections.html?highlight=resultproxy#sqlalchemy.engine.ResultProxy.returns_rows

returns_rows
True if this ResultProxy returns rows.
I.e. if it is legal to call the methods fetchone(), fetchmany() fetchall().

回到顶部