Golang如何连接MQTT代理与Kafka
Golang如何连接MQTT代理与Kafka 我有一个每5秒产生一条消息的MQTT主题。我想将该消息写入Kafka的主题。是否有MQTT适配器可以直接连接到Kafka?我目前使用的MQTT包是“eclipse/paho.mqtt.golang”。目前我没有看到任何好的示例,您能指导我吗?
1 回复
更多关于Golang如何连接MQTT代理与Kafka的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
对于将MQTT消息转发到Kafka,没有官方的直接适配器,但可以通过编写一个桥接服务来实现。以下是一个使用eclipse/paho.mqtt.golang和confluent-kafka-go的示例代码:
package main
import (
"fmt"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
const (
mqttBroker = "tcp://localhost:1883"
mqttTopic = "sensors/data"
kafkaTopic = "mqtt-messages"
)
func main() {
// 1. 连接Kafka生产者
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"client.id": "mqtt-bridge",
"acks": "all",
})
if err != nil {
log.Fatal("Failed to create Kafka producer:", err)
}
defer p.Close()
// 2. 连接MQTT客户端
opts := MQTT.NewClientOptions().AddBroker(mqttBroker)
opts.SetClientID("kafka-bridge-client")
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
// 3. 收到MQTT消息时转发到Kafka
deliveryChan := make(chan kafka.Event)
err := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &kafkaTopic,
Partition: kafka.PartitionAny,
},
Value: msg.Payload(),
}, deliveryChan)
if err != nil {
log.Printf("Failed to produce message to Kafka: %v", err)
return
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v", m.TopicPartition.Error)
} else {
log.Printf("Delivered message to Kafka topic %s [%d] @ %d",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(deliveryChan)
})
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal("Failed to connect to MQTT broker:", token.Error())
}
defer client.Disconnect(250)
// 4. 订阅MQTT主题
if token := client.Subscribe(mqttTopic, 1, nil); token.Wait() && token.Error() != nil {
log.Fatal("Failed to subscribe to MQTT topic:", token.Error())
}
log.Printf("Subscribed to MQTT topic: %s", mqttTopic)
// 保持运行
for {
time.Sleep(1 * time.Second)
}
}
这个桥接服务会:
- 创建Kafka生产者连接到
localhost:9092 - 创建MQTT客户端连接到
tcp://localhost:1883 - 订阅指定的MQTT主题
- 将收到的每条MQTT消息转发到Kafka主题
需要安装的依赖:
go get github.com/eclipse/paho.mqtt.golang
go get github.com/confluentinc/confluent-kafka-go/kafka
注意:实际部署时需要处理连接重试、错误处理和消息持久化。对于生产环境,建议添加以下功能:
- MQTT和Kafka连接的重试逻辑
- 消息处理失败时的重试机制
- 指标监控和日志记录
- 配置外部化(环境变量或配置文件)

