【新手求学】Nodejs+rabbitmq 使用amqp(或者amqplib问题一样) 出现瓶颈求解

【新手求学】Nodejs+rabbitmq 使用amqp(或者amqplib问题一样) 出现瓶颈求解

【内容描述】:用nodejs做生产者发布消息到另外一台的rabbitmq服务器上(重点是rabbitmq在另外一台计算机上)。 【目标】: message为一个4k的文本,最短时间发送10万个message。 【现象】: 1.在message为4k,一次性for循环publish 4万个以内,速度很快完成,cpu使用很低,只会在for循环时高一点(很短),但是,超过5万速度明显慢的比蜗牛还慢,cpu使用率高(几乎满掉),而且很久,速度直到publish回调函数成功完成到剩下的在四万个以内才会有变高,cpu使用率降低。 2.在message很小,就几字节的时候,for循环publish 10万或者20万,没有任何差别,速度高效。 3.上诉情况如果rabbitmq服务器在本地与nodejs生产者在同一台电脑,无论message大小为那种情况,都不会有问题,速度比远程调用速度快上六七倍。

【我的问题】:为什么当message为4k,rabbitmq在远程(另一台电脑上)会有一个5万的界限(即在这个界限两边速度差距这么大)?如何解决这个问题?求大神讲解?在线等…

【代码】: var amqp = require(“amqp”); var fs = require(‘fs’); var exchName = “topic”; //exhange 名称 var routeKey = “topic”; // 路由键

var connOptions = { host: ‘192.168.65.170’ , port: 5672 , login: ‘guest’ , password: ‘guest’ , authMechanism: ‘AMQPLAIN’ , vhost: ‘/’ , ssl: { enabled : false } }

var exchOption = { type: ‘topic’ ,durable: true ,autoDelete: false ,confirm: false }

var messOption = { contentEncoding: “utf-8” ,deliveryMode: 1 }

var message = fs.readFileSync(‘2.txt’).toString(‘utf-8’); var conn = amqp.createConnection(connOptions); //连接rabbitmq var n=100000; // 循环数

var messFunc = function(e){ }

var exchFunc = function(exchange){ now = new Date(); mill = now.getMilliseconds(); console.log(now,mill); //[此处是我问题的点] for (var i = 0; i <n; i++) { exchange.publish(routeKey,new Buffer(message),’’,messFunc); //发布消息 因为exchange属性confirm为false,此处不会回调messFunc } now = new Date(); mill = now.getMilliseconds(); console.log(now,mill); }

var connFunc = function(){ console.log(“ready”); var exch = conn.exchange(exchName,exchOption,exchFunc); //获取exchange 生成生产者 }

conn.on(‘ready’,connFunc); //rabbitmq连接成功调用connFunc


10 回复

【新手求学】Nodejs + RabbitMQ 使用 amqp(或者amqplib问题一样) 出现瓶颈求解

【内容描述】

使用 Node.js 作为生产者将消息发布到位于另一台计算机上的 RabbitMQ 服务器。

【目标】

发送一个 4KB 的文本消息,最短时间发送 10 万个消息。

【现象】

  1. 当消息大小为 4KB,并且一次性通过 for 循环发布 4 万个消息时,速度很快,CPU 使用率较低(仅在 for 循环期间较高,但时间很短)。然而,超过 5 万个消息后,速度显著下降,CPU 使用率显著升高,且需要很长时间才能完成。只有当剩余的消息数量降至 4 万个以内时,速度才会提升,CPU 使用率下降。
  2. 当消息较小(只有几个字节)时,即使循环发布 10 万个或 20 万个消息,速度也很快。
  3. 如果 RabbitMQ 服务器与 Node.js 生产者在同一台机器上,则无论消息大小如何,都不会出现上述问题,速度也会快得多。

【我的问题】

为什么当消息大小为 4KB 并且 RabbitMQ 服务器位于远程机器上时,会出现一个大约 5 万个消息的界限?如何解决这个问题?

【解决方案】

该问题主要由于网络延迟和 RabbitMQ 服务器处理大量消息的能力限制所致。为了解决此问题,可以采取以下措施:

  1. 异步发布消息:确保消息发布操作是异步的,以避免阻塞事件循环。
  2. 批量发布消息:而不是一次性发布所有消息,可以分批次发布消息,减少每次请求的数据量。

示例代码

const amqp = require('amqp');
const fs = require('fs');

// 配置 RabbitMQ 连接
const connOptions = {
    host: '192.168.65.170',
    port: 5672,
    login: 'guest',
    password: 'guest',
    authMechanism: 'AMQPLAIN',
    vhost: '/',
    ssl: {
        enabled: false
    }
};

// 定义交换机选项
const exchOption = {
    type: 'topic',
    durable: true,
    autoDelete: false,
    confirm: false
};

// 定义消息选项
const messOption = {
    contentEncoding: "utf-8",
    deliveryMode: 1
};

// 读取消息内容
const message = fs.readFileSync('2.txt').toString('utf-8');
const n = 100000; // 循环次数

// 异步发布消息
function publishMessage(exchange) {
    const batchSize = 1000;
    let count = 0;

    function publishBatch() {
        for (let i = 0; i < batchSize && count < n; i++) {
            exchange.publish('topic', new Buffer(message), '', () => {
                count++;
                if (count % batchSize === 0 || count === n) {
                    console.log(`Published ${count} messages`);
                }
            });
        }

        if (count < n) {
            setTimeout(publishBatch, 0); // 延迟一段时间继续发布下一批
        }
    }

    publishBatch();
}

// 连接 RabbitMQ 并创建交换机
const conn = amqp.createConnection(connOptions);

conn.on('ready', () => {
    console.log("RabbitMQ connection ready");
    const exch = conn.exchange('topic', exchOption, publishMessage);
});

解释

  1. 异步发布消息:通过 setTimeout 实现异步发布,避免阻塞事件循环。
  2. 批量发布消息:每次发布 1000 条消息,然后等待下一批次发布。这有助于减轻网络负载并提高整体性能。

我所说的速度慢,实在rabbimq检测到的消息接收到的速度

更新:此情况只会在windows系统下存在,在linux系统下就没有了~~

是不是windows的端口号耗尽,新的连接需要等新空出来的端口号的原因?

node http默认用了连接池,把它关掉可能会好些(agent:false)。

在linux下面直接100兆带宽直接跑满一点不剩…可能是系统的堆栈实现原理不同导致的吧。其实很想知道真正原因。

我测试了下貌似没有问题,我的环境如下: 3台linux+一台负载 ,配置不高,分别2G内存的虚拟机,镜像模式+持久化,客户端使用amqp.node

  • 第一次测试

每次发送5K消息打开channal 发送后关闭,发送80000 ,非常慢 ,每秒publish 20左右,等不起直接kill :( !!! ![第一次测试](http://d.pcs.baidu.com/thumbnail/6df9d19b6206e00e6e898e7ff0feb2ae?fid=2183602240-250528-229854399&time=1385520220&sign=FDTAER-DCb740ccc5511e5e8fedcff06b081203-ast2pYYt0F%2BrGMTEciuY5%2FWmk2c%3D&rt=sh&expires=8h&r=363038880&sharesign=unknown&size=c850_u580&quality=100"Optional title")

  • 第二次测试一次连接后循环发送10000, 共发送12万左右 5K ,结果如下:

( !!! !二er次测试](http://d.pcs.baidu.com/thumbnail/6bfe88de3371748a4b4dfb23bc01db13?fid=2183602240-250528-2972554457&time=1385520220&sign=FDTAER-DCb740ccc5511e5e8fedcff06b081203-qwenkVuf8IMFxnz5be75P8iOW%2F0%3D&rt=sh&expires=8h&r=392707045&sharesign=unknown&size=c850_u580&quality=100"Optional title")

回复内容无法修改??? 啊!!!

我最近也在用rabbitmq+node.js,很想和楼主讨论一下这方面的问题,呵呵

从你的描述来看,问题主要出现在网络延迟和批量处理效率上。当消息较大(如4K),并且RabbitMQ服务器位于另一台计算机上时,网络传输成为瓶颈。一次性的大量数据传输会导致网络拥塞,从而影响性能。

解决方案

1. 分批发送消息

为了提高效率并减少网络压力,可以考虑分批发送消息,而不是一次性发送大量消息。

const amqp = require('amqplib'); // 使用amqplib库
const fs = require('fs');

async function sendMessage() {
    const connection = await amqp.connect('amqp://guest:guest@192.168.65.170:5672/');
    const channel = await connection.createChannel();

    const exchangeName = 'topic';
    const routingKey = 'topic';
    const messageContent = fs.readFileSync('2.txt').toString('utf-8');
    const batchSize = 1000; // 每批发送的消息数量
    const totalMessages = 100000;

    for (let i = 0; i < totalMessages; i += batchSize) {
        const batch = [];
        for (let j = 0; j < batchSize && i + j < totalMessages; j++) {
            batch.push({ content: messageContent, index: i + j });
        }
        await Promise.all(batch.map(msg => channel.sendToQueue(routingKey, Buffer.from(msg.content))));
        console.log(`Sent batch ${Math.floor(i / batchSize) + 1} of ${Math.ceil(totalMessages / batchSize)}`);
    }

    channel.close();
    connection.close();
}

sendMessage().catch(console.error);

2. 异步处理回调

确保消息发送操作在异步环境中执行,并且处理每个消息的确认回调以避免阻塞。

代码解析

  • 分批发送:将消息分成小批次发送,这样可以减少单次发送的数据量,避免网络拥塞。
  • 异步处理:使用Promise.all来确保所有消息都已成功发送后再进行下一批次的发送。

通过这种方式,可以有效提升消息发送的速度,并降低CPU和网络的压力。

回到顶部