Nodejs:用redis做消息(任务)队列的包——redis-as-queue
Nodejs:用redis做消息(任务)队列的包——redis-as-queue
较之于其它包有一个功能点不一样,就是分 Normal Queue 和 Unique Queue。Normal Queue 就是一个普通的先进先出队列,而 Unique Queue 就是一个唯一的队列,当我们往 Unique Queue 中插入一个已有的消息时,你可以自主选择是更新这个消息到队尾还是不做任何事情。
包名叫 redis-as-queue,仓库地址在:https://github.com/XadillaX/node-redis-as-queue,求 star,求 follow。
所以安装只需要:
$ npm install redis-as-queue
最后为了凑字数帖上流氓的 README:
Usage
There are two kinds of queue - normal queue and unique queue.
Each value in unique queue is unique. If a new value that equals any value in the queue, you may choose to update or not.
First you should require this package:
var raq = require("redis-as-queue");
Normal Queue
Create
var normalQueue = new raq.NormalQueue(QUEUE_NAME, [...]);
Note:
[...]
is the option(s) to connect redis server. Refer to node-redis document.Eg.
new raq.NormalQueue(QUEUE_NAME, 6379, '127.0.0.1', {}); new raq.NormalQueue(QUEUE_NAME, unix_socket, options); ...
Push
To push a message to your queue, you may use this function:
normalQueue.push("YOUR MESSAGE", function(err) {
console.log(err);
});
Get
To get message(s) from your redis queue, you may use get
function.
Get earlist one message
normalQueue.get(function(err, messages) {
console.log(err);
if(messages.length) console.log(messages[0]);
});
Get earlist N message(s)
// This call gets 10 earlist messages
normalQueue.get(10, function(err, messages) {
console.log(err);
for(var i = 0; i < messages.length; i++) console.log(messages[i]);
});
Get all message(s)
normalQueue.get(-1, function(err, messages) {
console.log(err);
for(var i = 0; i < messages.length; i++) console.log(messages[i]);
});
Remove
To remove message(s) from your redis queue, you may use removeAmount
function.
Remove earlist one message
normalQueue.removeAmount(function(err) {
console.log(err);
});
Remove earlist N message(s)
normalQueue.removeAmount(5, function(err) {
console.log(err);
});
Remove all the message(s)
normalQueue.removeAmount(-1, function(err) {
console.log(err);
});
Length
Get the length of current queue.
normalQueue.length(function(err, len) {
console.log(len);
});
Unique Queue
Each message in unique queue is unique.
Create
var uniqueQueue = new raq.UniqueQueue(QUEUE_NAME, [...]);
Note:
[...]
is the option(s) to connect redis server. Refer to node-redis document.Eg.
new raq.UniqueQueue(QUEUE_NAME, 6379, '127.0.0.1', {}); new raq.UniqueQueue(QUEUE_NAME, unix_socket, options); ...
Push
Push a message to the queue.
Need update
This call will check if your message is existing. If existing, it will move the previous message to the end of queue. Otherwise, your message will be pushed at the end of queue.
uniqueQueue.push("YOUR_MESSAGE", true, function(err) {
console.log(err);
});
Do not update
This call will check if your message is existing. If existing, it won’t do anything. Otherwise, your message will be pushed at the end of queue.
uniqueQueue.push("YOUR_MESSAGE", function(err) {
console.log(err);
});
// or you can do this
uniqueQueue.push(“YOUR_MESSAGE”, false, function(err) {
console.log(err);
});
Get
Get first message
uniqueQueue.get(function(err, messages) {
console.log(err);
if(messages.length) {
console.log(messages[0].message);
console.log(messages[0].updatedAt);
}
});
Get first N message(s)
uniqueQueue.get(3, function(err, messages) {
console.log(err);
for(var i = 0; i < messages.length; i++) {
console.log(messages[i].message);
console.log(messages[i].updatedAt);
}
});
Get all the message(s)
uniqueQueue.get(-1, function(err, messages) {
console.log(err);
for(var i = 0; i < messages.length; i++) {
console.log(messages[i].message);
console.log(messages[i].updatedAt);
}
});
Remove Amount
Refer to remove section in Normal Queue.
Remove Messages
Remove one or more certain message(s) in the queue.
uniqueQueue.get(10, function(err, messages) {
for(var i = 0; i < 5; i++) messages.shift();
// Remove Messages
uniqueQueue.removeMessages(messages, function(err, removeCount, notRemovedMessages) {
console.log(err);
console.log(removeCount);
for(var i = 0; i < notRemovedMessages.length; i++) console.log(notRemovedMessages[i].message);
});
});
Length
Refer to length section in Normal Queue.
Common Functions
Delete Queue
Delete this queue key and values in redis.
uniqueQueue.deleteQueue(function() {});
normalQueue.deleteQueue(function() {});
Destroy Queue Object
Let the queue object disconnect from redis server and let this instance can’t do any other thing any more.
uniqueQueue.destroy();
normalQueue.destroy();
Node.js: 使用 redis-as-queue
包实现 Redis 消息(任务)队列
本文介绍了一个名为 redis-as-queue
的 Node.js 包,用于通过 Redis 实现消息队列。该包支持两种类型的队列:普通队列(Normal Queue)和唯一队列(Unique Queue)。普通队列是一个典型的先进先出(FIFO)队列,而唯一队列则确保每个消息都是唯一的,并且在插入重复消息时可以选择更新或忽略。
安装
首先,你需要安装 redis-as-queue
包:
$ npm install redis-as-queue
使用 Normal Queue
创建 Normal Queue
const raq = require("redis-as-queue");
const normalQueue = new raq.NormalQueue("normal_queue", 6379, "127.0.0.1", {});
推送消息
normalQueue.push("Hello, World!", function(err) {
console.log(err); // 输出错误信息
});
获取消息
获取最早的一个消息:
normalQueue.get(function(err, messages) {
console.log(err); // 输出错误信息
if (messages.length) console.log(messages[0]); // 输出消息
});
获取最早的一批消息:
normalQueue.get(10, function(err, messages) {
console.log(err); // 输出错误信息
for (let i = 0; i < messages.length; i++) console.log(messages[i]); // 输出消息
});
获取所有消息:
normalQueue.get(-1, function(err, messages) {
console.log(err); // 输出错误信息
for (let i = 0; i < messages.length; i++) console.log(messages[i]); // 输出消息
});
删除消息
删除最早的一个消息:
normalQueue.removeAmount(function(err) {
console.log(err); // 输出错误信息
});
删除最早的一批消息:
normalQueue.removeAmount(5, function(err) {
console.log(err); // 输出错误信息
});
删除所有消息:
normalQueue.removeAmount(-1, function(err) {
console.log(err); // 输出错误信息
});
获取队列长度
normalQueue.length(function(err, len) {
console.log(len); // 输出队列长度
});
使用 Unique Queue
创建 Unique Queue
const uniqueQueue = new raq.UniqueQueue("unique_queue", 6379, "127.0.0.1", {});
推送消息
如果消息已存在,可以选择更新或不更新:
uniqueQueue.push("Hello, World!", true, function(err) {
console.log(err); // 输出错误信息
});
// 或者
uniqueQueue.push("Hello, World!", false, function(err) {
console.log(err); // 输出错误信息
});
获取消息
获取最早的一个消息:
uniqueQueue.get(function(err, messages) {
console.log(err); // 输出错误信息
if (messages.length) {
console.log(messages[0].message); // 输出消息内容
console.log(messages[0].updatedAt); // 输出更新时间
}
});
获取最早的一批消息:
uniqueQueue.get(3, function(err, messages) {
console.log(err); // 输出错误信息
for (let i = 0; i < messages.length; i++) {
console.log(messages[i].message); // 输出消息内容
console.log(messages[i].updatedAt); // 输出更新时间
}
});
获取所有消息:
uniqueQueue.get(-1, function(err, messages) {
console.log(err); // 输出错误信息
for (let i = 0; i < messages.length; i++) {
console.log(messages[i].message); // 输出消息内容
console.log(messages[i].updatedAt); // 输出更新时间
}
});
删除特定消息
删除一批特定的消息:
uniqueQueue.get(10, function(err, messages) {
for (let i = 0; i < 5; i++) messages.shift();
// 移除消息
uniqueQueue.removeMessages(messages, function(err, removeCount, notRemovedMessages) {
console.log(err); // 输出错误信息
console.log(removeCount); // 输出移除的消息数量
for (let i = 0; i < notRemovedMessages.length; i++) console.log(notRemovedMessages[i].message); // 输出未移除的消息
});
});
删除队列
uniqueQueue.deleteQueue(function() {}); // 删除队列
normalQueue.deleteQueue(function() {}); // 删除队列
销毁队列对象
uniqueQueue.destroy(); // 销毁队列对象
normalQueue.destroy(); // 销毁队列对象
以上代码展示了如何使用 redis-as-queue
包来创建和操作普通队列和唯一队列。希望这些示例能帮助你更好地理解和使用这个包。
顶!看了一下代码,之前也利用redis的list写过类似但是没有你这么全的queue,想问一下,如果我需要一次性push出来多个元素的需求,有什么好的方法吗,redis的lis貌似没有提供这样的方法。
nodejs本身是不是做一个队列呢?
var container=[]; onConnect(function(){ onData(‘push’,function(data){ container.push(data); }); onData(“pull”,function(){ return container.pop(); } });
uniqueQueue内部采用有序集合,顶一个
正在找这个,用用看。
路过,看到这个觉得还是不错啊,mark
数据入队列后, 取队列的话,怎么能矜夸监听到有数据了, 需要处理呢
希望楼主出现,看了代码,希望有些问题质疑,给个联系QQ啥的就好
Node.js: 使用 redis-as-queue
包实现消息队列
redis-as-queue
是一个基于 Redis 的消息队列库,它提供了两种类型的队列:Normal Queue 和 Unique Queue。
- Normal Queue 是一个普通的先进先出队列。
- Unique Queue 是一个唯一的队列,在插入重复消息时可以选择是否更新消息位置。
安装
首先,通过 npm 安装该包:
npm install redis-as-queue
示例代码
Normal Queue
创建 Normal 队列:
const raq = require('redis-as-queue');
const normalQueue = new raq.NormalQueue('myQueue', 6379, '127.0.0.1', {});
推送消息:
normalQueue.push('Hello, World!', function(err) {
if (err) console.error(err);
});
获取消息:
normalQueue.get(function(err, messages) {
if (err) console.error(err);
if (messages.length) console.log(messages[0]);
});
删除消息:
normalQueue.removeAmount(1, function(err) {
if (err) console.error(err);
});
删除整个队列:
normalQueue.deleteQueue(function() {});
Unique Queue
创建 Unique 队列:
const uniqueQueue = new raq.UniqueQueue('myUniqueQueue', 6379, '127.0.0.1', {});
推送消息(需要更新):
uniqueQueue.push('Hello, World!', true, function(err) {
if (err) console.error(err);
});
推送消息(不需要更新):
uniqueQueue.push('Hello, World!', false, function(err) {
if (err) console.error(err);
});
获取消息:
uniqueQueue.get(function(err, messages) {
if (err) console.error(err);
if (messages.length) console.log(messages[0].message);
});
删除特定消息:
uniqueQueue.get(5, function(err, messages) {
if (err) console.error(err);
uniqueQueue.removeMessages(messages, function(err, removeCount, notRemovedMessages) {
if (err) console.error(err);
console.log(removeCount);
for (let i = 0; i < notRemovedMessages.length; i++) {
console.log(notRemovedMessages[i].message);
}
});
});
删除整个队列:
uniqueQueue.deleteQueue(function() {});
以上示例展示了如何使用 redis-as-queue
包来实现普通和唯一的消息队列。