golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用

Golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用

Trubka简介

Trubka是一个用Go语言构建的Kafka CLI工具,它提供以下功能:

  • 管理、查询和诊断Kafka集群
  • 从Kafka消费Protocol Buffer和纯文本消息
  • 向Kafka发布Protocol Buffer和纯文本消息

logo-small

使用示例

安装Trubka

# 使用Go安装
go install github.com/xitonix/trubka@latest

管理Kafka集群

# 列出所有主题
trubka topics list --brokers localhost:9092

# 查看主题详情
trubka topics describe --brokers localhost:9092 --topic my_topic

# 创建新主题
trubka topics create --brokers localhost:9092 --topic new_topic --partitions 3 --replication-factor 1

消费消息

# 消费纯文本消息
trubka consume plain --brokers localhost:9092 --topic my_topic --from-beginning

# 消费Protocol Buffer消息(需要.proto文件)
trubka consume proto --brokers localhost:9092 --topic my_proto_topic \
    --proto-dir ./protos --proto-file messages.proto --message-type Message

发布消息

# 发布纯文本消息
echo "Hello Kafka" | trubka publish plain --brokers localhost:9092 --topic my_topic

# 发布Protocol Buffer消息(JSON格式)
echo '{"id": 1, "name": "Test"}' | trubka publish proto \
    --brokers localhost:9092 --topic my_proto_topic \
    --proto-dir ./protos --proto-file messages.proto --message-type Message

完整的Golang集成示例

以下是一个完整的Go程序示例,展示如何使用Trubka的Go客户端库与Kafka交互并处理Protocol Buffer消息:

package main

import (
	"fmt"
	"github.com/xitonix/trubka/commands"
	"github.com/xitonix/trubka/internal"
)

func main() {
	// 初始化Trubka配置
	config := internal.NewDefaultConfig()
	config.Global.Brokers = []string{"localhost:9092"}
	config.Global.Verbosity = internal.VerbosityLevelHigh

	// 创建主题管理命令
	topicCmd := commands.NewTopicsCommand()
	topicCmd.InitFromConfig(config)

	// 列出所有主题
	fmt.Println("Listing all topics:")
	if err := topicCmd.List(); err != nil {
		panic(err)
	}

	// 创建消费命令
	consumeCmd := commands.NewConsumeCommand()
	consumeCmd.InitFromConfig(config)
	consumeCmd.Topic = "my_proto_topic"
	consumeCmd.ProtoDir = "./protos"
	consumeCmd.ProtoFile = "messages.proto"
	consumeCmd.MessageType = "Message"
	consumeCmd.FromBeginning = true

	// 消费Protocol Buffer消息
	fmt.Println("\nConsuming Protocol Buffer messages:")
	if err := consumeCmd.Run(); err != nil {
		panic(err)
	}

	// 创建发布命令
	publishCmd := commands.NewPublishCommand()
	publishCmd.InitFromConfig(config)
	publishCmd.Topic = "my_proto_topic"
	publishCmd.ProtoDir = "./protos"
	publishCmd.ProtoFile = "messages.proto"
	publishCmd.MessageType = "Message"
	publishCmd.Message = `{"id": 1, "name": "Test Message"}`

	// 发布Protocol Buffer消息
	fmt.Println("\nPublishing Protocol Buffer message:")
	if err := publishCmd.Run(); err != nil {
		panic(err)
	}
}

致谢

特别感谢Joshua Humphries开发了令人着迷的protoreflect包。

同时也要感谢以下优秀的库和包:


更多关于golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Trubka: 一个强大的Kafka CLI工具插件

Trubka是一个用Go编写的Apache Kafka命令行工具,它提供了丰富的功能来管理和诊断Kafka集群,特别支持Protocol Buffer(protobuf)消息处理。下面我将详细介绍Trubka的功能和使用方法。

主要功能

  1. 消息生产和消费:支持JSON、原始文本和protobuf格式的消息
  2. 集群管理:查看broker、topic和consumer group信息
  3. 消息转换:支持JSON和protobuf之间的转换
  4. 诊断工具:监控延迟、吞吐量等指标

安装

go install github.com/xitonix/trubka@latest

基本使用示例

1. 查看topic列表

// 查看所有topic
trubka topics list --bootstrap-server localhost:9092

2. 生产消息

// 生产JSON格式消息
trubka produce my-topic --value '{"name":"John","age":30}' --bootstrap-server localhost:9092

// 生产protobuf格式消息(需要.proto文件)
trubka produce my-topic --proto-file message.proto --proto-type Message --value '{"name":"John","age":30}' --bootstrap-server localhost:9092

3. 消费消息

// 消费JSON格式消息
trubka consume my-topic --from-beginning --bootstrap-server localhost:9092

// 消费protobuf格式消息
trubka consume my-topic --proto-file message.proto --proto-type Message --from-beginning --bootstrap-server localhost:9092

Protobuf集成示例

假设我们有一个简单的protobuf定义文件person.proto:

syntax = "proto3";

message Person {
  string name = 1;
  int32 age = 2;
  repeated string hobbies = 3;
}

生产protobuf消息

trubka produce person-topic \
  --proto-file person.proto \
  --proto-type Person \
  --value '{"name":"Alice","age":25,"hobbies":["reading","swimming"]}' \
  --bootstrap-server localhost:9092

消费protobuf消息

trubka consume person-topic \
  --proto-file person.proto \
  --proto-type Person \
  --from-beginning \
  --bootstrap-server localhost:9092

高级功能

1. 监控consumer lag

trubka lags monitor my-group --topic my-topic --bootstrap-server localhost:9092

2. 查看consumer group状态

trubka groups describe my-group --bootstrap-server localhost:9092

3. 消息格式转换

# 将protobuf消息转换为JSON
trubka consume my-topic \
  --proto-file message.proto \
  --proto-type Message \
  --convert-to json \
  --output-file messages.json

Go代码集成示例

虽然Trubka主要是CLI工具,但它的核心功能可以通过Go代码调用:

package main

import (
	"context"
	"fmt"
	"github.com/xitonix/trubka/commands"
	"github.com/xitonix/trubka/internal"
)

func main() {
	// 初始化logger
	logger := internal.NewDefaultLogger()

	// 生产消息
	produceCmd := commands.NewProduce()
	produceCmd.Init([]string{
		"--topic", "test-topic",
		"--value", `{"key":"value"}`,
		"--bootstrap-server", "localhost:9092",
	})
	if err := produceCmd.Run(context.Background(), logger); err != nil {
		fmt.Printf("生产消息失败: %v\n", err)
	}

	// 消费消息
	consumeCmd := commands.NewConsume()
	consumeCmd.Init([]string{
		"--topic", "test-topic",
		"--from-beginning",
		"--bootstrap-server", "localhost:9092",
	})
	if err := consumeCmd.Run(context.Background(), logger); err != nil {
		fmt.Printf("消费消息失败: %v\n", err)
	}
}

总结

Trubka是一个功能强大的Kafka CLI工具,特别适合需要处理protobuf格式消息的场景。它提供了比原生Kafka命令行工具更友好的用户界面和更丰富的功能,尤其适合以下场景:

  1. 需要快速诊断Kafka集群问题
  2. 处理protobuf格式的消息
  3. 需要监控consumer lag和集群状态
  4. 在开发和测试环境中快速生产和消费消息

通过结合Go代码,Trubka还可以集成到自动化测试和监控系统中,为Kafka集群管理提供全面的解决方案。

回到顶部