golang Kafka主题/分区/消息检查命令行工具插件kcli的使用
Golang Kafka主题/分区/消息检查命令行工具插件kcli的使用
简介
kcli是一个Kafka只读命令行浏览器工具,用于查看Kafka主题、分区和消息。
安装
二进制安装
可以直接下载预编译的二进制文件(Windows版本未测试)。
使用Go安装
如果你已经安装了Go(1.11或更高版本),可以通过以下命令安装:
go get -u github.com/cswank/kcli
使用方法
基本使用
kcli --help
usage: kcli [<flags>]
Flags:
--help 显示上下文相关帮助(也可以尝试--help-long和--help-man)
-a, --addresses=localhost:9092 ...
Kafka地址的逗号分隔列表
-l, --log=LOG 用于调试,将日志输出到文件
-t, --topic=TOPIC 直接进入特定主题
-p, --partition=-1 直接进入主题的特定分区
-o, --offset=-1 直接进入特定消息
-d, --decoder=DECODER 用于解码Kafka消息的插件路径
TLS认证配置
如果你的Kafka集群启用了TLS认证,需要设置以下环境变量:
export KCLI_CERT_FILE="<PEM格式的客户端证书路径>"
export KCLI_KEY_FILE="<PEM格式的客户端密钥路径>"
export KCLI_CA_CERT_FILE="<PEM格式的CA证书路径>"
操作示例
- 启动后会显示主题列表
- 输入’h’查看帮助菜单(再次输入’h’可关闭帮助菜单)
- 导航到主题并按回车查看分区
- 导航到分区并按回车查看消息页面
- 导航到消息并按回车查看消息内容
搜索功能
你可以在分区或主题上搜索字符串:
- 在分区上搜索时,当前偏移量会设置为包含搜索字符串的第一条消息
- 在主题上搜索时,只显示包含匹配项的主题,并将它们的当前偏移量设置为包含该匹配项的第一条消息
如果分区数据量很大,搜索可能需要较长时间。如果你知道消息的大致位置,可以使用分区偏移功能(C-o)来加快搜索。
跳转功能
使用跳转命令(C-j)可以设置分区的当前偏移量:
- 在分区视图中:输入的数字将成为当前偏移量
- 在其他视图(主题和消息视图)中:跳转会将光标导航到你输入的值
打印功能
输入C-p,kcli会退出并将当前视图的内容打印到stdout:
- 如果当前视图是分区,则从光标到分区末尾的每条消息都会打印到stdout
- 这对于处理消息很有用,例如:
kcli | jq .age | awk '{s+=$1} END {print s}'
假设打印的消息是JSON格式,这将打印分区中所有消息的age字段总和。
自定义解码器
如果你的Kafka消息以某种方式编码,你可以提供自定义解码器插件。编译插件后,可以这样启动kcli:
kcli -d /path/to/your/decoder.so
屏幕颜色
如果你不喜欢默认颜色,可以设置KCLI_COLOR[0,1,2,3]为以下之一:
- black
- red
- green
- yellow
- blue
- magenta
- cyan
- white
例如:
KCLI_COLOR0=white KCLI_COLOR1=blue KCLI_COLOR2=black KCLI_COLOR3=red
Docker注意事项
如果你使用wurstmeister/kafka Docker镜像连接本地Kafka,可能需要编辑/etc/hosts文件。例如,如果:
KAFKA_ADVERTISED_HOST_NAME=kafka
那么/etc/hosts中的127.0.0.1行应该类似:
127.0.0.1 localhost kafka
示例代码
以下是使用kcli的基本示例:
// 启动kcli并连接到指定Kafka集群
kcli -a kafka1:9092,kafka2:9092
// 直接查看特定主题
kcli -t my_topic
// 查看特定主题的分区
kcli -t my_topic -p 0
// 使用自定义解码器
kcli -d ./my_decoder.so -t encoded_topic
注意:以上代码示例展示了命令行用法,实际使用时直接在终端运行这些命令即可。
更多关于golang Kafka主题/分区/消息检查命令行工具插件kcli的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang Kafka主题/分区/消息检查命令行工具插件kcli的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang Kafka命令行工具kcli使用指南
kcli是一个用Go语言编写的Kafka命令行工具插件,用于检查Kafka主题、分区和消息情况。下面我将详细介绍它的安装和使用方法。
安装kcli
首先需要安装Go环境,然后使用go get命令安装kcli:
go get github.com/twmb/kcli
或者使用brew安装(如果支持):
brew install twmb/tap/kcli
基本用法
1. 查看主题列表
kcli topics -b localhost:9092
这将列出所有可用的Kafka主题。
2. 查看特定主题详情
kcli topic my_topic -b localhost:9092
这会显示my_topic的详细信息,包括分区数量、副本分配等。
3. 查看分区信息
kcli partitions my_topic -b localhost:9092
显示指定主题的所有分区及其leader、副本等信息。
4. 消费消息
kcli consume my_topic -b localhost:9092
从指定主题消费消息并打印到控制台。
5. 生产消息
kcli produce my_topic -b localhost:9092 -m "test message"
向指定主题发送一条测试消息。
高级功能
1. 查看消费者组
kcli groups -b localhost:9092
列出所有消费者组。
2. 查看特定消费者组详情
kcli group my_group -b localhost:9092
显示消费者组的详细信息,包括成员、分区分配等。
3. 查看消息偏移量
kcli offsets my_topic -b localhost:9092
显示主题各分区的消息偏移量。
4. 从特定偏移量开始消费
kcli consume my_topic -b localhost:9092 --start 100
从偏移量100开始消费消息。
Go代码示例
如果你想在自己的Go程序中使用类似功能,可以参考以下代码片段:
package main
import (
"context"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
)
func main() {
// 创建Kafka客户端
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("my_topic"),
)
if err != nil {
panic(err)
}
defer client.Close()
// 获取主题元数据
metadata, err := client.Metadata(context.Background())
if err != nil {
panic(err)
}
// 打印主题信息
for _, topic := range metadata.Topics {
fmt.Printf("Topic: %s, Partitions: %d\n", topic.Topic, len(topic.Partitions))
}
// 消费消息
for {
fetches := client.PollFetches(context.Background())
if errs := fetches.Errors(); len(errs) > 0 {
// 处理错误
panic(fmt.Sprint(errs))
}
fetches.EachRecord(func(r *kgo.Record) {
fmt.Printf("Received message: %s\n", string(r.Value))
})
}
}
常见问题解决
- 连接问题:确保Kafka broker地址正确且网络可访问
- 认证问题:如果需要认证,使用
-u
和-p
参数指定用户名密码 - SSL问题:使用
--tls
参数启用TLS,--ca
指定CA证书
总结
kcli是一个功能强大的Kafka命令行工具,特别适合快速检查和调试Kafka集群。它的Go实现保证了高性能和跨平台兼容性。通过上述命令和示例代码,你可以轻松管理Kafka主题、分区和消息。
如需更多功能,可以查看kcli的官方文档或使用kcli --help
查看所有可用命令。