Golang如何连接MQTT代理与Kafka

Golang如何连接MQTT代理与Kafka 我有一个每5秒产生一条消息的MQTT主题。我想将该消息写入Kafka的主题。是否有MQTT适配器可以直接连接到Kafka?我目前使用的MQTT包是“eclipse/paho.mqtt.golang”。目前我没有看到任何好的示例,您能指导我吗?

1 回复

更多关于Golang如何连接MQTT代理与Kafka的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


对于将MQTT消息转发到Kafka,没有官方的直接适配器,但可以通过编写一个桥接服务来实现。以下是一个使用eclipse/paho.mqtt.golangconfluent-kafka-go的示例代码:

package main

import (
    "fmt"
    "log"
    "time"

    MQTT "github.com/eclipse/paho.mqtt.golang"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    mqttBroker = "tcp://localhost:1883"
    mqttTopic  = "sensors/data"
    kafkaTopic = "mqtt-messages"
)

func main() {
    // 1. 连接Kafka生产者
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "client.id":         "mqtt-bridge",
        "acks":              "all",
    })
    if err != nil {
        log.Fatal("Failed to create Kafka producer:", err)
    }
    defer p.Close()

    // 2. 连接MQTT客户端
    opts := MQTT.NewClientOptions().AddBroker(mqttBroker)
    opts.SetClientID("kafka-bridge-client")
    opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
        // 3. 收到MQTT消息时转发到Kafka
        deliveryChan := make(chan kafka.Event)
        err := p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{
                Topic:     &kafkaTopic,
                Partition: kafka.PartitionAny,
            },
            Value: msg.Payload(),
        }, deliveryChan)

        if err != nil {
            log.Printf("Failed to produce message to Kafka: %v", err)
            return
        }

        e := <-deliveryChan
        m := e.(*kafka.Message)
        if m.TopicPartition.Error != nil {
            log.Printf("Delivery failed: %v", m.TopicPartition.Error)
        } else {
            log.Printf("Delivered message to Kafka topic %s [%d] @ %d",
                *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        }
        close(deliveryChan)
    })

    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        log.Fatal("Failed to connect to MQTT broker:", token.Error())
    }
    defer client.Disconnect(250)

    // 4. 订阅MQTT主题
    if token := client.Subscribe(mqttTopic, 1, nil); token.Wait() && token.Error() != nil {
        log.Fatal("Failed to subscribe to MQTT topic:", token.Error())
    }
    log.Printf("Subscribed to MQTT topic: %s", mqttTopic)

    // 保持运行
    for {
        time.Sleep(1 * time.Second)
    }
}

这个桥接服务会:

  1. 创建Kafka生产者连接到localhost:9092
  2. 创建MQTT客户端连接到tcp://localhost:1883
  3. 订阅指定的MQTT主题
  4. 将收到的每条MQTT消息转发到Kafka主题

需要安装的依赖:

go get github.com/eclipse/paho.mqtt.golang
go get github.com/confluentinc/confluent-kafka-go/kafka

注意:实际部署时需要处理连接重试、错误处理和消息持久化。对于生产环境,建议添加以下功能:

  • MQTT和Kafka连接的重试逻辑
  • 消息处理失败时的重试机制
  • 指标监控和日志记录
  • 配置外部化(环境变量或配置文件)
回到顶部