【新手求学】Nodejs+rabbitmq 使用amqp(或者amqplib问题一样) 出现瓶颈求解
【新手求学】Nodejs+rabbitmq 使用amqp(或者amqplib问题一样) 出现瓶颈求解
【内容描述】:用nodejs做生产者发布消息到另外一台的rabbitmq服务器上(重点是rabbitmq在另外一台计算机上)。 【目标】: message为一个4k的文本,最短时间发送10万个message。 【现象】: 1.在message为4k,一次性for循环publish 4万个以内,速度很快完成,cpu使用很低,只会在for循环时高一点(很短),但是,超过5万速度明显慢的比蜗牛还慢,cpu使用率高(几乎满掉),而且很久,速度直到publish回调函数成功完成到剩下的在四万个以内才会有变高,cpu使用率降低。 2.在message很小,就几字节的时候,for循环publish 10万或者20万,没有任何差别,速度高效。 3.上诉情况如果rabbitmq服务器在本地与nodejs生产者在同一台电脑,无论message大小为那种情况,都不会有问题,速度比远程调用速度快上六七倍。
【我的问题】:为什么当message为4k,rabbitmq在远程(另一台电脑上)会有一个5万的界限(即在这个界限两边速度差距这么大)?如何解决这个问题?求大神讲解?在线等…
【代码】: var amqp = require(“amqp”); var fs = require(‘fs’); var exchName = “topic”; //exhange 名称 var routeKey = “topic”; // 路由键
var connOptions = { host: ‘192.168.65.170’ , port: 5672 , login: ‘guest’ , password: ‘guest’ , authMechanism: ‘AMQPLAIN’ , vhost: ‘/’ , ssl: { enabled : false } }
var exchOption = { type: ‘topic’ ,durable: true ,autoDelete: false ,confirm: false }
var messOption = { contentEncoding: “utf-8” ,deliveryMode: 1 }
var message = fs.readFileSync(‘2.txt’).toString(‘utf-8’); var conn = amqp.createConnection(connOptions); //连接rabbitmq var n=100000; // 循环数
var messFunc = function(e){ }
var exchFunc = function(exchange){ now = new Date(); mill = now.getMilliseconds(); console.log(now,mill); //[此处是我问题的点] for (var i = 0; i <n; i++) { exchange.publish(routeKey,new Buffer(message),’’,messFunc); //发布消息 因为exchange属性confirm为false,此处不会回调messFunc } now = new Date(); mill = now.getMilliseconds(); console.log(now,mill); }
var connFunc = function(){ console.log(“ready”); var exch = conn.exchange(exchName,exchOption,exchFunc); //获取exchange 生成生产者 }
conn.on(‘ready’,connFunc); //rabbitmq连接成功调用connFunc
【新手求学】Nodejs + RabbitMQ 使用 amqp(或者amqplib问题一样) 出现瓶颈求解
【内容描述】
使用 Node.js 作为生产者将消息发布到位于另一台计算机上的 RabbitMQ 服务器。
【目标】
发送一个 4KB 的文本消息,最短时间发送 10 万个消息。
【现象】
- 当消息大小为 4KB,并且一次性通过 for 循环发布 4 万个消息时,速度很快,CPU 使用率较低(仅在 for 循环期间较高,但时间很短)。然而,超过 5 万个消息后,速度显著下降,CPU 使用率显著升高,且需要很长时间才能完成。只有当剩余的消息数量降至 4 万个以内时,速度才会提升,CPU 使用率下降。
- 当消息较小(只有几个字节)时,即使循环发布 10 万个或 20 万个消息,速度也很快。
- 如果 RabbitMQ 服务器与 Node.js 生产者在同一台机器上,则无论消息大小如何,都不会出现上述问题,速度也会快得多。
【我的问题】
为什么当消息大小为 4KB 并且 RabbitMQ 服务器位于远程机器上时,会出现一个大约 5 万个消息的界限?如何解决这个问题?
【解决方案】
该问题主要由于网络延迟和 RabbitMQ 服务器处理大量消息的能力限制所致。为了解决此问题,可以采取以下措施:
- 异步发布消息:确保消息发布操作是异步的,以避免阻塞事件循环。
- 批量发布消息:而不是一次性发布所有消息,可以分批次发布消息,减少每次请求的数据量。
示例代码
const amqp = require('amqp');
const fs = require('fs');
// 配置 RabbitMQ 连接
const connOptions = {
host: '192.168.65.170',
port: 5672,
login: 'guest',
password: 'guest',
authMechanism: 'AMQPLAIN',
vhost: '/',
ssl: {
enabled: false
}
};
// 定义交换机选项
const exchOption = {
type: 'topic',
durable: true,
autoDelete: false,
confirm: false
};
// 定义消息选项
const messOption = {
contentEncoding: "utf-8",
deliveryMode: 1
};
// 读取消息内容
const message = fs.readFileSync('2.txt').toString('utf-8');
const n = 100000; // 循环次数
// 异步发布消息
function publishMessage(exchange) {
const batchSize = 1000;
let count = 0;
function publishBatch() {
for (let i = 0; i < batchSize && count < n; i++) {
exchange.publish('topic', new Buffer(message), '', () => {
count++;
if (count % batchSize === 0 || count === n) {
console.log(`Published ${count} messages`);
}
});
}
if (count < n) {
setTimeout(publishBatch, 0); // 延迟一段时间继续发布下一批
}
}
publishBatch();
}
// 连接 RabbitMQ 并创建交换机
const conn = amqp.createConnection(connOptions);
conn.on('ready', () => {
console.log("RabbitMQ connection ready");
const exch = conn.exchange('topic', exchOption, publishMessage);
});
解释
- 异步发布消息:通过
setTimeout
实现异步发布,避免阻塞事件循环。 - 批量发布消息:每次发布 1000 条消息,然后等待下一批次发布。这有助于减轻网络负载并提高整体性能。
我所说的速度慢,实在rabbimq检测到的消息接收到的速度
更新:此情况只会在windows系统下存在,在linux系统下就没有了~~
是不是windows的端口号耗尽,新的连接需要等新空出来的端口号的原因?
node http默认用了连接池,把它关掉可能会好些(agent:false)。
在linux下面直接100兆带宽直接跑满一点不剩…可能是系统的堆栈实现原理不同导致的吧。其实很想知道真正原因。
我测试了下貌似没有问题,我的环境如下: 3台linux+一台负载 ,配置不高,分别2G内存的虚拟机,镜像模式+持久化,客户端使用amqp.node
- 第一次测试
每次发送5K消息打开channal 发送后关闭,发送80000 ,非常慢 ,每秒publish 20左右,等不起直接kill :( !!! 
- 第二次测试一次连接后循环发送10000, 共发送12万左右 5K ,结果如下:
( !!! !二er次测试](http://d.pcs.baidu.com/thumbnail/6bfe88de3371748a4b4dfb23bc01db13?fid=2183602240-250528-2972554457&time=1385520220&sign=FDTAER-DCb740ccc5511e5e8fedcff06b081203-qwenkVuf8IMFxnz5be75P8iOW%2F0%3D&rt=sh&expires=8h&r=392707045&sharesign=unknown&size=c850_u580&quality=100"Optional title")
回复内容无法修改??? 啊!!!
我最近也在用rabbitmq+node.js,很想和楼主讨论一下这方面的问题,呵呵
从你的描述来看,问题主要出现在网络延迟和批量处理效率上。当消息较大(如4K),并且RabbitMQ服务器位于另一台计算机上时,网络传输成为瓶颈。一次性的大量数据传输会导致网络拥塞,从而影响性能。
解决方案
1. 分批发送消息
为了提高效率并减少网络压力,可以考虑分批发送消息,而不是一次性发送大量消息。
const amqp = require('amqplib'); // 使用amqplib库
const fs = require('fs');
async function sendMessage() {
const connection = await amqp.connect('amqp://guest:guest@192.168.65.170:5672/');
const channel = await connection.createChannel();
const exchangeName = 'topic';
const routingKey = 'topic';
const messageContent = fs.readFileSync('2.txt').toString('utf-8');
const batchSize = 1000; // 每批发送的消息数量
const totalMessages = 100000;
for (let i = 0; i < totalMessages; i += batchSize) {
const batch = [];
for (let j = 0; j < batchSize && i + j < totalMessages; j++) {
batch.push({ content: messageContent, index: i + j });
}
await Promise.all(batch.map(msg => channel.sendToQueue(routingKey, Buffer.from(msg.content))));
console.log(`Sent batch ${Math.floor(i / batchSize) + 1} of ${Math.ceil(totalMessages / batchSize)}`);
}
channel.close();
connection.close();
}
sendMessage().catch(console.error);
2. 异步处理回调
确保消息发送操作在异步环境中执行,并且处理每个消息的确认回调以避免阻塞。
代码解析
- 分批发送:将消息分成小批次发送,这样可以减少单次发送的数据量,避免网络拥塞。
- 异步处理:使用
Promise.all
来确保所有消息都已成功发送后再进行下一批次的发送。
通过这种方式,可以有效提升消息发送的速度,并降低CPU和网络的压力。