golang Kafka主题/分区/消息检查命令行工具插件kcli的使用

Golang Kafka主题/分区/消息检查命令行工具插件kcli的使用

简介

kcli是一个Kafka只读命令行浏览器工具,用于查看Kafka主题、分区和消息。

安装

二进制安装

可以直接下载预编译的二进制文件(Windows版本未测试)。

使用Go安装

如果你已经安装了Go(1.11或更高版本),可以通过以下命令安装:

go get -u github.com/cswank/kcli

使用方法

基本使用

kcli --help
usage: kcli [<flags>]

Flags:
      --help             显示上下文相关帮助(也可以尝试--help-long和--help-man)
  -a, --addresses=localhost:9092 ...
                        Kafka地址的逗号分隔列表
  -l, --log=LOG         用于调试,将日志输出到文件
  -t, --topic=TOPIC     直接进入特定主题
  -p, --partition=-1    直接进入主题的特定分区
  -o, --offset=-1       直接进入特定消息
  -d, --decoder=DECODER 用于解码Kafka消息的插件路径

TLS认证配置

如果你的Kafka集群启用了TLS认证,需要设置以下环境变量:

export KCLI_CERT_FILE="<PEM格式的客户端证书路径>"
export KCLI_KEY_FILE="<PEM格式的客户端密钥路径>"
export KCLI_CA_CERT_FILE="<PEM格式的CA证书路径>"

操作示例

  1. 启动后会显示主题列表
  2. 输入’h’查看帮助菜单(再次输入’h’可关闭帮助菜单)
  3. 导航到主题并按回车查看分区
  4. 导航到分区并按回车查看消息页面
  5. 导航到消息并按回车查看消息内容

搜索功能

你可以在分区或主题上搜索字符串:

  • 在分区上搜索时,当前偏移量会设置为包含搜索字符串的第一条消息
  • 在主题上搜索时,只显示包含匹配项的主题,并将它们的当前偏移量设置为包含该匹配项的第一条消息

如果分区数据量很大,搜索可能需要较长时间。如果你知道消息的大致位置,可以使用分区偏移功能(C-o)来加快搜索。

跳转功能

使用跳转命令(C-j)可以设置分区的当前偏移量:

  • 在分区视图中:输入的数字将成为当前偏移量
  • 在其他视图(主题和消息视图)中:跳转会将光标导航到你输入的值

打印功能

输入C-p,kcli会退出并将当前视图的内容打印到stdout:

  • 如果当前视图是分区,则从光标到分区末尾的每条消息都会打印到stdout
  • 这对于处理消息很有用,例如:
kcli | jq .age | awk '{s+=$1} END {print s}'

假设打印的消息是JSON格式,这将打印分区中所有消息的age字段总和。

自定义解码器

如果你的Kafka消息以某种方式编码,你可以提供自定义解码器插件。编译插件后,可以这样启动kcli:

kcli -d /path/to/your/decoder.so

屏幕颜色

如果你不喜欢默认颜色,可以设置KCLI_COLOR[0,1,2,3]为以下之一:

  • black
  • red
  • green
  • yellow
  • blue
  • magenta
  • cyan
  • white

例如:

KCLI_COLOR0=white KCLI_COLOR1=blue KCLI_COLOR2=black KCLI_COLOR3=red

Docker注意事项

如果你使用wurstmeister/kafka Docker镜像连接本地Kafka,可能需要编辑/etc/hosts文件。例如,如果:

KAFKA_ADVERTISED_HOST_NAME=kafka

那么/etc/hosts中的127.0.0.1行应该类似:

127.0.0.1       localhost kafka

示例代码

以下是使用kcli的基本示例:

// 启动kcli并连接到指定Kafka集群
kcli -a kafka1:9092,kafka2:9092

// 直接查看特定主题
kcli -t my_topic

// 查看特定主题的分区
kcli -t my_topic -p 0

// 使用自定义解码器
kcli -d ./my_decoder.so -t encoded_topic

注意:以上代码示例展示了命令行用法,实际使用时直接在终端运行这些命令即可。


更多关于golang Kafka主题/分区/消息检查命令行工具插件kcli的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang Kafka主题/分区/消息检查命令行工具插件kcli的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang Kafka命令行工具kcli使用指南

kcli是一个用Go语言编写的Kafka命令行工具插件,用于检查Kafka主题、分区和消息情况。下面我将详细介绍它的安装和使用方法。

安装kcli

首先需要安装Go环境,然后使用go get命令安装kcli:

go get github.com/twmb/kcli

或者使用brew安装(如果支持):

brew install twmb/tap/kcli

基本用法

1. 查看主题列表

kcli topics -b localhost:9092

这将列出所有可用的Kafka主题。

2. 查看特定主题详情

kcli topic my_topic -b localhost:9092

这会显示my_topic的详细信息,包括分区数量、副本分配等。

3. 查看分区信息

kcli partitions my_topic -b localhost:9092

显示指定主题的所有分区及其leader、副本等信息。

4. 消费消息

kcli consume my_topic -b localhost:9092

从指定主题消费消息并打印到控制台。

5. 生产消息

kcli produce my_topic -b localhost:9092 -m "test message"

向指定主题发送一条测试消息。

高级功能

1. 查看消费者组

kcli groups -b localhost:9092

列出所有消费者组。

2. 查看特定消费者组详情

kcli group my_group -b localhost:9092

显示消费者组的详细信息,包括成员、分区分配等。

3. 查看消息偏移量

kcli offsets my_topic -b localhost:9092

显示主题各分区的消息偏移量。

4. 从特定偏移量开始消费

kcli consume my_topic -b localhost:9092 --start 100

从偏移量100开始消费消息。

Go代码示例

如果你想在自己的Go程序中使用类似功能,可以参考以下代码片段:

package main

import (
	"context"
	"fmt"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// 创建Kafka客户端
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("my_topic"),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// 获取主题元数据
	metadata, err := client.Metadata(context.Background())
	if err != nil {
		panic(err)
	}

	// 打印主题信息
	for _, topic := range metadata.Topics {
		fmt.Printf("Topic: %s, Partitions: %d\n", topic.Topic, len(topic.Partitions))
	}

	// 消费消息
	for {
		fetches := client.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			// 处理错误
			panic(fmt.Sprint(errs))
		}

		fetches.EachRecord(func(r *kgo.Record) {
			fmt.Printf("Received message: %s\n", string(r.Value))
		})
	}
}

常见问题解决

  1. 连接问题:确保Kafka broker地址正确且网络可访问
  2. 认证问题:如果需要认证,使用-u-p参数指定用户名密码
  3. SSL问题:使用--tls参数启用TLS,--ca指定CA证书

总结

kcli是一个功能强大的Kafka命令行工具,特别适合快速检查和调试Kafka集群。它的Go实现保证了高性能和跨平台兼容性。通过上述命令和示例代码,你可以轻松管理Kafka主题、分区和消息。

如需更多功能,可以查看kcli的官方文档或使用kcli --help查看所有可用命令。

回到顶部