golang工业物联网消息传递与设备管理服务插件库mainflux的使用

Golang工业物联网消息传递与设备管理服务插件库Mainflux的使用

项目迁移说明

Mainflux项目已成功更名为Magistrala,并迁移至新的代码仓库。所有后续开发将在新仓库进行,由Abstract Machines公司提供专业支持。

关于Mainflux/Magistrala

Mainflux/Magistrala是一个开源的工业物联网平台,提供设备连接、消息传递和设备管理功能。它使用Golang编写,专为工业物联网场景设计。

基本使用示例

以下是一个使用Mainflux/Magistrala进行设备消息传递的简单示例:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/mainflux/mainflux/pkg/messaging"
	"github.com/mainflux/mainflux/pkg/messaging/nats"
)

func main() {
	// 创建NATS消息发布/订阅客户端
	pubSub, err := nats.NewPubSub("nats://localhost:4222", "example-client")
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer pubSub.Close()

	// 定义主题
	topic := "example.topic"

	// 订阅消息
	sub := pubSub.Subscribe(topic)
	defer sub.Unsubscribe()

	// 创建上下文
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 启动goroutine处理接收到的消息
	go func() {
		for {
			select {
			case msg := <-sub.Messages():
				fmt.Printf("Received message: %s\n", string(msg.Payload))
			case <-ctx.Done():
				return
			}
		}
	}()

	// 发布消息
	msg := messaging.Message{
		Channel:   topic,
		Subtopic:  "",
		Publisher: "example-publisher",
		Protocol:  "mqtt",
		Payload:   []byte("Hello, Mainflux!"),
		Created:   time.Now().UnixNano(),
	}

	if err := pubSub.Publish(topic, &msg); err != nil {
		log.Fatalf("Failed to publish message: %v", err)
	}

	fmt.Println("Message published successfully")

	// 等待一段时间让消息被接收
	time.Sleep(2 * time.Second)
}

设备管理示例

以下是一个简单的设备管理示例:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/mainflux/mainflux/things"
	"github.com/mainflux/mainflux/things/api/grpc"
	"google.golang.org/grpc"
)

func main() {
	// 连接到Mainflux Things服务
	conn, err := grpc.Dial("localhost:8181", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	// 创建Things客户端
	client := grpc.NewClient(conn)

	// 创建新设备
	thing := things.Thing{
		Name:     "Example Device",
		Metadata: map[string]interface{}{"type": "temperature-sensor"},
	}

	// 设置认证令牌
	token := "your-admin-token"

	// 添加设备
	thingID, err := client.CreateThing(context.Background(), token, thing)
	if err != nil {
		log.Fatalf("Failed to create thing: %v", err)
	}

	fmt.Printf("Thing created with ID: %s\n", thingID)

	// 获取设备信息
	retrievedThing, err := client.ViewThing(context.Background(), token, thingID)
	if err != nil {
		log.Fatalf("Failed to retrieve thing: %v", err)
	}

	fmt.Printf("Retrieved thing: %+v\n", retrievedThing)
}

注意事项

  1. Mainflux已迁移至Magistrala,建议使用新仓库进行开发
  2. 上述示例需要先安装Mainflux/Magistrala服务并正确配置
  3. 实际使用时需要替换示例中的连接信息和认证令牌

迁移建议

由于Mainflux已迁移至Magistrala,建议开发者:

  1. 使用Magistrala的新仓库进行开发
  2. 更新依赖项到Magistrala的新版本
  3. 关注Magistrala的新特性和API变化

更多关于golang工业物联网消息传递与设备管理服务插件库mainflux的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang工业物联网消息传递与设备管理服务插件库mainflux的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Mainflux: Golang工业物联网消息传递与设备管理服务

Mainflux是一个开源的、云原生的物联网平台和消息中间件,使用Golang编写,专为工业物联网(IIoT)场景设计。它提供了完整的设备连接、消息传递、设备管理和数据聚合功能。

Mainflux核心功能

  1. 消息代理:支持MQTT、WebSocket、HTTP、CoAP等协议
  2. 设备管理:设备注册、配置、监控
  3. 用户管理:多租户支持
  4. 数据存储:时间序列数据库集成
  5. 安全:TLS加密、JWT认证

安装Mainflux

# 克隆仓库
git clone https://github.com/mainflux/mainflux.git
cd mainflux

# 使用Docker Compose启动
docker-compose -f docker/docker-compose.yml up -d

基本使用示例

1. 创建用户

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/mainflux/mainflux/pkg/errors"
	"github.com/mainflux/mainflux/users"
	"github.com/mainflux/mainflux/users/api"
	"github.com/mainflux/mainflux/users/postgres"
)

func main() {
	// 初始化数据库连接
	dbConfig := postgres.Config{
		Host:        "localhost",
		Port:        "5432",
		User:        "mainflux",
		Pass:        "mainflux",
		Name:        "users",
		SSLMode:     "disable",
		SSLCert:     "",
		SSLKey:      "",
		SSLRootCert: "",
	}

	db, err := postgres.Connect(dbConfig)
	if err != nil {
		log.Fatalf("Failed to connect to database: %s", err)
	}
	defer db.Close()

	// 创建用户服务
	userRepo := postgres.NewUserRepository(db)
	hasher := api.NewBcryptHasher()
	userService := users.New(userRepo, hasher, "secret", nil)

	// 创建新用户
	user := users.User{
		Email:    "user@example.com",
		Password: "password",
		Metadata: map[string]interface{}{
			"role": "admin",
		},
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if _, err := userService.Register(ctx, user); err != nil {
		log.Fatalf("Failed to register user: %s", err)
	}

	fmt.Println("User created successfully")
}

2. 设备管理

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/mainflux/mainflux/things"
	"github.com/mainflux/mainflux/things/api"
	"github.com/mainflux/mainflux/things/postgres"
)

func main() {
	// 初始化数据库连接
	dbConfig := postgres.Config{
		Host:        "localhost",
		Port:        "5432",
		User:        "mainflux",
		Pass:        "mainflux",
		Name:        "things",
		SSLMode:     "disable",
		SSLCert:     "",
		SSLKey:      "",
		SSLRootCert: "",
	}

	db, err := postgres.Connect(dbConfig)
	if err != nil {
		log.Fatalf("Failed to connect to database: %s", err)
	}
	defer db.Close()

	// 创建设备服务
	thingRepo := postgres.NewThingRepository(db)
	channelRepo := postgres.NewChannelRepository(db)
	idProvider := api.NewUUIDProvider()
	thingService := things.New(thingRepo, channelRepo, idProvider)

	// 创建新设备
	thing := things.Thing{
		Name:     "Temperature Sensor",
		Metadata: map[string]interface{}{
			"type": "sensor",
			"location": "building-1",
		},
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// 使用管理员token
	token := "admin_token_here"

	thingID, err := thingService.AddThing(ctx, token, thing)
	if err != nil {
		log.Fatalf("Failed to add thing: %s", err)
	}

	fmt.Printf("Thing created with ID: %s\n", thingID)
}

3. 消息发布与订阅

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// MQTT客户端配置
	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://localhost:1883")
	opts.SetClientID("example-client")
	opts.SetUsername("device_token") // 使用设备token作为用户名
	opts.SetPassword("")

	// 创建客户端
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// 订阅主题
	topic := "channels/<channel_id>/messages" // 替换为实际channel ID
	qos := 0

	if token := client.Subscribe(topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
		fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
	}); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// 发布消息
	message := `{"temp": 23.5, "hum": 45.2}`
	token := client.Publish(topic, byte(qos), false, message)
	token.Wait()

	time.Sleep(5 * time.Second) // 等待接收消息

	client.Disconnect(250)
}

Mainflux高级特性

  1. 多协议适配器:Mainflux支持多种协议适配器,可以轻松扩展
  2. 规则引擎:可以配置规则来自动处理设备数据
  3. 微服务架构:采用微服务设计,易于扩展和定制
  4. 云原生支持:支持Kubernetes部署,适合大规模工业场景

生产环境建议

  1. 安全性:始终使用TLS加密通信
  2. 认证:使用强密码和定期轮换JWT密钥
  3. 监控:集成Prometheus和Grafana进行监控
  4. 持久化:配置适当的消息持久化策略
  5. 扩展性:根据负载水平扩展各个微服务

Mainflux是构建工业物联网应用的强大工具,其Golang实现保证了高性能和低资源消耗,非常适合边缘计算场景。

回到顶部