golang轻量级物联网流处理引擎插件sensorbee的使用

Golang轻量级物联网流处理引擎插件SensorBee的使用

项目状态

该项目已停止维护。

Build Status Coverage Status

SensorBee简介

SensorBee是一个轻量级的物联网流处理引擎,专为IoT场景设计。

使用示例

下面是一个简单的SensorBee使用示例,展示如何创建一个简单的流处理管道:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/sensorbee/sensorbee/bql"
	"github.com/sensorbee/sensorbee/core"
	"github.com/sensorbee/sensorbee/io"
)

func main() {
	// 1. 创建一个新的SensorBee上下文
	ctx := core.NewContext(context.Background())

	// 2. 创建一个内存数据源
	source, err := io.NewMemSource(ctx, &io.MemSourceConfig{
		Name: "test_source",
	})
	if err != nil {
		panic(err)
	}

	// 3. 创建一个BQL查询处理器
	bqlStr := `
		CREATE STREAM test_stream AS
		SELECT RSTREAM * FROM test_source [RANGE 5 SECONDS]
	`
	parser := bql.NewParser()
	stmts, _, err := parser.Parse(bqlStr)
	if err != nil {
		panic(err)
	}

	// 4. 执行BQL语句
	for _, stmt := range stmts {
		if _, err := stmt.Execute(ctx); err != nil {
			panic(err)
		}
	}

	// 5. 向数据源写入测试数据
	go func() {
		for i := 0; i < 10; i++ {
			data := map[string]interface{}{
				"value": i,
				"timestamp": time.Now(),
			}
			if err := source.Write(ctx, core.NewTuple(data)); err != nil {
				fmt.Println("写入数据失败:", err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// 6. 从流中读取数据
	stream := ctx.GetStream("test_stream")
	for {
		tuple, err := stream.Next(ctx)
		if err != nil {
			fmt.Println("读取数据结束:", err)
			break
		}
		fmt.Println("收到数据:", tuple.Data)
	}
}

示例说明

  1. 首先创建了一个SensorBee上下文,这是所有操作的基础
  2. 创建了一个内存数据源(MemSource),用于模拟IoT设备产生的数据
  3. 使用BQL(类似SQL的查询语言)定义了一个流处理管道
  4. 执行BQL语句创建流处理拓扑
  5. 模拟IoT设备向数据源写入数据
  6. 从处理后的流中读取并打印数据

注意事项

由于该项目已停止维护,建议在生产环境中谨慎使用。如需类似功能的流处理引擎,可以考虑其他活跃的Golang项目如Benthos或GoFlow等。

如需了解更多细节,请参考项目文档。


更多关于golang轻量级物联网流处理引擎插件sensorbee的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级物联网流处理引擎插件sensorbee的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


SensorBee:Golang轻量级物联网流处理引擎

SensorBee是一个用Go语言编写的轻量级流处理引擎,专为物联网(IoT)场景设计。它提供了简单易用的API来处理实时数据流,特别适合边缘计算和物联网设备数据处理。

SensorBee核心特性

  1. 轻量级:专为资源受限环境设计
  2. 可扩展:支持自定义插件和UDF(用户定义函数)
  3. 易用API:简化流处理任务
  4. 支持多种数据源:MQTT、HTTP、文件等
  5. 内置窗口操作:支持滑动窗口、滚动窗口等

安装SensorBee

go get github.com/sensorbee/sensorbee

基本使用示例

1. 创建流处理拓扑

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/sensorbee/sensorbee/bql"
	"github.com/sensorbee/sensorbee/core"
	"github.com/sensorbee/sensorbee/io"
)

func main() {
	// 创建SensorBee上下文
	ctx := core.NewContext(nil)

	// 创建BQL执行器
	bqlExecutor, err := bql.NewExecutor(ctx, nil)
	if err != nil {
		panic(err)
	}

	// 定义流处理拓扑
	query := `
		CREATE SOURCE sensor_data TYPE mqtt_consumer 
		WITH uri = "tcp://iot.eclipse.org:1883", topic = "sensors/temperature";
		
		CREATE STREAM processed_data AS 
		SELECT 
			device_id, 
			temperature, 
			CASE 
				WHEN temperature > 30 THEN 'high'
				WHEN temperature > 20 THEN 'normal'
				ELSE 'low'
			END AS status
		FROM sensor_data [RANGE 1 MINUTES SLIDE 30 SECONDS];
		
		CREATE SINK alert_notification TYPE websocket 
		WITH uri = "ws://localhost:8080/notifications";
		
		CREATE SINK data_storage TYPE file 
		WITH path = "/var/log/sensor_data.log";
		
		INSERT INTO alert_notification FROM processed_data WHERE status = 'high';
		INSERT INTO data_storage FROM processed_data;
	`

	// 执行BQL查询
	if _, err := bqlExecutor.ExecuteQuery(context.Background(), query, nil); err != nil {
		panic(err)
	}

	// 保持程序运行
	fmt.Println("SensorBee流处理引擎已启动...")
	select {}
}

2. 自定义处理函数

package main

import (
	"context"
	"fmt"

	"github.com/sensorbee/sensorbee/core"
	"github.com/sensorbee/sensorbee/bql/udf"
)

// 注册自定义函数
func init() {
	udf.MustRegisterGlobalUDF("celsius_to_fahrenheit",
		udf.MustConvertGeneric(convertCelsiusToFahrenheit))
}

// 摄氏转华氏温度函数
func convertCelsiusToFahrenheit(ctx *core.Context, c float64) (float64, error) {
	return (c * 9/5) + 32, nil
}

func main() {
	ctx := core.NewContext(nil)
	bqlExecutor, err := bql.NewExecutor(ctx, nil)
	if err != nil {
		panic(err)
	}

	query := `
		CREATE SOURCE temp_data TYPE memory;
		
		CREATE STREAM converted_temp AS
		SELECT 
			device_id,
			celsius_to_fahrenheit(temperature) AS fahrenheit
		FROM temp_data;
	`

	if _, err := bqlExecutor.ExecuteQuery(context.Background(), query, nil); err != nil {
		panic(err)
	}

	// 模拟数据输入
	source, _ := ctx.Topology().Source("temp_data")
	source.Write(core.NewTuple(map[string]interface{}{
		"device_id":   "sensor-001",
		"temperature": 25.0,
	}))

	// 保持程序运行
	fmt.Println("等待数据处理...")
	select {}
}

3. 使用状态存储

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/sensorbee/sensorbee/core"
	"github.com/sensorbee/sensorbee/bql"
)

func main() {
	ctx := core.NewContext(nil)
	bqlExecutor, err := bql.NewExecutor(ctx, nil)
	if err != nil {
		panic(err)
	}

	query := `
		CREATE STATE avg_temp_state TYPE sliding_window
		WITH size = 10, interval = 1 MINUTES;
		
		CREATE SOURCE sensor_stream TYPE memory;
		
		CREATE STREAM processed_stream AS
		SELECT 
			device_id,
			temperature,
			avg(temperature) OVER avg_temp_state AS moving_avg
		FROM sensor_stream;
		
		CREATE SINK output TYPE blackhole;
		
		INSERT INTO output FROM processed_stream;
	`

	if _, err := bqlExecutor.ExecuteQuery(context.Background(), query, nil); err != nil {
		panic(err)
	}

	// 模拟数据输入
	source, _ := ctx.Topology().Source("sensor_stream")
	for i := 0; i < 15; i++ {
		temp := 20.0 + float64(i%5)
		source.Write(core.NewTuple(map[string]interface{}{
			"device_id":   fmt.Sprintf("sensor-%03d", i),
			"temperature": temp,
		}))
		time.Sleep(1 * time.Second)
	}

	fmt.Println("数据处理完成")
}

SensorBee高级功能

1. 错误处理与恢复

// 自定义错误处理中间件
type errorHandler struct {
	next core.Operator
}

func (h *errorHandler) Process(ctx *core.Context, t *core.Tuple, w core.Writer) error {
	defer func() {
		if r := recover(); r != nil {
			ctx.Log().Errorf("Recovered from panic: %v", r)
		}
	}()
	
	err := h.next.Process(ctx, t, w)
	if err != nil {
		ctx.Log().Errorf("Error processing tuple: %v", err)
		// 可以选择重试或转发到错误队列
	}
	return nil
}

func (h *errorHandler) Close(ctx *core.Context) error {
	return h.next.Close(ctx)
}

2. 自定义源插件示例

package customsource

import (
	"context"
	"time"

	"github.com/sensorbee/sensorbee/core"
	"github.com/sensorbee/sensorbee/io"
)

type customSource struct {
	interval time.Duration
	cancel   context.CancelFunc
}

func (s *customSource) GenerateStream(ctx *core.Context, w core.Writer) error {
	childCtx, cancel := context.WithCancel(ctx)
	s.cancel = cancel

	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()

	count := 0
	for {
		select {
		case <-childCtx.Done():
			return nil
		case <-ticker.C:
			count++
			t := core.NewTuple(map[string]interface{}{
				"count": count,
				"time":  time.Now().Unix(),
			})
			if err := w.Write(ctx, t); err != nil {
				return err
			}
		}
	}
}

func (s *customSource) Stop(ctx *core.Context) error {
	if s.cancel != nil {
		s.cancel()
	}
	return nil
}

func init() {
	io.RegisterSource("custom_source", func(params map[string]interface{}) (core.Source, error) {
		interval := time.Second
		if v, ok := params["interval"].(string); ok {
			d, err := time.ParseDuration(v)
			if err != nil {
				return nil, err
			}
			interval = d
		}
		return &customSource{interval: interval}, nil
	})
}

性能优化建议

  1. 批量处理:对于高吞吐量场景,考虑实现批量处理
  2. 并行处理:利用SensorBee的并行处理能力
  3. 状态管理:合理使用状态存储,避免内存泄漏
  4. 资源限制:为流处理设置适当的速率限制
  5. 监控:集成监控以跟踪性能指标

总结

SensorBee为Golang开发者提供了一个轻量级但功能强大的物联网流处理解决方案。通过简单的BQL语法或直接使用Go API,开发者可以快速构建复杂的流处理拓扑。其插件系统允许轻松扩展功能,满足各种物联网场景需求。

对于资源受限的边缘设备,SensorBee是一个特别合适的选择,它能够在保持低资源占用的同时提供实时数据处理能力。

回到顶部