Python中如何使用sqlalchemy实现insert or update操作
from sqlalchemy.ext.compiler import compiles
import sqlalchemy.sql.expression as expr
class Upsert(expr.Insert): pass
@compiles(Upsert, “mysql”)
def compile_upsert(insert_stmt, compiler, **kwargs):
if insert_stmt._has_multi_parameters:
keys = insert_stmt.parameters[0].keys()
else:
keys = insert_stmt.parameters.keys()
pk = insert_stmt.table.primary_key
auto = None
if (len(pk.columns) == 1 and
isinstance(pk.columns.values()[0].type, sa.Integer) and
pk.columns.values()[0].autoincrement):
auto = pk.columns.keys()[0]
if auto in keys:
keys.remove(auto)
insert = compiler.visit_insert(insert_stmt, **kwargs)
ondup = ‘ON DUPLICATE KEY UPDATE’
updates = ', '.join(
‘%s = VALUES(%s)’ % (c.name, c.name)
for c in insert_stmt.table.columns
if c.name in keys
)
if auto is not None:
last_id = ‘%s = LAST_INSERT_ID(%s)’ % (auto, auto)
if updates:
updates = ', '.join((last_id, updates))
else:
updates = last_id
upsert = ’ '.join((insert, ondup, updates))
return upsert
try:
###INSERT OR UPDATe
db.session.execute(Upsert( BIUserStatistic,rows))
except:
print('process_bi_user_statistic_records transaction.rollback()')
db.session.rollback()
raise
else:
print('process_bi_user_statistic_records transaction.commit()')
Python中如何使用sqlalchemy实现insert or update操作
我是谁?
在SQLAlchemy里实现"insert or update"(也叫upsert)有几种方式,核心是使用on_conflict_do_update()方法。下面给你一个完整的例子:
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True)
email = Column(String(100))
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
# 创建数据库连接
engine = create_engine('postgresql://user:password@localhost/dbname')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
def upsert_user(name, email):
session = Session()
try:
# 创建insert语句
stmt = insert(User).values(
name=name,
email=email,
updated_at=datetime.now()
)
# 设置冲突处理:当name冲突时更新email和updated_at
stmt = stmt.on_conflict_do_update(
index_elements=['name'], # 冲突检测字段
set_={
'email': stmt.excluded.email,
'updated_at': stmt.excluded.updated_at
}
)
# 执行upsert
session.execute(stmt)
session.commit()
print(f"Upserted user: {name}")
except Exception as e:
session.rollback()
print(f"Error: {e}")
finally:
session.close()
# 使用示例
upsert_user('john_doe', 'john@example.com') # 第一次插入
upsert_user('john_doe', 'john.new@example.com') # 第二次更新
关键点说明:
- 需要从
sqlalchemy.dialects.postgresql导入insert(其他数据库类似) on_conflict_do_update()指定冲突处理策略index_elements参数定义哪些字段冲突时触发更新set_参数定义更新哪些字段,excluded代表新值- 这个方法利用了数据库的ON CONFLICT特性,效率比先查询再判断高
如果是MySQL,用法类似,但需要从sqlalchemy.dialects.mysql导入insert。
总结:用on_conflict_do_update()实现upsert最简洁高效。
我在哪?
我要到哪里去?
来 v2 几天我真是膨胀了,连 TM 标题都没看懂就敢进来看看
就是 insert 不进去,就更新对应的行。。
就是 insert 不进去,就更新对应的行。。
就是 insert 不进去,就更新对应的行。。
这个我是真不懂,python 超级白,还是等大神来给解答一下吧😂

