Golang使用kafka-go时遇到的TLS证书问题
Golang使用kafka-go时遇到的TLS证书问题 大家好,
我需要通过使用信任库和密钥库与我的客户端建立TLS连接。我的存储库是P12格式。我的密钥库仅包含一个私钥和一个公钥。我的信任库仅包含一个集群证书。
我的问题是,我无法连接到我的SSL Kafka。从Kafka日志中,我看到握手失败。
以下是我的代码。也许是配置有问题?有谁知道如何打印错误信息?或者我的配置错误可能在哪里?我使用kafka-go进行通信。
getKafkaReader(topic string) *kafka.Reader {
if consumer == nil {
consumer = kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaConfig.Host},
GroupID: os.Getenv("KAFKA_CONSUMER_GROUP"),
Topic: topic,
Partition: 0,
MinBytes: 10e2, // 1KB
MaxBytes: 10e5, // 1MB
Dialer: getDialer(),
})
}
return consumer
}
func getDialer() *kafka.Dialer {
dialer := &kafka.Dialer{
Timeout: 5 * time.Second,
DualStack: true,
TLS: tlsConfig(),
}
return dialer
}
func tlsConfig() *tls.Config {
// Keystore
keys, _ := ioutil.ReadFile(kafkaConfig.KeyStoreLocation)
blocks, err := p12.ToPEM(keys, kafkaConfig.KeyStorePassword)
if err != nil {
log.Fatal(err.Error())
}
var pemData []byte
for test, b := range blocks {
_ = test
pemData = append(pemData, pem.EncodeToMemory(b)...)
}
cert, err := tls.X509KeyPair(pemData, pemData)
if err != nil {
log.Fatal(err.Error())
}
//Truststore
caCert, err := ioutil.ReadFile("./certificates/ca.pem")
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
config := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
return config
}
更多关于Golang使用kafka-go时遇到的TLS证书问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html
2 回复
我注意到我使用的是自签名证书。我需要
tlsConfig.InsecureSkipVerify = true
更多关于Golang使用kafka-go时遇到的TLS证书问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在TLS配置中,你的证书加载方式有问题。P12文件通常包含私钥和证书链,但p12.ToPEM返回的PEM块需要正确分离。以下是修正后的代码:
func tlsConfig() *tls.Config {
// 加载密钥库
p12Data, err := ioutil.ReadFile(kafkaConfig.KeyStoreLocation)
if err != nil {
log.Fatal("读取P12文件失败:", err)
}
// 解码P12文件
blocks, err := p12.ToPEM(p12Data, kafkaConfig.KeyStorePassword)
if err != nil {
log.Fatal("P12解码失败:", err)
}
var certPEM, keyPEM []byte
for _, block := range blocks {
if block.Type == "CERTIFICATE" {
certPEM = append(certPEM, pem.EncodeToMemory(block)...)
} else if block.Type == "PRIVATE KEY" || block.Type == "RSA PRIVATE KEY" {
keyPEM = append(keyPEM, pem.EncodeToMemory(block)...)
}
}
if len(certPEM) == 0 || len(keyPEM) == 0 {
log.Fatal("未找到证书或私钥")
}
// 加载证书对
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
log.Fatal("创建X509KeyPair失败:", err)
}
// 加载信任库
caCert, err := ioutil.ReadFile("./certificates/ca.pem")
if err != nil {
log.Fatal("读取CA证书失败:", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
log.Fatal("添加CA证书到证书池失败")
}
config := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
}
return config
}
为了调试TLS握手错误,可以添加TLS配置的详细日志:
func tlsConfig() *tls.Config {
// ... 之前的证书加载代码保持不变 ...
config := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
}
// 添加连接状态回调来记录TLS握手详情
config.GetConnectionState = func(conn *tls.Conn) tls.ConnectionState {
state := conn.ConnectionState()
log.Printf("TLS握手完成: Version=%x, CipherSuite=%x, ServerName=%s",
state.Version, state.CipherSuite, state.ServerName)
return state
}
return config
}
还可以在Dialer中添加超时和错误处理:
func getDialer() *kafka.Dialer {
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsConfig(),
}
// 添加连接事件监听
dialer.OnConnect = func(ctx context.Context, conn *kafka.Conn) error {
log.Println("成功连接到Kafka broker")
return nil
}
return dialer
}
如果问题仍然存在,可以启用Go的TLS调试:
export GODEBUG="x509roots=1,tls13=1"
然后运行你的程序,会显示详细的TLS握手信息。

