Python中如何解决Java、RabbitMQ与Celery的跨语言通信问题?
背景
老衲是一个 python 的脑残粉(可以看我的昵称),然后遇到一个有脑的问题
细节
SpringBoot 搭建的 web 框架,里面集成了 rabbitmq 组建如下:
<!-- RabbitMQ 集成 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后将一些消息发送到 rabbitmq,后端用 celery 起了一堆 worker 来消费这些消息;
本来感觉这个架构很简单,实现起来很 easy,但是 celery 貌似对 json 对象处理有一些不兼容 JAVA 的 JSONobject ; 比如我将对象用 JSONObject 的 fromObject 方法转成 json 对象,然后发送到 MQ 中,celery 的 worker 就会报错:
[2018-11-19 14:21:52,895: CRITICAL/MainProcess] Can't decode message body: ContentDisallowed(u'Refusing to deserialize untrusted content of type application/x-java-serialized-object (application/x-java-serialized-object)',) [type:'application/x-java-serialized-object' encoding:None headers:{}]
…
Traceback (most recent call last):
File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/celery/worker/consumer/consumer.py”, line 553, in on_task_received
payload = message.decode()
File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/message.py”, line 192, in decode
self._decoded_cache = self._decode()
File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/message.py”, line 197, in _decode
self.content_encoding, accept=self.accept)
File “/Users/cnbraid/.virtualenvs/celerytest/lib/python2.7/site-packages/kombu/serialization.py”, line 253, in loads
raise self._for_untrusted_content(content_type, ‘untrusted’)
ContentDisallowed: Refusing to deserialize untrusted content of type application/x-java-serialized-object (application/x-java-serialized-object)
我理解的意思是 celery 无法处理(application/x-java-serialized-object)这种对象吧?
这种情况该怎么解决呢? 我其实还是想用 celery 的,好处是起 worker 简单,easy,但是就是报上面的错误
Python中如何解决Java、RabbitMQ与Celery的跨语言通信问题?
这个 application/x-java-serialized-object 一看就是 java 专用的,你需要塞进去的时候转成 json.
这个问题其实挺常见的,核心就是让Java(生产者)和Python(Celery消费者)通过RabbitMQ这个中间人对话。它们之间传递的数据必须是双方都能理解的格式,通常用JSON。
关键点就两个:序列化协议和队列声明。
-
序列化协议:RabbitMQ只传字节流。你需要选一个两边都支持的格式。JSON是最简单通用的。Java端用
Jackson或Gson库把对象变成JSON字符串再发。Python的Celery默认就能处理JSON,所以收下来直接就是字典,用起来很方便。 -
队列声明:两边(Java和Python)声明队列时,名字、交换机、路由键这些参数必须完全一致,不然消息对不上。最好把连接工厂、队列名这些配置单独拿出来,两边共用一套。
代码示例:
Java端(生产者) - 使用Spring AMQP示例:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JavaProducerConfig {
// 1. 定义队列,名字要和Python端一致
@Bean
public Queue taskQueue() {
return new Queue("celery", true); // 队列名'celery',持久化
}
// 发送消息
public void sendTask(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
Map<String, Object> task = new HashMap<>();
task.put("id", UUID.randomUUID().toString());
task.put("task", "myapp.tasks.process_data"); // Celery任务路径
task.put("args", new Object[]{1, "hello"});
task.put("kwargs", new HashMap<>());
try {
String jsonMessage = objectMapper.writeValueAsString(task);
rabbitTemplate.convertAndSend("celery", jsonMessage); // 发到'celery'队列
System.out.println(" [Java] Sent: " + jsonMessage);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Python端(Celery消费者):
# celery_config.py 或 直接在app定义里
from celery import Celery
app = Celery('myapp',
broker='pyamqp://guest@localhost//', # 连接RabbitMQ
backend='rpc://', # 结果后端,按需设置
include=['myapp.tasks']) # 包含任务模块
# 配置(关键!)
app.conf.update(
task_serializer='json', # 指定序列化为JSON
accept_content=['json'], # 只接受JSON内容
result_serializer='json',
task_queues = {
'celery': { # 队列名,和Java端声明的一致
'exchange': 'celery',
'routing_key': 'celery'
}
}
)
# tasks.py - 具体的任务
@app.task(name='myapp.tasks.process_data') # 任务名和Java端发送的'task'字段对应
def process_data(num, text):
print(f" [Python] Processing: {num}, {text}")
return f"Done: {text}_{num}"
运行流程:
- Java把任务数据(包含任务名
myapp.tasks.process_data、参数等)打包成JSON。 - Java通过RabbitTemplate发送到名为
celery的队列。 - Python的Celery worker监听着
celery队列,收到JSON消息。 - Celery根据JSON里的
task字段找到对应的process_data函数,把args里的参数传进去执行。 - 任务执行完,结果可以存到后端(如果配置了)。
总结:用JSON序列化,确保队列配置两边一致就行了。
嗯,也用过 Gson 将对象转成 json 对象,java 这边将类型打印出来是 string,如下:
消息已发送,应用的其它操作
{“user_id”:111,“username”:“lili”,“mobile”:“110”,“msg”:“系统信息”}
class java.lang.String
但是 celery 会报另外一种错误:
[2018-11-19 14:38:22,055: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: u’{“user_id”:111,“username”:“lili”,“mobile”:“110”,“msg”:“系统信息”}’ (80b)
{content_type:‘text/plain’ content_encoding:‘UTF-8’
delivery_info:{u’consumer_tag’: u’None4’, u’redelivered’: False, u’routing_key’: u’feifei.blog’, u’delivery_tag’: 4, u’exchange’: u’feifei’} headers={}}
这里看到接收的实际类型应该是 content_type:‘text/plain’
但是 celery 报错意思是不认识这个 message
你改成 application/json 就可以了,content_type:"text/plain"当然是不认识,纯文本,没法用
是这个问题,看来理想很丰满
发现还是不行,报错内容变了:
[2018-11-19 15:27:36,288: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: {u’username’: u’libin’, u’mobile’: u’15210832508’, u’user_id’: 111, u’device_id’: u’88889999’} (80b)
{content_type:‘application/json’ content_encoding:None
delivery_info:{u’consumer_tag’: u’None4’, u’redelivered’: False, u’routing_key’: u’feifei.blog’, u’delivery_tag’: 2, u’exchange’: u’feifei’} headers={}}
谢谢,试过了,但是不是这个原因导致的,我看了下我都没安装过 librabbitmq 这个组件。
你发到 rabbitmq 的数据格式要遵守 celery 的数据格式呀,随便传一个肯定不行呀
要么消息推送的时候跟 celery 推送的消息格式一样
要么就是避免这种情况,中间层,将消息推送给 API ( api 入 mq,go 不操作 mq )。此 API 只做 tasks.*.apply_async() / delay…
我的场景是 Go RabbitMq Celery Python
https://blog.thinking.mobi/articles/2020/03/06/1583496791208.html

