Python中如何使用sqlalchemy实现insert or update操作

  from sqlalchemy.ext.compiler import compiles
 import sqlalchemy.sql.expression as expr

class Upsert(expr.Insert): pass

@compiles(Upsert, “mysql”) def compile_upsert(insert_stmt, compiler, **kwargs): if insert_stmt._has_multi_parameters: keys = insert_stmt.parameters[0].keys() else: keys = insert_stmt.parameters.keys() pk = insert_stmt.table.primary_key auto = None if (len(pk.columns) == 1 and isinstance(pk.columns.values()[0].type, sa.Integer) and pk.columns.values()[0].autoincrement): auto = pk.columns.keys()[0] if auto in keys: keys.remove(auto) insert = compiler.visit_insert(insert_stmt, **kwargs) ondup = ‘ON DUPLICATE KEY UPDATE’ updates = ', '.join( ‘%s = VALUES(%s)’ % (c.name, c.name) for c in insert_stmt.table.columns if c.name in keys ) if auto is not None: last_id = ‘%s = LAST_INSERT_ID(%s)’ % (auto, auto) if updates: updates = ', '.join((last_id, updates)) else: updates = last_id upsert = ’ '.join((insert, ondup, updates)) return upsert

 	try:

        ###INSERT OR UPDATe
        db.session.execute(Upsert( BIUserStatistic,rows))
    except:
        print('process_bi_user_statistic_records transaction.rollback()')
        db.session.rollback()
        raise
    else:
        print('process_bi_user_statistic_records transaction.commit()')


Python中如何使用sqlalchemy实现insert or update操作

9 回复

我是谁?


在SQLAlchemy里实现"insert or update"(也叫upsert)有几种方式,核心是使用on_conflict_do_update()方法。下面给你一个完整的例子:

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)
    email = Column(String(100))
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

# 创建数据库连接
engine = create_engine('postgresql://user:password@localhost/dbname')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def upsert_user(name, email):
    session = Session()
    try:
        # 创建insert语句
        stmt = insert(User).values(
            name=name,
            email=email,
            updated_at=datetime.now()
        )
        
        # 设置冲突处理:当name冲突时更新email和updated_at
        stmt = stmt.on_conflict_do_update(
            index_elements=['name'],  # 冲突检测字段
            set_={
                'email': stmt.excluded.email,
                'updated_at': stmt.excluded.updated_at
            }
        )
        
        # 执行upsert
        session.execute(stmt)
        session.commit()
        print(f"Upserted user: {name}")
        
    except Exception as e:
        session.rollback()
        print(f"Error: {e}")
    finally:
        session.close()

# 使用示例
upsert_user('john_doe', 'john@example.com')  # 第一次插入
upsert_user('john_doe', 'john.new@example.com')  # 第二次更新

关键点说明:

  1. 需要从sqlalchemy.dialects.postgresql导入insert(其他数据库类似)
  2. on_conflict_do_update()指定冲突处理策略
  3. index_elements参数定义哪些字段冲突时触发更新
  4. set_参数定义更新哪些字段,excluded代表新值
  5. 这个方法利用了数据库的ON CONFLICT特性,效率比先查询再判断高

如果是MySQL,用法类似,但需要从sqlalchemy.dialects.mysql导入insert

总结:用on_conflict_do_update()实现upsert最简洁高效。

我在哪?

我要到哪里去?

来 v2 几天我真是膨胀了,连 TM 标题都没看懂就敢进来看看

就是 insert 不进去,就更新对应的行。。

就是 insert 不进去,就更新对应的行。。

就是 insert 不进去,就更新对应的行。。

这个我是真不懂,python 超级白,还是等大神来给解答一下吧😂

回到顶部