Golang使用kafka-go时遇到的TLS证书问题

Golang使用kafka-go时遇到的TLS证书问题 大家好,

我需要通过使用信任库和密钥库与我的客户端建立TLS连接。我的存储库是P12格式。我的密钥库仅包含一个私钥和一个公钥。我的信任库仅包含一个集群证书。

我的问题是,我无法连接到我的SSL Kafka。从Kafka日志中,我看到握手失败。

以下是我的代码。也许是配置有问题?有谁知道如何打印错误信息?或者我的配置错误可能在哪里?我使用kafka-go进行通信。

getKafkaReader(topic string) *kafka.Reader {

    if consumer == nil {
        consumer = kafka.NewReader(kafka.ReaderConfig{
            Brokers:     []string{kafkaConfig.Host},
            GroupID:     os.Getenv("KAFKA_CONSUMER_GROUP"),
            Topic:       topic,
            Partition:   0,
            MinBytes:    10e2, // 1KB
            MaxBytes:    10e5, // 1MB
            Dialer:      getDialer(),
        })
    }
    return consumer
}

func getDialer() *kafka.Dialer {

    dialer := &kafka.Dialer{
        Timeout:   5 * time.Second,
        DualStack: true,
        TLS:       tlsConfig(),
    }

    return dialer
}

func tlsConfig() *tls.Config {

    // Keystore
    keys, _ := ioutil.ReadFile(kafkaConfig.KeyStoreLocation)
    blocks, err := p12.ToPEM(keys, kafkaConfig.KeyStorePassword)
    if err != nil {
        log.Fatal(err.Error())
    }

    var pemData []byte
    for test, b := range blocks {
        _ = test
        pemData = append(pemData, pem.EncodeToMemory(b)...)
    }

    cert, err := tls.X509KeyPair(pemData, pemData)
    if err != nil {
        log.Fatal(err.Error())
    }
   //Truststore
    caCert, err := ioutil.ReadFile("./certificates/ca.pem")
    if err != nil {
        log.Fatal(err)
    }

    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    config := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
    }
    return config
}

更多关于Golang使用kafka-go时遇到的TLS证书问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

我注意到我使用的是自签名证书。我需要

tlsConfig.InsecureSkipVerify = true

更多关于Golang使用kafka-go时遇到的TLS证书问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在TLS配置中,你的证书加载方式有问题。P12文件通常包含私钥和证书链,但p12.ToPEM返回的PEM块需要正确分离。以下是修正后的代码:

func tlsConfig() *tls.Config {
    // 加载密钥库
    p12Data, err := ioutil.ReadFile(kafkaConfig.KeyStoreLocation)
    if err != nil {
        log.Fatal("读取P12文件失败:", err)
    }

    // 解码P12文件
    blocks, err := p12.ToPEM(p12Data, kafkaConfig.KeyStorePassword)
    if err != nil {
        log.Fatal("P12解码失败:", err)
    }

    var certPEM, keyPEM []byte
    for _, block := range blocks {
        if block.Type == "CERTIFICATE" {
            certPEM = append(certPEM, pem.EncodeToMemory(block)...)
        } else if block.Type == "PRIVATE KEY" || block.Type == "RSA PRIVATE KEY" {
            keyPEM = append(keyPEM, pem.EncodeToMemory(block)...)
        }
    }

    if len(certPEM) == 0 || len(keyPEM) == 0 {
        log.Fatal("未找到证书或私钥")
    }

    // 加载证书对
    cert, err := tls.X509KeyPair(certPEM, keyPEM)
    if err != nil {
        log.Fatal("创建X509KeyPair失败:", err)
    }

    // 加载信任库
    caCert, err := ioutil.ReadFile("./certificates/ca.pem")
    if err != nil {
        log.Fatal("读取CA证书失败:", err)
    }

    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        log.Fatal("添加CA证书到证书池失败")
    }

    config := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
        MinVersion:   tls.VersionTLS12,
    }
    
    return config
}

为了调试TLS握手错误,可以添加TLS配置的详细日志:

func tlsConfig() *tls.Config {
    // ... 之前的证书加载代码保持不变 ...

    config := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
        MinVersion:   tls.VersionTLS12,
    }

    // 添加连接状态回调来记录TLS握手详情
    config.GetConnectionState = func(conn *tls.Conn) tls.ConnectionState {
        state := conn.ConnectionState()
        log.Printf("TLS握手完成: Version=%x, CipherSuite=%x, ServerName=%s",
            state.Version, state.CipherSuite, state.ServerName)
        return state
    }

    return config
}

还可以在Dialer中添加超时和错误处理:

func getDialer() *kafka.Dialer {
    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       tlsConfig(),
    }
    
    // 添加连接事件监听
    dialer.OnConnect = func(ctx context.Context, conn *kafka.Conn) error {
        log.Println("成功连接到Kafka broker")
        return nil
    }
    
    return dialer
}

如果问题仍然存在,可以启用Go的TLS调试:

export GODEBUG="x509roots=1,tls13=1"

然后运行你的程序,会显示详细的TLS握手信息。

回到顶部