Python中SQLAlchemy复杂查询写法求助

大佬们 求助! 有没有 orm 写的很 6 的大佬!!!

救救萌新!
现有一条原生 sql。
select a.date,sum(a.tag_count) as tag_count,a.tag_id,a.tag_name,b.user_id,b.user_name from (
(

select z.netflow_alias,count(1) as tag_count,z.date,z.product_id,x.tag_id,x.tag_name from
(
SELECT netflow_alias,left(create_time,10) as date,product_id FROM weidian_operator.t_tj_order_detail_qingshe
where left(create_time,10)=DATE_SUB(curdate(),INTERVAL 1 DAY)
) z
left join
(

select q.tag_id,q.product_id,w.tag_name from
(SELECT tag_id,product_id FROM weidian_operator.t_product_tag ) q

left join

(select id,tag_name from weidian_operator.t_tag) w
on q.tag_id=w.id

) x
on z.product_id=x.product_id
group by x.tag_id

) a

left join

(select netflow_alias,user_name,user_id from weidian_operator.t_tj_team_relation where user_id is not null) b

on a.netflow_alias=b.netflow_alias
)
where user_id is not null
group by a.date,b.user_id;

那么请问 orm 怎么写。 哪位大佬肯帮帮我。呜呜呜
Python中SQLAlchemy复杂查询写法求助


9 回复

orm 不适合或者写不了复杂的 SQL,你还是老老实实的直接用 SQL 查吧。


在SQLAlchemy里写复杂查询,核心就两招:要么用session.query()配合各种filterjoin,要么直接用Core的select()

1. 经典Query API写法(适合ORM场景)

from sqlalchemy.orm import Session, joinedload
from your_models import User, Order, Product

# 多表关联+条件过滤
query = session.query(User, Order, Product)\
    .join(Order, User.id == Order.user_id)\
    .join(Product, Order.product_id == Product.id)\
    .filter(
        User.active == True,
        Order.status == 'paid',
        Product.price > 100
    )\
    .order_by(Order.created_at.desc())

# 分组统计
from sqlalchemy import func
stats = session.query(
    User.country,
    func.count(Order.id).label('order_count'),
    func.sum(Order.amount).label('total_amount')
).join(Order)\
 .group_by(User.country)\
 .having(func.count(Order.id) > 5)

2. 现代Core写法(推荐,更灵活)

from sqlalchemy import select, and_, or_, case, cast, Integer
from sqlalchemy.orm import aliased

# 复杂条件+子查询
subq = select(Order.user_id).where(Order.amount > 1000).subquery()
stmt = select(User).where(User.id.in_(subq))

# CASE WHEN + 类型转换
stmt = select(
    User.name,
    case(
        (User.score >= 90, 'A'),
        (User.score >= 80, 'B'),
        else_='C'
    ).label('grade'),
    cast(User.extra_data['views'], Integer).label('view_count')
).where(
    or_(
        User.email.like('%@gmail.com'),
        and_(
            User.age.between(18, 35),
            User.city == 'Beijing'
        )
    )
)

# 执行查询
result = session.execute(stmt)
for row in result:
    print(row.name, row.grade, row.view_count)

3. 动态查询构建

def build_query(filters):
    stmt = select(User)
    conditions = []
    
    if filters.get('name'):
        conditions.append(User.name.ilike(f"%{filters['name']}%"))
    if filters.get('min_age'):
        conditions.append(User.age >= filters['min_age'])
    if filters.get('tags'):
        conditions.append(User.tags.contains(filters['tags']))
    
    if conditions:
        stmt = stmt.where(and_(*conditions))
    
    return stmt

关键点:

  • 多表用join()时明确指定关联条件
  • 复杂逻辑用and_()or_()组合
  • 聚合查询记得group_by()having()
  • 子查询可以.subquery()scalar_subquery()
  • aliased()处理自关联或重复表

建议多用Core的select()写法,现在这是官方推荐的方式,比老的Query API更清晰。

emmm 好的吧。谢谢大佬!!

我记得 SQLAlchemy 可以把 SQL 结果提取出来变成 object。

这么多子查询 抽出来不就简单了吗

  1. 把子查询分开查,然后在程序里面做处理;
    2. 加索引,不要使用 Null ;
    3. 看能否优化你的数据模型,避免这种复杂的 join group 查询;
    4. 如果数据量小的话,估计还将就;对于大量数据来说,还是建议使用 Flink Spark 这种工具先聚合处理一次,避免重复在代码里面进行大量的 group join 以及函数运算操作,否则性能会很差。

先去看看业务逻辑吧 照着这个 sql 硬改事倍功半

这个排版真是头疼,一点看的欲望都没有

这种复杂查询,量小还好,量大没专门优化过,直接要死
还拿 ORM 来做…

回到顶部