Python中是否有类似HTTPSQS的简单队列实现?用于危急值提醒系统

null
Python中是否有类似HTTPSQS的简单队列实现?用于危急值提醒系统

14 回复

celery ?


Python里没有HTTPSQS那种开箱即用的独立服务,但用标准库和几个轻量级包就能轻松搭一个类似的HTTP队列服务。核心思路就是用Flask或FastAPI提供HTTP接口,用queue.Queue或Redis做后端存储。

下面是一个用Flask和内存队列实现的完整示例,包含入队、出队和查看状态的基础功能:

from flask import Flask, request, jsonify
import queue
import threading
import time

app = Flask(__name__)

# 使用线程安全的队列
task_queue = queue.Queue(maxsize=1000)  # 设置队列最大长度防止内存溢出

@app.route('/put', methods=['POST'])
def put_message():
    """入队接口,接收JSON格式消息"""
    data = request.get_json()
    if not data or 'message' not in data:
        return jsonify({'error': 'Missing message field'}), 400
    
    try:
        # 添加时间戳和唯一ID
        import uuid
        message = {
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'content': data['message']
        }
        task_queue.put(message, block=False)
        return jsonify({'status': 'success', 'id': message['id']}), 200
    except queue.Full:
        return jsonify({'error': 'Queue is full'}), 507

@app.route('/get', methods=['GET'])
def get_message():
    """出队接口,支持timeout参数"""
    timeout = request.args.get('timeout', default=0, type=int)
    try:
        message = task_queue.get(block=True, timeout=timeout)
        return jsonify(message), 200
    except queue.Empty:
        return jsonify({'error': 'Queue is empty'}), 404

@app.route('/status', methods=['GET'])
def queue_status():
    """查看队列状态"""
    return jsonify({
        'size': task_queue.qsize(),
        'empty': task_queue.empty(),
        'full': task_queue.full()
    })

if __name__ == '__main__':
    # 启动一个消费者线程模拟处理
    def consumer():
        while True:
            try:
                item = task_queue.get(timeout=1)
                print(f"[Consumer] Processing: {item['id']} - {item['content']}")
                # 这里添加实际的处理逻辑,比如发送短信/邮件提醒
                task_queue.task_done()
            except queue.Empty:
                continue
    
    threading.Thread(target=consumer, daemon=True).start()
    app.run(host='0.0.0.0', port=5000, debug=False)

这个实现提供了三个核心接口:

  1. POST /put - 入队消息,需要JSON格式的{"message": "危急值内容"}
  2. GET /get - 出队消息,可加?timeout=5参数设置等待秒数
  3. GET /status - 查看队列当前状态

对于危急值提醒系统,你还需要:

  • 添加消息持久化(换成Redis或数据库)
  • 实现消息确认机制防止丢失
  • 添加认证和HTTPS支持
  • 用Celery或RQ替代简单的消费者线程来处理复杂任务

如果系统压力不大,这个内存队列版本就能跑起来;要上生产环境的话,建议直接用Redis的List结构配合BRPOP命令,或者考虑现成的消息队列如RabbitMQ。

总结:用Flask+队列几分钟就能搭个简单版,生产环境建议上Redis或专业消息队列。

楼上 celery 是任务队列,不是消息,RabbitMQ

HTTPSQS 有 http 的接口吧,Python 不能直接用么

python 应该可以用,但是需要先整台服务器安装,如果 python 有模块直接 pip 安装后,更方便调用,rabbitMQ 和 zeromq 这些好像也都需要安装到服务器

就算你 pip 直接安装,你还不是要单独起它个进程来跑。这和你装个 rabbitMQ、zeromq 有什么区别?

上 rq 吧, 多個 redis 而已

不考虑用一些公有云的自家消息队列服务?假若你已经用云了。

简单队列用 multiprocess 的 Queue 看看

都是内网使用,无法连公网
消息队列主要未接触过,不知道上手容易不?
httpsqs 这个实在是太简单了,js 好像也可以实现调用




对哦,若全套都单机跑,python pip 里就有的 Queue 也足够了。

没啥特别的,就当成是个带锁 list,标准的生产 /消费模型就准确描述它们了。

不支持 超时重试的队列和 redis 的 list 有什么区别?

http://xiaorui.cc/2015/05/10/使用 disque 做分布式消息队列支持重试和 ack 确认 /

这个帖子里面介绍的消息队列不知性能如何,好像直接 pip 安装

redis
lpush/brpop
pub/sub

回到顶部