golang轻量级高性能物联网边缘规则引擎插件库rulego的使用
Golang轻量级高性能物联网边缘规则引擎插件库RuleGo的使用
RuleGo是一个基于Go语言构建的轻量级、高性能、嵌入式、可编排的组件化规则引擎。它可以帮助你快速构建松散耦合且灵活的系统,能够实时响应和适应业务需求的变化。
主要特性
- 轻量级:无外部中间件依赖,在低成本设备上高效处理数据和联动,适合物联网边缘计算
- 高性能:得益于Go的高性能特性,RuleGo还采用了协程池、对象池等技术
- 双模式:嵌入式模式和独立部署模式
- 组件化:所有业务逻辑都是组件化的,可以灵活配置和复用
- 规则链:灵活组合和复用不同组件,实现高度定制化和可扩展的业务流程
- 工作流编排:支持动态编排规则链组件,无需重启应用即可替换或添加业务逻辑
架构图
安装
使用go get
命令安装RuleGo
:
go get github.com/rulego/rulego
# 或者
go get gitee.com/rulego/rulego
使用示例
1. 定义规则链JSON
首先需要定义一个规则链的JSON文件,例如chain_call_rest_api.json
。
2. 创建规则引擎实例
import "github.com/rulego/rulego"
// 加载规则链定义文件
ruleFile := fs.LoadFile("chain_call_rest_api.json")
// 使用规则链定义创建规则引擎实例
ruleEngine, err := rulego.New("rule01", ruleFile)
3. 处理消息
// 定义消息元数据
metaData := types.NewMetadata()
metaData.PutValue("productType", "test01")
// 定义消息体和消息类型
msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")
// 将消息交给规则引擎处理
ruleEngine.OnMsg(msg)
动态更新规则链
RuleGo支持在不重启应用的情况下实时更新规则链逻辑:
// 动态更新规则链逻辑
err := ruleEngine.ReloadSelf(ruleFile)
// 更新规则链下的某个节点
ruleEngine.ReloadChild("node01", nodeFile)
// 获取规则链定义
ruleEngine.DSL()
规则引擎管理API
// 加载文件夹下所有规则链定义到规则引擎池
rulego.Load("/rules", rulego.WithConfig(config))
// 通过ID获取已经创建的规则引擎实例
ruleEngine, ok := rulego.Get("rule01")
// 删除已经创建的规则引擎实例
rulego.Del("rule01")
配置示例
// 创建默认配置
config := rulego.NewConfig()
// 调试节点回调函数,节点配置必须设置debugMode:true才能触发调用
// 节点进入和退出信息都会回调该函数
config.OnDebug = func (chainId,flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
}
// 使用配置
ruleEngine, err := rulego.New("rule01", []byte(ruleFile), rulego.WithConfig(config))
完整示例
下面是一个完整的物联网边缘数据处理示例,包含温度监控和告警触发:
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/types"
)
func main() {
// 1. 定义规则链JSON
ruleChain := `{
"ruleChain": {
"id":"rule01",
"name": "温度监控规则链",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "jsFilter",
"name": "温度过滤器",
"debugMode": true,
"configuration": {
"jsScript": "return msg.temperature > 30;"
}
},
{
"id": "s2",
"type": "log",
"name": "记录高温日志",
"debugMode": true,
"configuration": {
"jsScript": "return '温度过高:' + msg.temperature;"
}
},
{
"id": "s3",
"type": "restApiCall",
"name": "触发告警API",
"debugMode": true,
"configuration": {
"restEndpointUrlPattern": "http://alert-system/api/alerts",
"requestMethod": "POST",
"maxParallelRequestsCount": 200
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "True"
},
{
"fromId": "s1",
"toId": "s3",
"type": "True"
}
]
}
}`
// 2. 创建规则引擎实例
config := rulego.NewConfig()
// 设置调试回调
config.OnDebug = func(chainId, flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
// 这里可以记录调试日志
}
ruleEngine, err := rulego.New("rule01", []byte(ruleChain), rulego.WithConfig(config))
if err != nil {
panic(err)
}
// 3. 模拟设备温度数据
metaData := types.NewMetadata()
metaData.PutValue("deviceName", "sensor-001")
metaData.PutValue("location", "room-101")
// 正常温度消息
normalMsg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":25}")
ruleEngine.OnMsg(normalMsg)
// 高温消息
highTempMsg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")
ruleEngine.OnMsg(highTempMsg)
}
这个示例展示了如何:
- 定义一个包含温度过滤、日志记录和API调用的规则链
- 创建规则引擎实例
- 处理不同温度的消息
- 当温度超过30度时,会记录日志并调用告警API
性能
RuleGo在树莓派2(900MHz Cortex-A7*4,1GB LPDDR2)上的测试结果:
- 数据大小: 260B
- 规则链: JS脚本过滤->JS复杂转换->HTTP推送
- 测试结果: 100并发和500并发,内存消耗变化不大,保持在19M左右
RuleGo是一个功能强大且高效的规则引擎,特别适合物联网边缘计算场景。通过组件化和规则链的设计,它可以灵活地处理各种业务逻辑,同时保持轻量级和高性能的特性。
更多关于golang轻量级高性能物联网边缘规则引擎插件库rulego的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻量级高性能物联网边缘规则引擎插件库rulego的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
RuleGo: Golang轻量级高性能物联网边缘规则引擎插件库使用指南
RuleGo是一个基于Golang的轻量级、高性能规则引擎库,专为物联网边缘计算场景设计。它允许开发者通过灵活的规则链来处理设备数据、实现业务逻辑,非常适合边缘计算场景下的数据处理和转发。
核心特性
- 轻量级:核心库体积小,适合资源受限的边缘设备
- 高性能:基于Golang的高并发特性,处理速度快
- 规则链:可视化规则编排,支持复杂业务逻辑
- 插件化:易于扩展自定义组件
- 多协议支持:MQTT、HTTP、CoAP等
安装
go get github.com/rulego/rulego
基础使用示例
1. 创建规则链
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/api/types"
"log"
)
func main() {
// 初始化规则引擎
config := rulego.NewConfig()
ruleEngine, err := rulego.New("rule01", []byte(ruleChainFile), rulego.WithConfig(config))
if err != nil {
log.Fatal(err)
}
// 处理消息
msg := types.NewMsg(0, "TEST_MSG", types.JSON, types.NewMetadata(), "{\"temperature\":25}")
ruleEngine.OnMsg(msg)
}
// 示例规则链JSON定义
const ruleChainFile = `
{
"ruleChain": {
"name": "温度监控规则链",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "jsFilter",
"name": "温度过滤器",
"configuration": {
"jsScript": "return msg.temperature > 30;"
}
},
{
"id": "s2",
"type": "log",
"name": "记录高温日志",
"configuration": {
"msg": "温度过高: ${temperature}"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "True"
}
]
}
}
`
2. 自定义处理节点
// 自定义处理节点
type UpperCaseNode struct{}
func (n *UpperCaseNode) New() types.Node {
return &UpperCaseNode{}
}
func (n *UpperCaseNode) Type() string {
return "test/upperCase"
}
func (n *UpperCaseNode) Init(ruleConfig types.Config, configuration types.Configuration) error {
// 初始化逻辑
return nil
}
func (n *UpperCaseNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
// 处理消息逻辑
if msg.DataType == types.JSON {
// 示例:将JSON消息的value字段转为大写
data := make(map[string]interface{})
if err := json.Unmarshal([]byte(msg.Data), &data); err == nil {
if val, ok := data["value"].(string); ok {
data["value"] = strings.ToUpper(val)
if newData, err := json.Marshal(data); err == nil {
msg.Data = string(newData)
}
}
}
}
// 传递给下一个节点
ctx.TellSuccess(msg)
}
func (n *UpperCaseNode) Destroy() {
// 清理逻辑
}
// 注册自定义节点
func main() {
config := rulego.NewConfig()
config.ComponentsRegistry.Register(&UpperCaseNode{})
// 然后可以像使用内置节点一样使用自定义节点
}
3. MQTT集成示例
func main() {
config := rulego.NewConfig()
ruleEngine, err := rulego.New("rule01", []byte(ruleChainFile), rulego.WithConfig(config))
if err != nil {
log.Fatal(err)
}
// 创建MQTT客户端
opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
// 订阅主题并处理消息
client.Subscribe("sensor/data", 0, func(client mqtt.Client, message mqtt.Message) {
msg := types.NewMsg(0, "MQTT_MSG", types.JSON, types.NewMetadata(), string(message.Payload()))
ruleEngine.OnMsg(msg)
})
// 保持运行
select {}
}
高级功能
1. 规则链动态更新
// 动态更新规则链
func updateRuleChain(ruleEngine types.RuleEngine, newRuleChain []byte) {
if err := ruleEngine.Reload(newRuleChain); err != nil {
log.Println("更新规则链失败:", err)
} else {
log.Println("规则链更新成功")
}
}
2. 集群模式
RuleGo支持分布式部署,可以通过Redis等中间件实现节点间通信:
config := rulego.NewConfig()
config.OnDebug = func(flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
// 调试信息处理
}
// 使用Redis作为集群通信中间件
redisPool := &redis.Pool{
MaxIdle: 10,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", "localhost:6379")
},
}
config.Pool = redisPool
性能优化建议
- 合理设计规则链:避免过于复杂的规则链结构
- 批量处理:对批量消息进行聚合处理
- 资源复用:复用连接池、对象池等资源
- 异步处理:对耗时操作使用异步节点
- 监控调试:利用内置的调试接口优化性能瓶颈
总结
RuleGo为Golang物联网边缘计算场景提供了一个高效、灵活的规则引擎解决方案。通过规则链的可视化编排和自定义节点扩展,开发者可以快速构建复杂的边缘计算业务逻辑。其轻量级特性使其非常适合部署在资源受限的边缘设备上,而高性能设计则能够满足物联网场景下的高并发需求。
更多高级用法和详细文档可以参考RuleGo的GitHub仓库和官方文档。