Python中如何使用sqlalchemy操作MySQL的JSON类型
针对于 MySQL 的 JSON 类型,官方上已经做了详细的介绍,在此,简单啰嗦几句吧,毕竟曾经没怎么遇到过。
-
sql 脚本形式: 创建:create table stu(id int primary key, name varchar(20), remark JSON); 插入:insert into stu(id, name, remark) values(1, 'ha', '{'sc':'1a', 'mc':'2b'}'); 更新:update stu set remark = json_set(remark, '$.sc', '3a'); 删除:update stu set remark = json_remove(remark, '$.sc'); 增键:update stu set remark = json_set(remark, '$.tc', '5e');
对于这些操作,是使用 sql 脚本进行,需要执行原生 sql 脚本。但平常更多使用的是 ORM 形式。
-
orm 形式: 对于 ORM 形式,当然也可以使用 嵌入式的 sql 脚本进行完成,(此处不再说)。 或着结构对象来做。 比如上面的这个表,对于 ORM 形式,应该有一个 model 与之对应: 可以为: class Stu: id = Column(types.integer, primary_key=True) name = Column(types.String(20)) remark = Column(types.JSON)
这个时候,可以这么做: the_stu = self.db_session.query(Stu).filter(Stu.id == uid).first() rem = the_stu.remark # 备份 the_stu.remark=None
db_session.flush() # 具体为什么这么做还没明白,直接赋值不起作用。 rem['news'] = 'newsss' # 添加新值 the_stu.remark = rem # 将新的对象重新赋给模型对象的属性。 db_session.commit()
不知道还有没有更好的解决方式,希望多多指教,大家都可以少走弯路,提高工作效率,一起学习加群 902788038, 群里也有 HR 小姐姐们,在进步的同时也祝愿能找到心仪工作。
Python中如何使用sqlalchemy操作MySQL的JSON类型
在SQLAlchemy里操作MySQL的JSON字段挺简单的,主要用JSON类型和对应的查询方法。下面给你个完整例子:
from sqlalchemy import create_engine, Column, Integer, JSON, select, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import json
# 创建基类
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
profile = Column(JSON) # JSON类型字段
settings = Column(JSON) # 另一个JSON字段
# 创建引擎和表
engine = create_engine('mysql+pymysql://user:password@localhost/testdb')
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 1. 插入JSON数据
user1 = User(
name='张三',
profile={'age': 25, 'city': '北京', 'hobbies': ['读书', '编程']},
settings={'theme': 'dark', 'notifications': True}
)
session.add(user1)
session.commit()
# 2. 查询JSON字段
# 直接访问
user = session.query(User).first()
print(user.profile['city']) # 输出: 北京
# 3. JSON路径查询
# 查询city为北京的用户
users = session.query(User).filter(
User.profile['city'].astext == '北京'
).all()
# 4. JSON包含查询
# 查询hobbies包含"编程"的用户
users = session.query(User).filter(
User.profile['hobbies'].contains(['编程'])
).all()
# 5. JSON键存在性检查
# 查询profile中有age键的用户
users = session.query(User).filter(
User.profile.has_key('age')
).all()
# 6. 更新JSON字段
user = session.query(User).first()
user.profile['age'] = 26 # 修改值
user.profile['married'] = False # 添加新键
session.commit()
# 7. JSON函数操作
# 获取JSON数组长度
query = session.query(
User.name,
func.json_length(User.profile['hobbies']).label('hobby_count')
)
for row in query:
print(f"{row.name}: {row.hobby_count}个爱好")
# 8. 复杂查询示例
# 查询年龄大于25且喜欢编程的用户
users = session.query(User).filter(
User.profile['age'].astext.cast(Integer) > 25,
User.profile['hobbies'].contains(['编程'])
).all()
session.close()
关键点:
- 用
Column(JSON)定义JSON字段 - 插入时直接传Python字典或列表
- 查询用
['key']访问JSON属性,astext转文本 contains()查数组包含,has_key()检查键存在- 更新时直接修改字典然后commit
注意MySQL版本要5.7以上才支持JSON类型。
一句话总结:用JSON类型字段,像操作字典一样查询和更新。
不忙了学习下,谢谢。

