golang物联网边缘应用与集成开源框架插件库flogo的使用

Golang物联网边缘应用与集成开源框架插件库Flogo的使用

项目Flogo简介

Project Flogo Logo

Project Flogo是一个用于事件驱动应用的开源生态系统

Flogo是一个基于Go语言的超轻量级开源生态系统,用于构建事件驱动应用。它采用"触发器(triggers)"和"动作(actions)"的概念来处理传入事件。

核心概念

  • 应用 = 触发器 + 动作[&活动]
  • 触发器:从外部源接收数据
  • 处理器:将事件分派给动作
  • 动作:以适合实现的方式处理事件

Flogo生态系统

Flogo技术栈

Flogo生态系统包括以下关键组件:

  • 集成流:具有条件分支和可视化开发环境的应用程序集成流程引擎
  • 流处理:基于管道的简单流处理动作
  • 上下文决策:用于实时上下文决策的声明性规则
  • 微网关:用于条件、基于内容的路由、JWT验证、速率限制等常见模式

Flogo核心特性

Flogo Core是一个事件驱动的应用框架,用于开发云和物联网边缘应用,主要优势包括:

  • 动作链:在单个小于10MB的二进制文件中实现多个功能之间的通信
  • 通用贡献模型:构建可被所有功能利用的活动和触发器
  • 可扩展性:通过构建自己的动作轻松扩展可用功能

完整示例Demo

以下是一个使用Flogo构建REST API的完整Go代码示例:

package main

import (
	"context"
	"fmt"

	"github.com/project-flogo/contrib/activity/log"
	"github.com/project-flogo/contrib/trigger/rest"
	"github.com/project-flogo/core/activity"
	"github.com/project-flogo/core/api"
	"github.com/project-flogo/core/data/coerce"
	"github.com/project-flogo/core/engine"
)

func main() {
	app := myApp()

	e, err := api.NewEngine(app)

	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	engine.RunEngine(e)
}

func myApp() *api.App {
	app := api.NewApp()

	// 创建REST触发器,监听8080端口
	trg := app.NewTrigger(&rest.Trigger{}, &rest.Settings{Port: 8080})
	h, _ := trg.NewHandler(&rest.HandlerSettings{Method: "GET", Path: "/blah/:num"})
	h.NewAction(RunActivities)

	// 创建日志活动实例
	logAct, _ := api.NewActivity(&log.Activity{})
	activities = map[string]activity.Activity{"log": logAct}

	return app
}

var activities map[string]activity.Activity

func RunActivities(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error) {
	// 处理触发器输出
	trgOut := &rest.Output{}
	trgOut.FromMap(inputs)

	// 记录日志
	msg, _ := coerce.ToString(trgOut.PathParams)
	_, err := api.EvalActivity(activities["log"], &log.Input{Message: msg})
	if err != nil {
		return nil, err
	}

	// 构建响应
	response := make(map[string]interface{})
	response["id"] = "123"
	response["amount"] = "1"
	response["balance"] = "500"
	response["currency"] = "USD"

	// 返回响应
	reply := &rest.Reply{Code: 200, Data: response}
	return reply.ToMap(), nil
}

流处理示例

Flogo提供了强大的流处理能力,以下是一个流处理管道的JSON配置示例:

"stages": [
    {
      "ref": "github.com/project-flogo/stream/activity/aggregate",
      "settings": {
        "function": "sum",
        "windowType": "timeTumbling",
        "windowSize": "5000"
      },
      "input": {
        "value": "=$.input"
      }
    },
    {
      "ref": "github.com/project-flogo/contrib/activity/log",
      "input": {
        "message": "=$.result"
      }
    }
]

安装与使用

  1. 安装Flogo CLI工具:
go install github.com/project-flogo/cli/...@latest
  1. 创建并构建应用:
flogo create -f myapp
cd myapp
flogo build
  1. 运行应用:
./bin/myapp

适用场景

Flogo特别适合以下场景:

  • 需要连接事件驱动消息平台、数据存储、SaaS应用等
  • 需要部署到多种目标环境:
    • 无服务器计算
    • IoT边缘设备
    • 容器

Flogo生态系统为构建事件驱动应用提供了多种处理方式:

  • 具有流控制支持的长时间运行流程,面向应用程序集成
  • 通过管道消费和操作大量事件流,作为时间序列数据的预处理器
  • 用于实时决策的上下文声明性规则

贡献与许可

Flogo采用BSD风格许可证,欢迎贡献代码。可以通过以下方式参与:

  • 贡献新的活动或触发器
  • 修复现有组件中的错误
  • 参与社区讨论

如需了解更多详情,请参考官方文档和社区资源。


更多关于golang物联网边缘应用与集成开源框架插件库flogo的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang物联网边缘应用与集成开源框架插件库flogo的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang物联网边缘应用与Flogo框架使用指南

Flogo简介

Flogo是一个开源的物联网(IoT)集成框架,专门为边缘计算设计,由TIBCO公司开源。它采用Go语言编写,具有轻量级、高性能的特点,非常适合在资源受限的边缘设备上运行。

Flogo核心特性

  1. 轻量级:二进制文件通常只有10-20MB
  2. 高性能:基于Go语言的高并发特性
  3. 可视化设计:支持通过Web UI设计流程
  4. 丰富的连接器:支持多种协议和设备
  5. 可扩展:支持自定义插件开发

安装Flogo

# 安装Flogo CLI工具
go install github.com/project-flogo/cli/...@latest

# 验证安装
flogo --version

创建Flogo应用

# 创建新应用
flogo create -f myapp

# 进入项目目录
cd myapp

示例:简单的物联网边缘数据处理应用

下面是一个使用Flogo处理传感器数据的示例:

package main

import (
	"github.com/project-flogo/core/engine"
	"github.com/project-flogo/contrib/activity/log"
	"github.com/project-flogo/contrib/trigger/rest"
	"github.com/project-flogo/flow"
)

func main() {
	// 创建Flogo引擎
	e, err := engine.New(&engine.Config{})
	if err != nil {
		panic(err)
	}

	// 创建REST触发器
	trg := &rest.Trigger{
		Id:   "receive_sensor_data",
		Port: 8080,
	}

	// 定义处理流程
	flow := &flow.Definition{
		Metadata: &flow.Metadata{
			Input: []*flow.Attribute{
				{Name: "temperature", Type: "float64"},
				{Name: "humidity", Type: "float64"},
			},
		},
		Nodes: map[string]*flow.Node{
			"log_sensor_data": {
				Activity: &log.Activity{},
				Settings: map[string]interface{}{
					"message": "Received sensor data - Temp: $flow.temperature, Humidity: $flow.humidity",
				},
			},
			"check_threshold": {
				Activity: &threshold.Activity{},
				Settings: map[string]interface{}{
					"maxTemp": 30.0,
				},
			},
		},
		Links: []*flow.Link{
			{From: "input", To: "log_sensor_data"},
			{From: "log_sensor_data", To: "check_threshold"},
		},
	}

	// 注册流程
	if err := e.RegisterFlow("process_sensor_data", flow); err != nil {
		panic(err)
	}

	// 启动引擎
	if err := e.Start(); err != nil {
		panic(err)
	}

	// 等待引擎停止
	e.WaitForShutdown()
}

Flogo的物联网常用连接器

Flogo提供了多种物联网场景下的连接器:

  1. MQTT连接器:与MQTT代理通信
  2. HTTP连接器:RESTful API交互
  3. CoAP连接器:轻量级物联网协议
  4. OPC-UA连接器:工业自动化协议
  5. Modbus连接器:工业设备通信

MQTT示例

// 创建MQTT触发器配置
mqttConfig := &mqtt.TriggerConfig{
	Id:       "mqtt_trigger",
	Broker:   "tcp://iot.eclipse.org:1883",
	Topic:    "sensors/+/temperature",
	ClientId: "flogo_client",
}

// 创建MQTT触发器
mqttTrigger := mqtt.NewTrigger(mqttConfig)

// 定义MQTT处理流程
mqttFlow := &flow.Definition{
	Metadata: &flow.Metadata{
		Input: []*flow.Attribute{
			{Name: "payload", Type: "string"},
			{Name: "topic", Type: "string"},
		},
	},
	Nodes: map[string]*flow.Node{
		"process_message": {
			Activity: &json.Activity{},
		},
	},
}

// 注册MQTT流程
if err := e.RegisterFlow("mqtt_flow", mqttFlow); err != nil {
	panic(err)
}

自定义活动(Activity)开发

当内置活动不满足需求时,可以开发自定义活动:

package simpleactivity

import (
	"github.com/project-flogo/core/activity"
	"github.com/project-flogo/core/data/metadata"
)

type Input struct {
	Data string `md:"data"`
}

type Output struct {
	Result string `md:"result"`
}

func init() {
	_ = activity.Register(&Activity{})
}

type Activity struct {
}

func (a *Activity) Metadata() *activity.Metadata {
	return activity.ToMetadata(&Input{}, &Output{})
}

func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {
	input := &Input{}
	if err := ctx.GetInputObject(input); err != nil {
		return false, err
	}

	output := &Output{Result: "Processed: " + input.Data}
	err = ctx.SetOutputObject(output)
	return true, err
}

部署到边缘设备

  1. 编译应用:
flogo build -e
  1. 将生成的二进制文件复制到边缘设备

  2. 运行:

./myapp

性能优化技巧

  1. 使用批处理:对高频数据采用批处理方式
  2. 减少内存分配:重用对象减少GC压力
  3. 合理设置并发:根据设备CPU核心数调整
  4. 启用压缩:网络传输时启用数据压缩
  5. 选择性日志:减少不必要的日志输出

总结

Flogo为Golang物联网边缘应用开发提供了强大而灵活的工具集,通过其可视化设计器和丰富的连接器,开发者可以快速构建高效的边缘计算解决方案。其轻量级特性使其非常适合资源受限的环境,而Go语言的高性能保证了数据处理效率。

回到顶部