如何使用Python的SQLAlchemy返回不同name的最新一条数据

models.py 里这么定义的

class Var(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(20))
    time = db.Column(db.Integer)
    ......

现在假设有这么几个数据

| id | name | time |
| 1 | a | 1500016227 |
| 2 | a | 1500016237 |
| 3 | b | 1500016227 |
| 4 | b | 1500016237 |
| 5 | c | 1500016216 |
| 6 | c | 1500016226 |

原来单独获得某一个 name 的最新几条,返回不同 name 就多次查询。

model = Var.query.filter_by(name).order_by(Var.time.desc()).limit(1).all()

现在需要同时获得几个 name 的最新几条,比如上面的数据中,同时获得 b 和 c 的最新一条数据,应该让其返回 id 为 4 和 6 两条。

试了 distinct 和 group_by,前面只能写成 db.session.query(Var.name).group_by(Var.name),这样还得再循环单独查询后添加到列表里去。请问有没有什么好的方法?


如何使用Python的SQLAlchemy返回不同name的最新一条数据

9 回复

这样
model = Var.query.filter_by(Var.name.in_(list_of_name)).order_by(Var.time.desc()).all()

参考:
https://stackoverflow.com/questions/29326297/sqlalchemy-filter-by-field-in-list-but-keep-original-order


from sqlalchemy import create_engine, Column, Integer, String, DateTime, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

# 创建模型
Base = declarative_base()

class Record(Base):
    __tablename__ = 'records'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    value = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)

# 创建数据库连接
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

# 方法1:使用子查询(推荐,效率高)
def get_latest_records_subquery():
    session = Session()
    
    # 先找到每个name的最大created_at
    subquery = session.query(
        Record.name,
        func.max(Record.created_at).label('max_time')
    ).group_by(Record.name).subquery()
    
    # 然后关联回原表获取完整记录
    results = session.query(Record).join(
        subquery,
        (Record.name == subquery.c.name) & 
        (Record.created_at == subquery.c.max_time)
    ).all()
    
    session.close()
    return results

# 方法2:使用窗口函数(SQLAlchemy 1.4+)
def get_latest_records_window():
    from sqlalchemy import over
    from sqlalchemy.sql import func as sql_func
    
    session = Session()
    
    # 使用ROW_NUMBER窗口函数
    row_number = over(
        sql_func.row_number(),
        partition_by=Record.name,
        order_by=Record.created_at.desc()
    ).label('row_number')
    
    subquery = session.query(
        Record,
        row_number
    ).subquery()
    
    results = session.query(subquery).filter(
        subquery.c.row_number == 1
    ).all()
    
    session.close()
    return results

# 方法3:使用DISTINCT ON(PostgreSQL特有)
def get_latest_records_distinct_on():
    # 仅适用于PostgreSQL
    from sqlalchemy.dialects.postgresql import distinct_on
    
    session = Session()
    
    results = session.query(
        distinct_on(Record.name, Record)
    ).order_by(
        Record.name,
        Record.created_at.desc()
    ).all()
    
    session.close()
    return results

# 测试代码
if __name__ == '__main__':
    # 创建测试数据
    session = Session()
    test_data = [
        Record(name='A', value=1, created_at=datetime(2024, 1, 1)),
        Record(name='A', value=2, created_at=datetime(2024, 1, 2)),
        Record(name='B', value=3, created_at=datetime(2024, 1, 1)),
        Record(name='B', value=4, created_at=datetime(2024, 1, 3)),
        Record(name='C', value=5, created_at=datetime(2024, 1, 2)),
    ]
    session.add_all(test_data)
    session.commit()
    
    # 使用方法1
    latest = get_latest_records_subquery()
    for record in latest:
        print(f"Name: {record.name}, Value: {record.value}, Time: {record.created_at}")
    
    session.close()

核心思路:

  1. 子查询方法:先分组找出每个name的最新时间,再关联回原表获取完整数据
  2. 窗口函数方法:用ROW_NUMBER给每个name的数据按时间倒序编号,取编号1的
  3. DISTINCT ON:PostgreSQL特有语法,最简单但数据库受限

建议: 用子查询方法最通用,所有数据库都支持。

这样的话,list_of_name 里包括 b,c 时,会得到 id 为 3,4,5,6 共 4 条结果,而不是两条。如果在后面加 limit(2)的话,会得到 3,4 两条结果,不能得到需要的 4,6 两条。


前面有其他可选的筛选条件,不一定直接提供查询哪些 name,所以我需要从筛选结果中来获得到底有几个 name。
set 那一步遍历了整个之前的结果,如果直接筛选条件少,返回结果多,这一步时间就会特长。
name_list = set((m.name for m in query))
models = [model
for name in name_list
for model in query.filter(Var.name == name).limit(limit).all()
]

a=select max(id), name from var group by name;
ids = [x.id for x in a]
select id, name, time from var where id in $ids

如果 DBMS 支持 window function 和 over 子句,可以用 zrq495 回复的方法。( Postgres, Oracle, SQL Server )
如果 DBMS 支持 cross apply,也可以。
MySQL 的话:
1. 循环处理,取每个 name 的最新几条。(适用循环项不多的情况)
2. 取出所有数据,在 Python 中取最新几条。(适用总数据量不多的情况)
3. group by 出每个 name 的最新时间,再 join 取最新一条数据。(只能取最新一条,而且若时间有重复还需要去重)
没想到更好的方法了。

你说得这个场景类似是微信朋友圈这种场景吧,查找照片的同时把点赞和评论查找出来。我以前也只想到 1+N 查询了。

看来是我理解错了

最后写出来了吗?求答案

回到顶部