Python中如何解决Java、RabbitMQ与Celery的跨语言通信问题?

背景

老衲是一个 python 的脑残粉(可以看我的昵称),然后遇到一个有脑的问题

细节

SpringBoot 搭建的 web 框架,里面集成了 rabbitmq 组建如下:

        <!-- RabbitMQ 集成 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

然后将一些消息发送到 rabbitmq,后端用 celery 起了一堆 worker 来消费这些消息;

本来感觉这个架构很简单,实现起来很 easy,但是 celery 貌似对 json 对象处理有一些不兼容 JAVA 的 JSONobject ; 比如我将对象用 JSONObject 的 fromObject 方法转成 json 对象,然后发送到 MQ 中,celery 的 worker 就会报错:

[2018-11-19 14:21:52,895: CRITICAL/MainProcess] Can't decode message body: ContentDisallowed(u'Refusing to deserialize untrusted content of type application/x-java-serialized-object (application/x-java-serialized-object)',) [type:'application/x-java-serialized-object' encoding:None headers:{}]

Traceback (most recent call last): File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/celery/worker/consumer/consumer.py”, line 553, in on_task_received payload = message.decode() File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/message.py”, line 192, in decode self._decoded_cache = self._decode() File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/message.py”, line 197, in _decode self.content_encoding, accept=self.accept) File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/serialization.py”, line 253, in loads raise self._for_untrusted_content(content_type, ‘untrusted’) ContentDisallowed: Refusing to deserialize untrusted content of type application/x-java-serialized-object (application/x-java-serialized-object)

我理解的意思是 celery 无法处理(application/x-java-serialized-object)这种对象吧?

这种情况该怎么解决呢? 我其实还是想用 celery 的,好处是起 worker 简单,easy,但是就是报上面的错误


Python中如何解决Java、RabbitMQ与Celery的跨语言通信问题?

10 回复

这个 application/x-java-serialized-object 一看就是 java 专用的,你需要塞进去的时候转成 json.


这个问题其实挺常见的,核心就是让Java(生产者)和Python(Celery消费者)通过RabbitMQ这个中间人对话。它们之间传递的数据必须是双方都能理解的格式,通常用JSON。

关键点就两个:序列化协议和队列声明。

  1. 序列化协议:RabbitMQ只传字节流。你需要选一个两边都支持的格式。JSON是最简单通用的。Java端用JacksonGson库把对象变成JSON字符串再发。Python的Celery默认就能处理JSON,所以收下来直接就是字典,用起来很方便。

  2. 队列声明:两边(Java和Python)声明队列时,名字、交换机、路由键这些参数必须完全一致,不然消息对不上。最好把连接工厂、队列名这些配置单独拿出来,两边共用一套。

代码示例:

Java端(生产者) - 使用Spring AMQP示例:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JavaProducerConfig {

    // 1. 定义队列,名字要和Python端一致
    @Bean
    public Queue taskQueue() {
        return new Queue("celery", true); // 队列名'celery',持久化
    }

    // 发送消息
    public void sendTask(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
        Map<String, Object> task = new HashMap<>();
        task.put("id", UUID.randomUUID().toString());
        task.put("task", "myapp.tasks.process_data"); // Celery任务路径
        task.put("args", new Object[]{1, "hello"});
        task.put("kwargs", new HashMap<>());

        try {
            String jsonMessage = objectMapper.writeValueAsString(task);
            rabbitTemplate.convertAndSend("celery", jsonMessage); // 发到'celery'队列
            System.out.println(" [Java] Sent: " + jsonMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Python端(Celery消费者):

# celery_config.py 或 直接在app定义里
from celery import Celery

app = Celery('myapp',
             broker='pyamqp://guest@localhost//', # 连接RabbitMQ
             backend='rpc://', # 结果后端,按需设置
             include=['myapp.tasks']) # 包含任务模块

# 配置(关键!)
app.conf.update(
    task_serializer='json', # 指定序列化为JSON
    accept_content=['json'], # 只接受JSON内容
    result_serializer='json',
    task_queues = {
        'celery': { # 队列名,和Java端声明的一致
            'exchange': 'celery',
            'routing_key': 'celery'
        }
    }
)

# tasks.py - 具体的任务
@app.task(name='myapp.tasks.process_data') # 任务名和Java端发送的'task'字段对应
def process_data(num, text):
    print(f" [Python] Processing: {num}, {text}")
    return f"Done: {text}_{num}"

运行流程:

  1. Java把任务数据(包含任务名myapp.tasks.process_data、参数等)打包成JSON。
  2. Java通过RabbitTemplate发送到名为celery的队列。
  3. Python的Celery worker监听着celery队列,收到JSON消息。
  4. Celery根据JSON里的task字段找到对应的process_data函数,把args里的参数传进去执行。
  5. 任务执行完,结果可以存到后端(如果配置了)。

总结:用JSON序列化,确保队列配置两边一致就行了。

嗯,也用过 Gson 将对象转成 json 对象,java 这边将类型打印出来是 string,如下:

消息已发送,应用的其它操作
{“user_id”:111,“username”:“lili”,“mobile”:“110”,“msg”:“系统信息”}
class java.lang.String

但是 celery 会报另外一种错误:

[2018-11-19 14:38:22,055: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

The full contents of the message body was: body: u’{“user_id”:111,“username”:“lili”,“mobile”:“110”,“msg”:“系统信息”}’ (80b)
{content_type:‘text/plain’ content_encoding:‘UTF-8’
delivery_info:{u’consumer_tag’: u’None4’, u’redelivered’: False, u’routing_key’: u’feifei.blog’, u’delivery_tag’: 4, u’exchange’: u’feifei’} headers={}}


这里看到接收的实际类型应该是 content_type:‘text/plain’

但是 celery 报错意思是不认识这个 message

你改成 application/json 就可以了,content_type:"text/plain"当然是不认识,纯文本,没法用

是这个问题,看来理想很丰满

发现还是不行,报错内容变了:

[2018-11-19 15:27:36,288: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

The full contents of the message body was: body: {u’username’: u’libin’, u’mobile’: u’15210832508’, u’user_id’: 111, u’device_id’: u’88889999’} (80b)
{content_type:‘application/json’ content_encoding:None
delivery_info:{u’consumer_tag’: u’None4’, u’redelivered’: False, u’routing_key’: u’feifei.blog’, u’delivery_tag’: 2, u’exchange’: u’feifei’} headers={}}

谢谢,试过了,但是不是这个原因导致的,我看了下我都没安装过 librabbitmq 这个组件。

你发到 rabbitmq 的数据格式要遵守 celery 的数据格式呀,随便传一个肯定不行呀

要么消息推送的时候跟 celery 推送的消息格式一样
要么就是避免这种情况,中间层,将消息推送给 API ( api 入 mq,go 不操作 mq )。此 API 只做 tasks.*.apply_async() / delay…
我的场景是 Go RabbitMq Celery Python
https://blog.thinking.mobi/articles/2020/03/06/1583496791208.html

回到顶部