Python中SQLAlchemy复杂查询写法求助
大佬们 求助! 有没有 orm 写的很 6 的大佬!!!
救救萌新!
现有一条原生 sql。
select a.date,sum(a.tag_count) as tag_count,a.tag_id,a.tag_name,b.user_id,b.user_name from (
(
select z.netflow_alias,count(1) as tag_count,z.date,z.product_id,x.tag_id,x.tag_name from
(
SELECT netflow_alias,left(create_time,10) as date,product_id FROM weidian_operator.t_tj_order_detail_qingshe
where left(create_time,10)=DATE_SUB(curdate(),INTERVAL 1 DAY)
) z
left join
(
select q.tag_id,q.product_id,w.tag_name from
(SELECT tag_id,product_id FROM weidian_operator.t_product_tag ) q
left join
(select id,tag_name from weidian_operator.t_tag) w
on q.tag_id=w.id
) x
on z.product_id=x.product_id
group by x.tag_id
) a
left join
(select netflow_alias,user_name,user_id from weidian_operator.t_tj_team_relation where user_id is not null) b
on a.netflow_alias=b.netflow_alias
)
where user_id is not null
group by a.date,b.user_id;
那么请问 orm 怎么写。 哪位大佬肯帮帮我。呜呜呜
Python中SQLAlchemy复杂查询写法求助
orm 不适合或者写不了复杂的 SQL,你还是老老实实的直接用 SQL 查吧。
在SQLAlchemy里写复杂查询,核心就两招:要么用session.query()配合各种filter和join,要么直接用Core的select()。
1. 经典Query API写法(适合ORM场景)
from sqlalchemy.orm import Session, joinedload
from your_models import User, Order, Product
# 多表关联+条件过滤
query = session.query(User, Order, Product)\
.join(Order, User.id == Order.user_id)\
.join(Product, Order.product_id == Product.id)\
.filter(
User.active == True,
Order.status == 'paid',
Product.price > 100
)\
.order_by(Order.created_at.desc())
# 分组统计
from sqlalchemy import func
stats = session.query(
User.country,
func.count(Order.id).label('order_count'),
func.sum(Order.amount).label('total_amount')
).join(Order)\
.group_by(User.country)\
.having(func.count(Order.id) > 5)
2. 现代Core写法(推荐,更灵活)
from sqlalchemy import select, and_, or_, case, cast, Integer
from sqlalchemy.orm import aliased
# 复杂条件+子查询
subq = select(Order.user_id).where(Order.amount > 1000).subquery()
stmt = select(User).where(User.id.in_(subq))
# CASE WHEN + 类型转换
stmt = select(
User.name,
case(
(User.score >= 90, 'A'),
(User.score >= 80, 'B'),
else_='C'
).label('grade'),
cast(User.extra_data['views'], Integer).label('view_count')
).where(
or_(
User.email.like('%@gmail.com'),
and_(
User.age.between(18, 35),
User.city == 'Beijing'
)
)
)
# 执行查询
result = session.execute(stmt)
for row in result:
print(row.name, row.grade, row.view_count)
3. 动态查询构建
def build_query(filters):
stmt = select(User)
conditions = []
if filters.get('name'):
conditions.append(User.name.ilike(f"%{filters['name']}%"))
if filters.get('min_age'):
conditions.append(User.age >= filters['min_age'])
if filters.get('tags'):
conditions.append(User.tags.contains(filters['tags']))
if conditions:
stmt = stmt.where(and_(*conditions))
return stmt
关键点:
- 多表用
join()时明确指定关联条件 - 复杂逻辑用
and_()、or_()组合 - 聚合查询记得
group_by()和having() - 子查询可以
.subquery()或scalar_subquery() - 用
aliased()处理自关联或重复表
建议多用Core的select()写法,现在这是官方推荐的方式,比老的Query API更清晰。
emmm 好的吧。谢谢大佬!!
我记得 SQLAlchemy 可以把 SQL 结果提取出来变成 object。
这么多子查询 抽出来不就简单了吗
- 把子查询分开查,然后在程序里面做处理;
2. 加索引,不要使用 Null ;
3. 看能否优化你的数据模型,避免这种复杂的 join group 查询;
4. 如果数据量小的话,估计还将就;对于大量数据来说,还是建议使用 Flink Spark 这种工具先聚合处理一次,避免重复在代码里面进行大量的 group join 以及函数运算操作,否则性能会很差。
先去看看业务逻辑吧 照着这个 sql 硬改事倍功半
这个排版真是头疼,一点看的欲望都没有
这种复杂查询,量小还好,量大没专门优化过,直接要死
还拿 ORM 来做…

