golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用
Golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用
Trubka简介
Trubka是一个用Go语言构建的Kafka CLI工具,它提供以下功能:
- 管理、查询和诊断Kafka集群
- 从Kafka消费Protocol Buffer和纯文本消息
- 向Kafka发布Protocol Buffer和纯文本消息
使用示例
安装Trubka
# 使用Go安装
go install github.com/xitonix/trubka@latest
管理Kafka集群
# 列出所有主题
trubka topics list --brokers localhost:9092
# 查看主题详情
trubka topics describe --brokers localhost:9092 --topic my_topic
# 创建新主题
trubka topics create --brokers localhost:9092 --topic new_topic --partitions 3 --replication-factor 1
消费消息
# 消费纯文本消息
trubka consume plain --brokers localhost:9092 --topic my_topic --from-beginning
# 消费Protocol Buffer消息(需要.proto文件)
trubka consume proto --brokers localhost:9092 --topic my_proto_topic \
--proto-dir ./protos --proto-file messages.proto --message-type Message
发布消息
# 发布纯文本消息
echo "Hello Kafka" | trubka publish plain --brokers localhost:9092 --topic my_topic
# 发布Protocol Buffer消息(JSON格式)
echo '{"id": 1, "name": "Test"}' | trubka publish proto \
--brokers localhost:9092 --topic my_proto_topic \
--proto-dir ./protos --proto-file messages.proto --message-type Message
完整的Golang集成示例
以下是一个完整的Go程序示例,展示如何使用Trubka的Go客户端库与Kafka交互并处理Protocol Buffer消息:
package main
import (
"fmt"
"github.com/xitonix/trubka/commands"
"github.com/xitonix/trubka/internal"
)
func main() {
// 初始化Trubka配置
config := internal.NewDefaultConfig()
config.Global.Brokers = []string{"localhost:9092"}
config.Global.Verbosity = internal.VerbosityLevelHigh
// 创建主题管理命令
topicCmd := commands.NewTopicsCommand()
topicCmd.InitFromConfig(config)
// 列出所有主题
fmt.Println("Listing all topics:")
if err := topicCmd.List(); err != nil {
panic(err)
}
// 创建消费命令
consumeCmd := commands.NewConsumeCommand()
consumeCmd.InitFromConfig(config)
consumeCmd.Topic = "my_proto_topic"
consumeCmd.ProtoDir = "./protos"
consumeCmd.ProtoFile = "messages.proto"
consumeCmd.MessageType = "Message"
consumeCmd.FromBeginning = true
// 消费Protocol Buffer消息
fmt.Println("\nConsuming Protocol Buffer messages:")
if err := consumeCmd.Run(); err != nil {
panic(err)
}
// 创建发布命令
publishCmd := commands.NewPublishCommand()
publishCmd.InitFromConfig(config)
publishCmd.Topic = "my_proto_topic"
publishCmd.ProtoDir = "./protos"
publishCmd.ProtoFile = "messages.proto"
publishCmd.MessageType = "Message"
publishCmd.Message = `{"id": 1, "name": "Test Message"}`
// 发布Protocol Buffer消息
fmt.Println("\nPublishing Protocol Buffer message:")
if err := publishCmd.Run(); err != nil {
panic(err)
}
}
致谢
特别感谢Joshua Humphries开发了令人着迷的protoreflect包。
同时也要感谢以下优秀的库和包:
- Shopify团队的sarama
- Alec Thomas的kingpin和chroma
- Naveen Mahalingam的go-pretty
- Peter Bourgon的diskv
- Brian Voelker的gofakeit
- Dustin Sallings的go-humanize
- Noah Petherbridge的confdir
- Mitchell Hashimoto的go-homedir
更多关于golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang管理和诊断Apache Kafka集群并支持Protocol Buffer消息处理的CLI工具插件trubka的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Trubka: 一个强大的Kafka CLI工具插件
Trubka是一个用Go编写的Apache Kafka命令行工具,它提供了丰富的功能来管理和诊断Kafka集群,特别支持Protocol Buffer(protobuf)消息处理。下面我将详细介绍Trubka的功能和使用方法。
主要功能
- 消息生产和消费:支持JSON、原始文本和protobuf格式的消息
- 集群管理:查看broker、topic和consumer group信息
- 消息转换:支持JSON和protobuf之间的转换
- 诊断工具:监控延迟、吞吐量等指标
安装
go install github.com/xitonix/trubka@latest
基本使用示例
1. 查看topic列表
// 查看所有topic
trubka topics list --bootstrap-server localhost:9092
2. 生产消息
// 生产JSON格式消息
trubka produce my-topic --value '{"name":"John","age":30}' --bootstrap-server localhost:9092
// 生产protobuf格式消息(需要.proto文件)
trubka produce my-topic --proto-file message.proto --proto-type Message --value '{"name":"John","age":30}' --bootstrap-server localhost:9092
3. 消费消息
// 消费JSON格式消息
trubka consume my-topic --from-beginning --bootstrap-server localhost:9092
// 消费protobuf格式消息
trubka consume my-topic --proto-file message.proto --proto-type Message --from-beginning --bootstrap-server localhost:9092
Protobuf集成示例
假设我们有一个简单的protobuf定义文件person.proto
:
syntax = "proto3";
message Person {
string name = 1;
int32 age = 2;
repeated string hobbies = 3;
}
生产protobuf消息
trubka produce person-topic \
--proto-file person.proto \
--proto-type Person \
--value '{"name":"Alice","age":25,"hobbies":["reading","swimming"]}' \
--bootstrap-server localhost:9092
消费protobuf消息
trubka consume person-topic \
--proto-file person.proto \
--proto-type Person \
--from-beginning \
--bootstrap-server localhost:9092
高级功能
1. 监控consumer lag
trubka lags monitor my-group --topic my-topic --bootstrap-server localhost:9092
2. 查看consumer group状态
trubka groups describe my-group --bootstrap-server localhost:9092
3. 消息格式转换
# 将protobuf消息转换为JSON
trubka consume my-topic \
--proto-file message.proto \
--proto-type Message \
--convert-to json \
--output-file messages.json
Go代码集成示例
虽然Trubka主要是CLI工具,但它的核心功能可以通过Go代码调用:
package main
import (
"context"
"fmt"
"github.com/xitonix/trubka/commands"
"github.com/xitonix/trubka/internal"
)
func main() {
// 初始化logger
logger := internal.NewDefaultLogger()
// 生产消息
produceCmd := commands.NewProduce()
produceCmd.Init([]string{
"--topic", "test-topic",
"--value", `{"key":"value"}`,
"--bootstrap-server", "localhost:9092",
})
if err := produceCmd.Run(context.Background(), logger); err != nil {
fmt.Printf("生产消息失败: %v\n", err)
}
// 消费消息
consumeCmd := commands.NewConsume()
consumeCmd.Init([]string{
"--topic", "test-topic",
"--from-beginning",
"--bootstrap-server", "localhost:9092",
})
if err := consumeCmd.Run(context.Background(), logger); err != nil {
fmt.Printf("消费消息失败: %v\n", err)
}
}
总结
Trubka是一个功能强大的Kafka CLI工具,特别适合需要处理protobuf格式消息的场景。它提供了比原生Kafka命令行工具更友好的用户界面和更丰富的功能,尤其适合以下场景:
- 需要快速诊断Kafka集群问题
- 处理protobuf格式的消息
- 需要监控consumer lag和集群状态
- 在开发和测试环境中快速生产和消费消息
通过结合Go代码,Trubka还可以集成到自动化测试和监控系统中,为Kafka集群管理提供全面的解决方案。