Python中如何在Flask框架中添加独立的数据处理层并复用模型层代码?

该数据处理层是异步的,可以做定时任务,也可以做异步任务
Python中如何在Flask框架中添加独立的数据处理层并复用模型层代码?

13 回复

不想重复做轮子就 celery


在Flask里把数据处理和模型层分开,可以这么干:

首先,模型层用SQLAlchemy的declarative_base定义,放在单独的模块里(比如models.py):

# models.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120))

然后创建独立的数据处理层(比如repositories.py),这里封装所有数据库操作:

# repositories.py
from sqlalchemy.orm import sessionmaker
from models import Base, User
from sqlalchemy import create_engine

class UserRepository:
    def __init__(self, db_url='sqlite:///app.db'):
        engine = create_engine(db_url)
        Base.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        self.session = Session()
    
    def get_all_users(self):
        return self.session.query(User).all()
    
    def get_user_by_id(self, user_id):
        return self.session.query(User).filter_by(id=user_id).first()
    
    def create_user(self, username, email):
        user = User(username=username, email=email)
        self.session.add(user)
        self.session.commit()
        return user
    
    def update_user(self, user_id, **kwargs):
        user = self.get_user_by_id(user_id)
        if user:
            for key, value in kwargs.items():
                setattr(user, key, value)
            self.session.commit()
        return user
    
    def delete_user(self, user_id):
        user = self.get_user_by_id(user_id)
        if user:
            self.session.delete(user)
            self.session.commit()

最后在Flask应用里复用这个数据处理层:

# app.py
from flask import Flask, request, jsonify
from repositories import UserRepository

app = Flask(__name__)
user_repo = UserRepository()

@app.route('/users', methods=['GET'])
def get_users():
    users = user_repo.get_all_users()
    return jsonify([{'id': u.id, 'username': u.username, 'email': u.email} for u in users])

@app.route('/users', methods=['POST'])
def create_user():
    data = request.json
    user = user_repo.create_user(data['username'], data['email'])
    return jsonify({'id': user.id, 'username': user.username, 'email': user.email}), 201

@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    data = request.json
    user = user_repo.update_user(user_id, **data)
    if user:
        return jsonify({'id': user.id, 'username': user.username, 'email': user.email})
    return jsonify({'error': 'User not found'}), 404

@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    user_repo.delete_user(user_id)
    return '', 204

if __name__ == '__main__':
    app.run(debug=True)

这样搞的好处是:模型层代码完全独立,数据处理逻辑集中管理,Flask路由只负责HTTP交互,各层职责清晰。需要换数据库或者改业务逻辑时,只需要动对应的层就行。

总结:用Repository模式分离关注点。

celery 怎么做异步任务呢? 我这里的异步任务是指第三方系统推送一个消息过来,我这边接受到后读取并执行指定任务

celery 只是一个任务执行框架,你把任务推过去就行了,第三方消息接收还是需要 flask,或者你单独做一个队列 kafka 之类的处理

那第三方怎么推送任务呢?我使用 celery 定义了一个任务,我现在需要一个第三方系统去触发,使用 http api 又有安全问题,这个时候我应该怎么推送任务呢?。

celery 有定时任务啊!可以定时触发么

使用 celery 的话,简单点的模型就是 API —(celery)—> MQ —(celery)—> Worker 这样。
依旧是通过 API 对第三方提供服务,安全问题可以通过认证、白名单等方式来解决。
celery 主要是起到一个任务队列的作用,将 API 这段接受到的任务推给 worker 去处理。

celery 吧,定时,异步都符合了。http api 做好认证,ip 白名单,问题不大。

cron & celery

做个 restful api 接收异步回调,然后调用 celery 任务就好了啊

使用 http 性能是个严重的问题

那就长连接,用 websocket,再不济你用 rpc

APScheduler

回到顶部