Python中如何在Flask框架中添加独立的数据处理层并复用模型层代码?
该数据处理层是异步的,可以做定时任务,也可以做异步任务
Python中如何在Flask框架中添加独立的数据处理层并复用模型层代码?
不想重复做轮子就 celery
在Flask里把数据处理和模型层分开,可以这么干:
首先,模型层用SQLAlchemy的declarative_base定义,放在单独的模块里(比如models.py):
# models.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(80), unique=True)
email = Column(String(120))
然后创建独立的数据处理层(比如repositories.py),这里封装所有数据库操作:
# repositories.py
from sqlalchemy.orm import sessionmaker
from models import Base, User
from sqlalchemy import create_engine
class UserRepository:
def __init__(self, db_url='sqlite:///app.db'):
engine = create_engine(db_url)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
self.session = Session()
def get_all_users(self):
return self.session.query(User).all()
def get_user_by_id(self, user_id):
return self.session.query(User).filter_by(id=user_id).first()
def create_user(self, username, email):
user = User(username=username, email=email)
self.session.add(user)
self.session.commit()
return user
def update_user(self, user_id, **kwargs):
user = self.get_user_by_id(user_id)
if user:
for key, value in kwargs.items():
setattr(user, key, value)
self.session.commit()
return user
def delete_user(self, user_id):
user = self.get_user_by_id(user_id)
if user:
self.session.delete(user)
self.session.commit()
最后在Flask应用里复用这个数据处理层:
# app.py
from flask import Flask, request, jsonify
from repositories import UserRepository
app = Flask(__name__)
user_repo = UserRepository()
@app.route('/users', methods=['GET'])
def get_users():
users = user_repo.get_all_users()
return jsonify([{'id': u.id, 'username': u.username, 'email': u.email} for u in users])
@app.route('/users', methods=['POST'])
def create_user():
data = request.json
user = user_repo.create_user(data['username'], data['email'])
return jsonify({'id': user.id, 'username': user.username, 'email': user.email}), 201
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
data = request.json
user = user_repo.update_user(user_id, **data)
if user:
return jsonify({'id': user.id, 'username': user.username, 'email': user.email})
return jsonify({'error': 'User not found'}), 404
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
user_repo.delete_user(user_id)
return '', 204
if __name__ == '__main__':
app.run(debug=True)
这样搞的好处是:模型层代码完全独立,数据处理逻辑集中管理,Flask路由只负责HTTP交互,各层职责清晰。需要换数据库或者改业务逻辑时,只需要动对应的层就行。
总结:用Repository模式分离关注点。
celery 怎么做异步任务呢? 我这里的异步任务是指第三方系统推送一个消息过来,我这边接受到后读取并执行指定任务
celery 只是一个任务执行框架,你把任务推过去就行了,第三方消息接收还是需要 flask,或者你单独做一个队列 kafka 之类的处理
那第三方怎么推送任务呢?我使用 celery 定义了一个任务,我现在需要一个第三方系统去触发,使用 http api 又有安全问题,这个时候我应该怎么推送任务呢?。
celery 有定时任务啊!可以定时触发么
使用 celery 的话,简单点的模型就是 API —(celery)—> MQ —(celery)—> Worker 这样。
依旧是通过 API 对第三方提供服务,安全问题可以通过认证、白名单等方式来解决。
celery 主要是起到一个任务队列的作用,将 API 这段接受到的任务推给 worker 去处理。
celery 吧,定时,异步都符合了。http api 做好认证,ip 白名单,问题不大。
cron & celery
做个 restful api 接收异步回调,然后调用 celery 任务就好了啊
使用 http 性能是个严重的问题
那就长连接,用 websocket,再不济你用 rpc
APScheduler

