Nodejs:jsGen技术总结之在Node.js中构建redis同步缓存

Nodejs:jsGen技术总结之在Node.js中构建redis同步缓存

以前版本的jsGen直接利用Node.js的Buffer内存缓存数据,这样带来的一个问题是无法开启Cluster,多个Node.js进程的内存都是相互独立的,不能相互访问,不能及时更新数据变动。

新本(0.6.0)jsGen使用了第三方内存数据库redis作为缓存,如此以来多进程或多机运行jsGen成为可能。redis作为内存缓存的唯一缺陷就是——异步驱动,读取或写入数据都得callback!。

var myData;

redisCache.get(key, function (err, data) { // callback读取缓存数据 myData = data; });

redisCache.put(key, myData, function (err, reply) { // 写入缓存,callback确认写入结果 });

那么,有没有办法构建一个“同步”的redis缓存呢,使得读取、写入缓存像下面一样简单:

// 从缓存读取数据
var myData = redisCache.data;

// 往缓存写入数据
redisCache.data = myData;

redis同步缓存原理

我采用JavaScript的getter、setter和闭包构建了这个redis同步缓存

利用闭包创建一个缓存数据镜像,读取缓存时,getter从镜像读取;写入缓存时,setter把值写入镜像,再写入redis数据库。

如果开启多进程,缓存镜像仍然是分布在各个进程中,是相互独立的。如果一个进程更新了缓存数据,如何及时更新其它进程的缓存镜像呢?这就用到了redis的Pub/Sub系统,setter更新缓存时,更新数据写入数据库后,发布更新通知,其它redis进程收到通知就从redis数据库读取数据来更新镜像。

各进程的缓存虽然不是真正的同步更新,但也算及时更新了,可以满足一般业务需要。缺点是多消耗了一倍的内存。对于频繁访问更新的小数据,如config数据,很适合采用这个方案。下面是来自jsGen/lib/redia.js的源代码,通过一个config的json数据模板构建一个redis同步缓存的config对象,数据不但写入了redis数据库,还按照一定频率写入MongoDB数据库。

jsGen源代码片段

// clientSub:专用于订阅的redis client
// client[globalCacheDb]:存取数据的redis client
// 异步任务函数then及then.each,见 https://github.com/zensh/then.js

function initConfig(configTpl, callback) { var config = {}, // 新构建的config缓存 _config = union(configTpl), // 从configTpl克隆的config闭包镜像 subPubId = MD5(’’ + Date.now() + Math.random(), ‘base64’); // 本进程的唯一识别ID

callback = callback || callbackFn;

var update = throttle(function () {
    jsGen.dao.index.setGlobalConfig(_config);
}, 300000); // 将config写入MongoDB,每五分钟内最多执行一次

function updateKey(key) {
// 更新镜像的key键值
    return then(function (defer) {
        client[globalCacheDb].hget('config.hash', key, defer);
        // 从redis读取更新的数据
    }).then(function (defer, reply) {
        reply = JSON.parse(reply);
        _config[key] = typeof _config[key] === typeof reply ? reply : _config[key];
        // 数据写入config镜像
        defer(null, _config[key]);
    }).fail(errorHandler);
}

clientSub.on('message', function (channel, key) {
    var ID = key.slice(0, 24);
    key = key.slice(24);
    // 分离识别ID和key
    if (channel === 'updateConfig' && ID !== subPubId) {
    // 来自于updateConfig频道且不是本进程发出的更新通知
        if (key in _config) {
            updateKey(key); // 更新一个key
        } else {
            each(_config, function (value, key) { // 更新整个config镜像
                updateKey(key);
            });
        }
    }
});
clientSub.subscribe('updateConfig');
// 订阅updateConfig频道

each(configTpl, function (value, key) {
// 从configTpl模板构建getter/setter,利用Hash类型存储config
    Object.defineProperty(config, key, {
        set: function (value) {
            then(function (defer) {
                if ((value === 1 || value === -1) && typeof _config[key] === 'number') {
                    _config[key] += value;
                    // 按1递增或递减,更新镜像,再更新redis
                    client[globalCacheDb].hincrby('config.hash', key, value, defer);
                } else {
                    _config[key] = value;
                    // 因为redis存储字符串,下面先序列化。
                    client[globalCacheDb].hset('config.hash', key, JSON.stringify(value), defer);
                }
            }).then(function () {
            // redis数据库更新完成,向其他进程发出更新通知
                client[globalCacheDb].publish('updateConfig', subPubId + key);
            }).fail(jsGen.thenErrLog);
            update(); // 更新MongoDB
        },
        get: function () {
            return _config[key];
            // 从镜像读取数据
        },
        enumerable: true,
        configurable: true
    });
});
// 初始化config对象的值,如重启进程后,如果redis数据库原来存有数据,读取该数据
then.each(Object.keys(configTpl), function (next, key) {
    updateKey(key).then(function (defer, value) {
        return next ? next() : callback(null, config);
        // 异步返回新的config对象,已初始化数据值
    }).fail(function (defer, err) {
        callback(err);
    });
});
return config; // 同步返回新的config对象

}

初始化代码,详见jsGen/app.js

then(function (defer) {
    redis.initConfig(jsGen.lib.json.GlobalConfig, defer);
    // 异步初始化config缓存
}).then(function (defer, config) {
    jsGen.config = config;
    // config缓存引用到全局变量jsGen
    // ...
}).then(function (defer, config) {
    // ...
});

调用示例

// 从config缓存取配置值并new一个LRU缓存
jsGen.cache.user = new CacheLRU(jsGen.config.userCache);

// 更新网站访问次数
jsGen.config.visitors = 1; // 网站访问次数+1

2 回复

Nodejs:jsGen技术总结之在Node.js中构建redis同步缓存

问题背景

以前版本的jsGen使用Node.js的Buffer内存缓存数据,这种方式存在明显的局限性:无法支持集群模式,因为每个Node.js进程的内存是独立的,无法实现跨进程的数据共享和即时更新。

解决方案

新版jsGen(0.6.0)采用了Redis作为缓存,解决了上述问题。然而,Redis的异步特性使得读取和写入操作必须通过回调函数来处理。这带来了编程上的复杂性,特别是在需要同步操作时。

目标

我们希望构建一个“同步”的Redis缓存机制,使得读取和写入操作能够像操作普通对象一样简单:

// 从缓存读取数据
var myData = redisCache.data;

// 往缓存写入数据
redisCache.data = myData;

实现原理

为了实现这种同步效果,我们利用JavaScript的getter和setter以及闭包来封装Redis操作。具体步骤如下:

  1. 创建缓存数据镜像:在内存中维护一份数据镜像,用于快速读取数据。
  2. getter和setter:使用getter读取镜像中的数据,使用setter将数据写入镜像,并同步写入Redis。
  3. Pub/Sub机制:利用Redis的发布/订阅功能,确保当一个进程更新数据时,其他进程能及时更新其缓存镜像。

示例代码

const redis = require('redis');
const client = redis.createClient();
const clientSub = redis.createClient();

function initConfig(configTpl, callback) {
    const config = {};
    const _config = union(configTpl);
    const subPubId = MD5('' + Date.now() + Math.random(), 'base64');

    callback = callback || callbackFn;

    let update = throttle(function () {
        jsGen.dao.index.setGlobalConfig(_config);
    }, 300000); // 每五分钟写入一次MongoDB

    function updateKey(key) {
        return then(function (defer) {
            client.hget('config.hash', key, defer);
        }).then(function (defer, reply) {
            reply = JSON.parse(reply);
            _config[key] = typeof _config[key] === typeof reply ? reply : _config[key];
            defer(null, _config[key]);
        }).fail(errorHandler);
    }

    clientSub.on('message', function (channel, key) {
        const ID = key.slice(0, 24);
        key = key.slice(24);
        if (channel === 'updateConfig' && ID !== subPubId) {
            if (key in _config) {
                updateKey(key);
            } else {
                each(_config, function (value, key) {
                    updateKey(key);
                });
            }
        }
    });

    clientSub.subscribe('updateConfig');

    each(configTpl, function (value, key) {
        Object.defineProperty(config, key, {
            set: function (value) {
                then(function (defer) {
                    if ((value === 1 || value === -1) && typeof _config[key] === 'number') {
                        _config[key] += value;
                        client.hincrby('config.hash', key, value, defer);
                    } else {
                        _config[key] = value;
                        client.hset('config.hash', key, JSON.stringify(value), defer);
                    }
                }).then(function () {
                    client.publish('updateConfig', subPubId + key);
                }).fail(jsGen.thenErrLog);
                update();
            },
            get: function () {
                return _config[key];
            },
            enumerable: true,
            configurable: true
        });
    });

    then.each(Object.keys(configTpl), function (next, key) {
        updateKey(key).then(function (defer, value) {
            return next ? next() : callback(null, config);
        }).fail(function (defer, err) {
            callback(err);
        });
    });

    return config;
}

// 初始化代码
then(function (defer) {
    redis.initConfig(jsGen.lib.json.GlobalConfig, defer);
}).then(function (defer, config) {
    jsGen.config = config;
}).then(function (defer, config) {
    // 其他逻辑
});

// 调用示例
jsGen.cache.user = new CacheLRU(jsGen.config.userCache);
jsGen.config.visitors = 1; // 网站访问次数+1

总结

通过上述方法,我们可以构建一个简单的“同步”Redis缓存机制,使得在Node.js应用中使用Redis缓存更加方便和高效。


在Node.js中构建Redis同步缓存的关键在于使用JavaScript的getter和setter特性以及闭包机制,从而实现对Redis缓存操作的同步化。通过这种方式,你可以让读取和写入缓存的操作看起来像操作普通的JavaScript对象一样简单。

原理说明

  • 闭包:用来存储Redis缓存数据的镜像副本,以便可以在不阻塞的情况下提供同步访问。
  • Getter和Setter:通过定义对象属性的getter和setter,实现了数据读取和写入时的同步行为。
  • Pub/Sub:当一个进程更新了缓存,它会通过Redis的发布/订阅功能通知其他进程更新它们的缓存镜像。

示例代码

const redis = require('redis');
const client = redis.createClient();
const clientSub = redis.createClient();

function createSyncRedisCache(initialData) {
    const cacheMirror = {...initialData};
    
    function getValue(key) {
        return cacheMirror[key];
    }

    function setValue(key, value) {
        cacheMirror[key] = value;
        client.hset('cache:mirror', key, JSON.stringify(value));
        
        client.publish('updateCache', `${subPubId}${key}`);
        updateMongoDB();
    }

    function updateMongoDB() {
        // 模拟写入MongoDB的过程
        console.log("Data updated in MongoDB");
    }

    client.on('message', (channel, message) => {
        if (channel === 'updateCache' && message.startsWith(subPubId)) {
            const key = message.substring(24);
            client.hget('cache:mirror', key, (err, reply) => {
                if (reply) {
                    cacheMirror[key] = JSON.parse(reply);
                }
            });
        }
    });

    client.subscribe('updateCache');

    return new Proxy({}, {
        get(target, prop) {
            return getValue(prop);
        },
        set(target, prop, value) {
            setValue(prop, value);
            return true;
        }
    });
}

const config = createSyncRedisCache({visitors: 0, userCache: 100});

console.log(config.visitors); // 0
config.visitors = 1; // 更新网站访问次数
console.log(config.visitors); // 1

这段代码展示了如何通过Proxy对象和Redis结合,模拟一个同步的缓存层。注意,实际应用中需要处理错误和异常情况,并且可能需要更复杂的逻辑来确保数据一致性。

回到顶部