golang轻量级物联网边缘数据流处理引擎插件库ekuiper的使用
Golang轻量级物联网边缘数据流处理引擎插件库eKuiper的使用
概述
LF Edge eKuiper是一个运行在资源受限边缘设备上的轻量级物联网数据分析和流处理引擎。eKuiper的主要目标是在边缘侧提供一个流式软件框架(类似于Apache Flink)。eKuiper的规则引擎允许用户提供基于SQL或基于图形(类似于Node-RED)的规则,在几分钟内创建物联网边缘分析应用。
主要特性
- 轻量级:核心服务器包仅约4.5MB,内存占用约10MB
- 跨平台:支持X86、ARM、PPC等多种CPU架构
- 数据分析支持:
- 支持数据ETL
- 支持数据排序、分组、聚合和连接不同数据源
- 60+内置函数
- 4种时间窗口和计数窗口
- 高度可扩展:支持通过Golang或Python扩展Source、Functions和Sink
- 管理:提供CLI、REST API和Kubernetes配置映射管理
- 与EMQX产品集成:与EMQX、Neuron和NanoMQ无缝集成
快速开始
安装
# 使用Docker快速启动
docker run -p 9081:9081 -d lfedge/ekuiper:latest
示例代码
以下是一个完整的Golang示例,展示如何使用eKuiper处理物联网数据流:
package main
import (
"fmt"
"log"
"time"
"github.com/lf-edge/ekuiper/sdk/go/api"
)
func main() {
// 1. 创建流处理规则
rule := `
{
"id": "rule1",
"sql": "SELECT temperature, humidity FROM demo WHERE temperature > 30",
"actions": [
{
"log": {}
},
{
"mqtt": {
"server": "tcp://broker.emqx.io:1883",
"topic": "result/alert"
}
}
]
}`
// 2. 创建eKuiper客户端
client, err := api.NewLocalClient()
if err != nil {
log.Fatal(err)
}
// 3. 创建流
err = client.CreateStream(&api.Stream{
Name: "demo",
Stmt: "CREATE STREAM demo (temperature FLOAT, humidity FLOAT) WITH (DATASOURCE=\"demo\")",
})
if err != nil {
log.Fatal(err)
}
// 4. 创建规则
err = client.CreateRule("rule1", rule)
if err != nil {
log.Fatal(err)
}
// 5. 启动规则
err = client.StartRule("rule1")
if err != nil {
log.Fatal(err)
}
fmt.Println("规则已启动,正在处理数据...")
// 保持程序运行
for {
time.Sleep(1 * time.Second)
}
}
使用场景
eKuiper可以运行在各种物联网边缘场景中,例如:
- 工业物联网(IIoT)中生产线的实时数据处理
- 车联网(IoV)中网关分析来自CAN的数据
- 智能能源中风力涡轮机和智能大容量储能数据的实时分析
性能测试结果
MQTT吞吐量测试
设备 | 每秒消息数 | CPU使用率 | 内存使用 |
---|---|---|---|
Raspberry Pi 3B+ | 12k | sys+user: 70% | 20MB |
AWS t2.micro(1核*1GB) | 10k | sys+user: 25% | 20MB |
最大规则支持
- 8000条规则,总共800条消息/秒
- 配置:AWS 2核*4GB内存
- 资源使用:
- 内存:89%~72%
- CPU:25%
- 每条规则400KB-500KB
扩展开发
eKuiper支持通过Golang或Python扩展Source、Functions和Sink。以下是一个Golang扩展示例:
package main
import (
"fmt"
"github.com/lf-edge/ekuiper/sdk/go/api"
"github.com/lf-edge/ekuiper/sdk/go/function"
)
// 自定义函数
type myFunc struct{}
func (f *myFunc) Validate(args []interface{}) error {
if len(args) != 1 {
return fmt.Errorf("myFunc expects exactly 1 argument")
}
return nil
}
func (f *myFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
val, ok := args[0].(float64)
if !ok {
return fmt.Errorf("arg must be a number"), false
}
return val * 2, true
}
func (f *myFunc) IsAggregate() bool {
return false
}
// 插件入口
func MyFunc() function.Function {
return &myFunc{}
}
编译并安装插件后,可以在SQL规则中使用自定义函数:
SELECT myFunc(temperature) FROM demo
总结
eKuiper是一个功能强大且轻量级的边缘流处理引擎,特别适合资源受限的物联网边缘设备。通过简单的SQL规则或扩展插件,可以快速实现复杂的数据处理逻辑。
更多关于golang轻量级物联网边缘数据流处理引擎插件库ekuiper的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻量级物联网边缘数据流处理引擎插件库ekuiper的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang轻量级物联网边缘数据流处理引擎插件库eKuiper使用指南
eKuiper是EMQ公司开发的一款轻量级物联网边缘数据流处理引擎,专为边缘计算场景设计。它可以在资源受限的设备上运行,提供类似SQL的语法进行数据流处理。
1. eKuiper简介
eKuiper主要特点:
- 轻量级:核心二进制文件仅约7MB
- 高性能:单核可处理每秒数千条消息
- 支持SQL语法:简化流处理逻辑开发
- 插件扩展:支持自定义函数、源和目标
- 跨平台:支持x86、ARM等多种架构
2. 安装eKuiper
2.1 直接下载二进制
# 下载最新版本
wget https://github.com/lf-edge/ekuiper/releases/download/1.8.0/kuiper-1.8.0-linux-amd64.zip
unzip kuiper-1.8.0-linux-amd64.zip
2.2 使用Docker
docker pull lfedge/ekuiper:1.8.0
docker run -p 9081:9081 -d --name kuiper lfedge/ekuiper:1.8.0
3. 基本使用示例
3.1 创建流
# 通过REST API创建流
curl -X POST \
http://localhost:9081/streams \
-H 'Content-Type: application/json' \
-d '{
"sql": "CREATE STREAM demoStream (temperature float, humidity float) WITH (DATASOURCE=\"demo\", FORMAT=\"JSON\")"
}'
3.2 创建规则
curl -X POST \
http://localhost:9081/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "rule1",
"sql": "SELECT temperature, humidity FROM demoStream WHERE temperature > 30",
"actions": [{
"log": {}
}]
}'
4. Golang集成示例
4.1 使用eKuiper Go SDK
package main
import (
"fmt"
"github.com/lf-edge/ekuiper/sdk/go/api"
)
func main() {
// 创建eKuiper实例
cfg := api.NewConfig()
cfg.Broker = "tcp://localhost:1883"
cfg.Port = 9081
// 初始化规则管理器
ruleMgr, err := api.NewRuleManager(cfg)
if err != nil {
panic(err)
}
// 创建流
streamSql := `CREATE STREAM deviceStream (
deviceId string,
temperature float,
humidity float
) WITH (DATASOURCE="devices/+/data", FORMAT="JSON")`
err = ruleMgr.ExecStmt(streamSql)
if err != nil {
fmt.Printf("Create stream error: %v\n", err)
}
// 创建规则
rule := api.NewRule("highTempAlert")
rule.Sql = `SELECT deviceId, temperature, humidity
FROM deviceStream
WHERE temperature > 30`
rule.Actions = []map[string]interface{}{
{
"log": map[string]interface{}{},
},
{
"mqtt": map[string]interface{}{
"server": "tcp://localhost:1883",
"topic": "alerts/high_temp",
},
},
}
err = ruleMgr.CreateRule(rule)
if err != nil {
fmt.Printf("Create rule error: %v\n", err)
}
// 启动规则
err = ruleMgr.StartRule("highTempAlert")
if err != nil {
fmt.Printf("Start rule error: %v\n", err)
}
fmt.Println("Rule started successfully")
}
4.2 自定义函数插件
package main
import (
"fmt"
"github.com/lf-edge/ekuiper/sdk/go/function"
"github.com/lf-edge/ekuiper/sdk/go/context"
)
type celsiusToFahrenheit struct{}
func (f *celsiusToFahrenheit) Validate(args []interface{}) error {
if len(args) != 1 {
return fmt.Errorf("celsiusToFahrenheit function only accepts 1 parameter")
}
return nil
}
func (f *celsiusToFahrenheit) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
c, ok := args[0].(float64)
if !ok {
return fmt.Errorf("parameter must be float64"), false
}
return c*1.8 + 32, true
}
func (f *celsiusToFahrenheit) IsAggregate() bool {
return false
}
func main() {
// 注册函数
err := function.Register("celsiusToFahrenheit", &celsiusToFahrenheit{})
if err != nil {
panic(err)
}
// 保持运行
select {}
}
5. 高级功能
5.1 窗口操作
-- 滚动窗口(每5秒)
SELECT avg(temperature) FROM demoStream GROUP BY TUMBLINGWINDOW(ss, 5)
-- 滑动窗口(每5秒滑动1秒)
SELECT avg(temperature) FROM demoStream GROUP BY HOPPINGWINDOW(ss, 5, 1)
-- 会话窗口(超时5秒)
SELECT avg(temperature) FROM demoStream GROUP BY SESSIONWINDOW(ss, 5)
5.2 多流连接
SELECT d.temperature, l.location
FROM deviceStream AS d
LEFT JOIN locationStream AS l ON d.deviceId = l.deviceId
6. 性能优化建议
- 合理使用窗口:避免过大的窗口导致内存消耗过高
- 减少不必要字段:在SELECT中只选择需要的字段
- 使用过滤条件:尽早过滤掉不需要的数据
- 合理部署插件:将计算密集型插件部署在性能更好的节点上
- 监控规则状态:定期检查规则执行状态和资源使用情况
7. 总结
eKuiper作为轻量级边缘流处理引擎,非常适合物联网场景下的实时数据处理。通过Golang SDK可以方便地集成到现有系统中,其插件机制也提供了良好的扩展性。在实际应用中,可以根据具体场景选择合适的窗口类型和数据处理逻辑,以达到最佳的性能和效果。
更多详细信息和示例可以参考官方文档。