golang轻量级高性能物联网边缘规则引擎插件库rulego的使用

Golang轻量级高性能物联网边缘规则引擎插件库RuleGo的使用

RuleGo是一个基于Go语言构建的轻量级、高性能、嵌入式、可编排的组件化规则引擎。它可以帮助你快速构建松散耦合且灵活的系统,能够实时响应和适应业务需求的变化。

主要特性

  • 轻量级:无外部中间件依赖,在低成本设备上高效处理数据和联动,适合物联网边缘计算
  • 高性能:得益于Go的高性能特性,RuleGo还采用了协程池、对象池等技术
  • 双模式:嵌入式模式和独立部署模式
  • 组件化:所有业务逻辑都是组件化的,可以灵活配置和复用
  • 规则链:灵活组合和复用不同组件,实现高度定制化和可扩展的业务流程
  • 工作流编排:支持动态编排规则链组件,无需重启应用即可替换或添加业务逻辑

架构图

RuleGo Architecture Diagram

安装

使用go get命令安装RuleGo

go get github.com/rulego/rulego
# 或者
go get gitee.com/rulego/rulego

使用示例

1. 定义规则链JSON

首先需要定义一个规则链的JSON文件,例如chain_call_rest_api.json

2. 创建规则引擎实例

import "github.com/rulego/rulego"
// 加载规则链定义文件
ruleFile := fs.LoadFile("chain_call_rest_api.json")
// 使用规则链定义创建规则引擎实例
ruleEngine, err := rulego.New("rule01", ruleFile)

3. 处理消息

// 定义消息元数据
metaData := types.NewMetadata()
metaData.PutValue("productType", "test01")
// 定义消息体和消息类型
msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")

// 将消息交给规则引擎处理
ruleEngine.OnMsg(msg)

动态更新规则链

RuleGo支持在不重启应用的情况下实时更新规则链逻辑:

// 动态更新规则链逻辑
err := ruleEngine.ReloadSelf(ruleFile)
// 更新规则链下的某个节点
ruleEngine.ReloadChild("node01", nodeFile)
// 获取规则链定义
ruleEngine.DSL()

规则引擎管理API

// 加载文件夹下所有规则链定义到规则引擎池
rulego.Load("/rules", rulego.WithConfig(config))
// 通过ID获取已经创建的规则引擎实例
ruleEngine, ok := rulego.Get("rule01")
// 删除已经创建的规则引擎实例
rulego.Del("rule01")

配置示例

// 创建默认配置
config := rulego.NewConfig()
// 调试节点回调函数,节点配置必须设置debugMode:true才能触发调用
// 节点进入和退出信息都会回调该函数
config.OnDebug = func (chainId,flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
}
// 使用配置
ruleEngine, err := rulego.New("rule01", []byte(ruleFile), rulego.WithConfig(config))

完整示例

下面是一个完整的物联网边缘数据处理示例,包含温度监控和告警触发:

package main

import (
	"github.com/rulego/rulego"
	"github.com/rulego/rulego/types"
)

func main() {
	// 1. 定义规则链JSON
	ruleChain := `{
		"ruleChain": {
			"id":"rule01",
			"name": "温度监控规则链",
			"root": true
		},
		"metadata": {
			"nodes": [
				{
					"id": "s1",
					"type": "jsFilter",
					"name": "温度过滤器",
					"debugMode": true,
					"configuration": {
						"jsScript": "return msg.temperature > 30;"
					}
				},
				{
					"id": "s2",
					"type": "log",
					"name": "记录高温日志",
					"debugMode": true,
					"configuration": {
						"jsScript": "return '温度过高:' + msg.temperature;"
					}
				},
				{
					"id": "s3",
					"type": "restApiCall",
					"name": "触发告警API",
					"debugMode": true,
					"configuration": {
						"restEndpointUrlPattern": "http://alert-system/api/alerts",
						"requestMethod": "POST",
						"maxParallelRequestsCount": 200
					}
				}
			],
			"connections": [
				{
					"fromId": "s1",
					"toId": "s2",
					"type": "True"
				},
				{
					"fromId": "s1",
					"toId": "s3",
					"type": "True"
				}
			]
		}
	}`

	// 2. 创建规则引擎实例
	config := rulego.NewConfig()
	// 设置调试回调
	config.OnDebug = func(chainId, flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
		// 这里可以记录调试日志
	}
	
	ruleEngine, err := rulego.New("rule01", []byte(ruleChain), rulego.WithConfig(config))
	if err != nil {
		panic(err)
	}

	// 3. 模拟设备温度数据
	metaData := types.NewMetadata()
	metaData.PutValue("deviceName", "sensor-001")
	metaData.PutValue("location", "room-101")
	
	// 正常温度消息
	normalMsg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":25}")
	ruleEngine.OnMsg(normalMsg)
	
	// 高温消息
	highTempMsg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")
	ruleEngine.OnMsg(highTempMsg)
}

这个示例展示了如何:

  1. 定义一个包含温度过滤、日志记录和API调用的规则链
  2. 创建规则引擎实例
  3. 处理不同温度的消息
  4. 当温度超过30度时,会记录日志并调用告警API

性能

RuleGo在树莓派2(900MHz Cortex-A7*4,1GB LPDDR2)上的测试结果:

  • 数据大小: 260B
  • 规则链: JS脚本过滤->JS复杂转换->HTTP推送
  • 测试结果: 100并发和500并发,内存消耗变化不大,保持在19M左右

RuleGo是一个功能强大且高效的规则引擎,特别适合物联网边缘计算场景。通过组件化和规则链的设计,它可以灵活地处理各种业务逻辑,同时保持轻量级和高性能的特性。


更多关于golang轻量级高性能物联网边缘规则引擎插件库rulego的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级高性能物联网边缘规则引擎插件库rulego的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


RuleGo: Golang轻量级高性能物联网边缘规则引擎插件库使用指南

RuleGo是一个基于Golang的轻量级、高性能规则引擎库,专为物联网边缘计算场景设计。它允许开发者通过灵活的规则链来处理设备数据、实现业务逻辑,非常适合边缘计算场景下的数据处理和转发。

核心特性

  1. 轻量级:核心库体积小,适合资源受限的边缘设备
  2. 高性能:基于Golang的高并发特性,处理速度快
  3. 规则链:可视化规则编排,支持复杂业务逻辑
  4. 插件化:易于扩展自定义组件
  5. 多协议支持:MQTT、HTTP、CoAP等

安装

go get github.com/rulego/rulego

基础使用示例

1. 创建规则链

package main

import (
	"github.com/rulego/rulego"
	"github.com/rulego/rulego/api/types"
	"log"
)

func main() {
	// 初始化规则引擎
	config := rulego.NewConfig()
	ruleEngine, err := rulego.New("rule01", []byte(ruleChainFile), rulego.WithConfig(config))
	if err != nil {
		log.Fatal(err)
	}
	
	// 处理消息
	msg := types.NewMsg(0, "TEST_MSG", types.JSON, types.NewMetadata(), "{\"temperature\":25}")
	ruleEngine.OnMsg(msg)
}

// 示例规则链JSON定义
const ruleChainFile = `
{
  "ruleChain": {
    "name": "温度监控规则链",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "jsFilter",
        "name": "温度过滤器",
        "configuration": {
          "jsScript": "return msg.temperature > 30;"
        }
      },
      {
        "id": "s2",
        "type": "log",
        "name": "记录高温日志",
        "configuration": {
          "msg": "温度过高: ${temperature}"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "True"
      }
    ]
  }
}
`

2. 自定义处理节点

// 自定义处理节点
type UpperCaseNode struct{}

func (n *UpperCaseNode) New() types.Node {
	return &UpperCaseNode{}
}

func (n *UpperCaseNode) Type() string {
	return "test/upperCase"
}

func (n *UpperCaseNode) Init(ruleConfig types.Config, configuration types.Configuration) error {
	// 初始化逻辑
	return nil
}

func (n *UpperCaseNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
	// 处理消息逻辑
	if msg.DataType == types.JSON {
		// 示例:将JSON消息的value字段转为大写
		data := make(map[string]interface{})
		if err := json.Unmarshal([]byte(msg.Data), &data); err == nil {
			if val, ok := data["value"].(string); ok {
				data["value"] = strings.ToUpper(val)
				if newData, err := json.Marshal(data); err == nil {
					msg.Data = string(newData)
				}
			}
		}
	}
	// 传递给下一个节点
	ctx.TellSuccess(msg)
}

func (n *UpperCaseNode) Destroy() {
	// 清理逻辑
}

// 注册自定义节点
func main() {
	config := rulego.NewConfig()
	config.ComponentsRegistry.Register(&UpperCaseNode{})
	
	// 然后可以像使用内置节点一样使用自定义节点
}

3. MQTT集成示例

func main() {
	config := rulego.NewConfig()
	ruleEngine, err := rulego.New("rule01", []byte(ruleChainFile), rulego.WithConfig(config))
	if err != nil {
		log.Fatal(err)
	}

	// 创建MQTT客户端
	opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// 订阅主题并处理消息
	client.Subscribe("sensor/data", 0, func(client mqtt.Client, message mqtt.Message) {
		msg := types.NewMsg(0, "MQTT_MSG", types.JSON, types.NewMetadata(), string(message.Payload()))
		ruleEngine.OnMsg(msg)
	})

	// 保持运行
	select {}
}

高级功能

1. 规则链动态更新

// 动态更新规则链
func updateRuleChain(ruleEngine types.RuleEngine, newRuleChain []byte) {
	if err := ruleEngine.Reload(newRuleChain); err != nil {
		log.Println("更新规则链失败:", err)
	} else {
		log.Println("规则链更新成功")
	}
}

2. 集群模式

RuleGo支持分布式部署,可以通过Redis等中间件实现节点间通信:

config := rulego.NewConfig()
config.OnDebug = func(flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
	// 调试信息处理
}

// 使用Redis作为集群通信中间件
redisPool := &redis.Pool{
	MaxIdle:     10,
	IdleTimeout: 240 * time.Second,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", "localhost:6379")
	},
}

config.Pool = redisPool

性能优化建议

  1. 合理设计规则链:避免过于复杂的规则链结构
  2. 批量处理:对批量消息进行聚合处理
  3. 资源复用:复用连接池、对象池等资源
  4. 异步处理:对耗时操作使用异步节点
  5. 监控调试:利用内置的调试接口优化性能瓶颈

总结

RuleGo为Golang物联网边缘计算场景提供了一个高效、灵活的规则引擎解决方案。通过规则链的可视化编排和自定义节点扩展,开发者可以快速构建复杂的边缘计算业务逻辑。其轻量级特性使其非常适合部署在资源受限的边缘设备上,而高性能设计则能够满足物联网场景下的高并发需求。

更多高级用法和详细文档可以参考RuleGo的GitHub仓库和官方文档。

回到顶部