Golang如何将Kafka消费者数据传递到Storm?找不到相关资料,是否可行

Golang如何将Kafka消费者数据传递到Storm?找不到相关资料,是否可行 我有来自Apache Kafka(使用Sarama包)的实时流数据。现在我想将我的消费者连接到Apache Storm,这在Golang中是否可行?我尝试寻找好的资源,但没有找到任何能帮助我的内容。虽然有Scala、Java等其他语言的资源,但我想要一个适用于Golang的解决方案。

1 回复

更多关于Golang如何将Kafka消费者数据传递到Storm?找不到相关资料,是否可行的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go语言中,将Kafka消费者数据传递到Apache Storm是可行的,但需要一些中间步骤,因为Storm原生支持Java/Scala,而Go不是其首选语言。以下是两种主要方法来实现这一目标:

方法1:使用Storm的Multilang协议(通过Shell Bolt)

Storm支持通过Shell Bolt执行外部进程,并使用JSON over STDIN/STDOUT进行通信。您可以在Go中编写一个程序,从Kafka消费数据,并通过标准输入输出与Storm Bolt交互。

步骤:

  1. 在Go中实现Kafka消费者:使用Sarama包从Kafka主题消费消息。
  2. 实现Storm的Multilang协议:Go程序需要处理来自Storm的JSON消息(如元组、ack等),并通过标准输出发送处理后的数据。
  3. 在Storm拓扑中配置Shell Bolt:指向您的Go可执行文件。

示例代码:

首先,安装Sarama包:go get github.com/Shopify/sarama

Go程序(例如 kafka_storm_bridge.go

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

// 定义Storm协议消息结构
type StormMsg struct {
	Id    string      `json:"id"`
	Tuple []interface{} `json:"tuple"`
}

func main() {
	// Kafka配置
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal("Failed to create consumer: ", err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition("your-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal("Failed to consume partition: ", err)
	}
	defer partitionConsumer.Close()

	// 处理Storm协议:读取STDIN(来自Storm的指令),发送数据到STDOUT
	decoder := json.NewDecoder(os.Stdin)
	encoder := json.NewEncoder(os.Stdout)

	for {
		// 从Kafka消费消息
		select {
		case msg := <-partitionConsumer.Messages():
			// 构建Storm元组消息
			stormMsg := StormMsg{
				Id:    "default",
				Tuple: []interface{}{string(msg.Value)},
			}
			// 发送到Storm via STDOUT
			if err := encoder.Encode(stormMsg); err != nil {
				log.Println("Encode error:", err)
			}
			// 刷新输出以确保数据发送
			os.Stdout.Sync()
		case err := <-partitionConsumer.Errors():
			log.Println("Kafka consumer error:", err)
		}

		// 可选:处理来自Storm的ack或其他命令(如果需要)
		var input map[string]interface{}
		if err := decoder.Decode(&input); err != nil {
			// 忽略解码错误,继续处理Kafka数据
			continue
		}
		// 根据输入处理Storm命令(例如,确认消息)
		if cmd, ok := input["command"].(string); ok {
			if cmd == "ack" {
				// 处理ack逻辑
			}
		}
	}
}

编译Go程序:go build -o kafka_storm_bridge kafka_storm_bridge.go

Storm拓扑配置(Java示例): 在Storm拓扑中,使用ShellBolt来调用Go可执行文件。

// 在Java拓扑定义中
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new YourKafkaSpout(), 1);

// 使用ShellBolt指向Go程序
builder.setBolt("go-bolt", new ShellBolt("path/to/kafka_storm_bridge"))
       .shuffleGrouping("kafka-spout");

方法2:通过中间存储或网络接口

如果Multilang方法太复杂,可以先将Kafka数据写入中间存储(如Redis、数据库)或通过HTTP/gRPC接口暴露,然后在Storm中通过相应的Spout读取。

示例步骤:

  1. Go程序消费Kafka并写入Redis: 使用Go将Kafka消息推送到Redis列表。
  2. Storm使用Redis Spout: 在Storm拓扑中配置Redis Spout(如Storm自带的org.apache.storm.redis.spout.RedisSpout)来消费数据。

Go程序写入Redis的代码片段

package main

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/go-redis/redis"
)

func main() {
	// Kafka消费者配置
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Redis客户端
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	defer rdb.Close()

	partitionConsumer, err := consumer.ConsumePartition("your-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer partitionConsumer.Close()

	for {
		select {
		case msg := <-partitionConsumer.Messages():
			// 将消息推送到Redis列表
			err := rdb.LPush("storm-input", msg.Value).Err()
			if err != nil {
				log.Println("Redis push error:", err)
			}
		case err := <-partitionConsumer.Errors():
			log.Println("Kafka error:", err)
		}
	}
}

在Storm拓扑中,使用Redis Spout来读取storm-input列表。

总结

  • 方法1直接集成,但需要处理Multilang协议,可能涉及性能开销。
  • 方法2更简单,通过中间存储解耦,但增加了延迟和依赖。

根据您的用例选择合适的方法。如果数据量不大且需要实时处理,方法1更直接;如果系统允许额外组件,方法2更稳定。确保在Storm集群中正确配置资源和路径。

回到顶部