Problems Using OAUTH Over TLS with Kafka in Golang

I am building a Kafka client in Golang. The client tries to obtain a token from IAM using the "oidc" OAuthBearer method in order to establish a secure connection to Kafka. It uses the "librdkafka" dependency. I am using the following configuration:

bootstrap.servers=kafka-bootstrap:9095
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.ca.location=/var/tmp/go/ca.pem
ssl.certificate.location=/var/tmp/go/clicert.pem
ssl.key.location=/var/tmp/go/cliprivkey.pem
ssl.key.password=password
sasl.oauthbearer.method=oidc
sasl.oauthbearer.scope=openid
sasl.oauthbearer.client.id=client-oauth2
sasl.oauthbearer.client.secret=xxxxxxxxxxxxxxxxxxxxxxxxx
sasl.oauthbearer.token.endpoint.url=https://xxxxxxxxxxx:8443/auth/realms/master/protocol/openid-connect/token

I am getting the following error:

[error screenshot]

Please help me out here.

Thanks.



1 Reply



Based on the configuration and the error you posted, the problem most likely lies in TLS certificate verification or in obtaining the OAuth token. Here are the main things to check, with example code:

1. Check the TLS certificate configuration

Make sure your certificate file paths are correct and the files are valid PEM. You can also make the TLS settings more explicit:

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func createKafkaConsumer() (*kafka.Consumer, error) {
    config := &kafka.ConfigMap{
        "bootstrap.servers":        "kafka-bootstrap:9095",
        "security.protocol":        "SASL_SSL",
        "sasl.mechanism":           "OAUTHBEARER",
        "ssl.ca.location":          "/var/tmp/go/ca.pem",
        "ssl.certificate.location": "/var/tmp/go/clicert.pem",
        "ssl.key.location":         "/var/tmp/go/cliprivkey.pem",
        "ssl.key.password":         "password",
        // Explicit TLS verification settings
        "enable.ssl.certificate.verification": true,
        "ssl.endpoint.identification.algorithm": "https",
        
        // OAuth settings
        "sasl.oauthbearer.method":             "oidc",
        "sasl.oauthbearer.scope":              "openid",
        "sasl.oauthbearer.client.id":          "client-oauth2",
        "sasl.oauthbearer.client.secret":      "xxxxxxxxxxxxxxxxxxxxxxxxx",
        "sasl.oauthbearer.token.endpoint.url": "https://xxxxxxxxxxx:8443/auth/realms/master/protocol/openid-connect/token",
        
        // Debug output (remove once things work)
        "debug": "security,broker",
    }
    
    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        return nil, err
    }
    
    return consumer, nil
}
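
As a quick sanity check, you can drive this consumer with a minimal read loop; handshake failures will then show up in the "security,broker" debug output. This is just a usage sketch: "test-topic" is a placeholder topic name, and the log/time imports come in addition to the block above.

import (
    "log"
    "time"
)

func main() {
    consumer, err := createKafkaConsumer()
    if err != nil {
        log.Fatalf("failed to create consumer: %v", err)
    }
    defer consumer.Close()

    // "test-topic" is a placeholder; use a topic that exists on your cluster
    if err := consumer.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
        log.Fatalf("subscribe failed: %v", err)
    }

    for {
        msg, err := consumer.ReadMessage(10 * time.Second)
        if err != nil {
            // Timeouts are normal on an idle topic; anything else usually points
            // at the TLS/SASL handshake and is detailed in the debug log
            log.Printf("read error: %v", err)
            continue
        }
        log.Printf("received: %s", string(msg.Value))
    }
}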

2. Custom OAuth token provider

If the built-in oidc method is the problem, you can fetch the token yourself:

import (
    "context"
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "golang.org/x/oauth2"
    "golang.org/x/oauth2/clientcredentials"
)

type OAuthTokenProvider struct {
    config *clientcredentials.Config
    token  *oauth2.Token
}

func NewOAuthTokenProvider(clientID, clientSecret, tokenURL string) *OAuthTokenProvider {
    config := &clientcredentials.Config{
        ClientID:     clientID,
        ClientSecret: clientSecret,
        TokenURL:     tokenURL,
        Scopes:       []string{"openid"},
    }
    
    return &OAuthTokenProvider{config: config}
}

// GetToken returns a valid access token, refreshing it shortly before it expires.
func (p *OAuthTokenProvider) GetToken(ctx context.Context) (*oauth2.Token, error) {
    // Refresh when there is no cached token or it expires within the next 5 minutes
    if p.token == nil || p.token.Expiry.Before(time.Now().Add(5*time.Minute)) {
        token, err := p.config.Token(ctx)
        if err != nil {
            return nil, fmt.Errorf("failed to get token: %w", err)
        }
        p.token = token
    }
    
    return p.token, nil
}

func createKafkaClientWithCustomOAuth() (*kafka.Consumer, *OAuthTokenProvider, error) {
    // Create the custom token provider (same credentials as in the question)
    tokenProvider := NewOAuthTokenProvider(
        "client-oauth2",
        "xxxxxxxxxxxxxxxxxxxxxxxxx",
        "https://xxxxxxxxxxx:8443/auth/realms/master/protocol/openid-connect/token",
    )
    
    config := &kafka.ConfigMap{
        "bootstrap.servers":        "kafka-bootstrap:9095",
        "security.protocol":        "SASL_SSL",
        "sasl.mechanism":           "OAUTHBEARER",
        "ssl.ca.location":          "/var/tmp/go/ca.pem",
        "ssl.certificate.location": "/var/tmp/go/clicert.pem",
        "ssl.key.location":         "/var/tmp/go/cliprivkey.pem",
        "ssl.key.password":         "password",
        // Do not set sasl.oauthbearer.method=oidc here, and note that
        // sasl.oauthbearer.config only accepts a string, not a Go value.
        // With a custom provider the application supplies the token itself
        // by handling the OAuthBearerTokenRefresh event (see below).
    }
    
    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        return nil, nil, err
    }
    
    return consumer, tokenProvider, nil
}
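
The piece that replaces the built-in oidc handling is the token refresh callback. Below is a minimal sketch of how the OAuthBearerTokenRefresh event can be handled with confluent-kafka-go's SetOAuthBearerToken / SetOAuthBearerTokenFailure; the poll loop, the "test-topic" name and the error handling are simplified, and GetToken is the version above that returns the full oauth2 token (including its expiry).

import (
    "context"
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// Poll loop that feeds tokens to librdkafka whenever it asks for one.
func runWithTokenRefresh() error {
    consumer, tokenProvider, err := createKafkaClientWithCustomOAuth()
    if err != nil {
        return err
    }
    defer consumer.Close()

    // "test-topic" is a placeholder topic name
    if err := consumer.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
        return err
    }

    for {
        switch e := consumer.Poll(1000).(type) {
        case kafka.OAuthBearerTokenRefresh:
            // The client needs a (new) token: fetch one and hand it over
            token, err := tokenProvider.GetToken(context.Background())
            if err != nil {
                // Report the failure so the client can retry and surface the error
                consumer.SetOAuthBearerTokenFailure(err.Error())
                continue
            }
            consumer.SetOAuthBearerToken(kafka.OAuthBearerToken{
                TokenValue: token.AccessToken,
                Expiration: token.Expiry,
            })
        case *kafka.Message:
            fmt.Printf("received: %s\n", string(e.Value))
        case kafka.Error:
            fmt.Printf("kafka error: %v\n", e)
        }
    }
}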

3. Verify the certificates and key

You can verify the certificate files and key outside of Kafka first:

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "log"
    "os"
)

func testTLSConfig() (*tls.Config, error) {
    // Load the CA certificate
    caCert, err := os.ReadFile("/var/tmp/go/ca.pem")
    if err != nil {
        return nil, fmt.Errorf("failed to read CA cert: %w", err)
    }
    
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        return nil, fmt.Errorf("failed to parse CA certificate")
    }
    
    // Load the client certificate and key.
    // Note: tls.LoadX509KeyPair cannot read a password-protected key;
    // decrypt the key first (e.g. with openssl) for this standalone check.
    cert, err := tls.LoadX509KeyPair(
        "/var/tmp/go/clicert.pem",
        "/var/tmp/go/cliprivkey.pem",
    )
    if err != nil {
        return nil, fmt.Errorf("failed to load key pair: %w", err)
    }
    
    // Build the TLS configuration
    tlsConfig := &tls.Config{
        RootCAs:      caCertPool,
        Certificates: []tls.Certificate{cert},
        MinVersion:   tls.VersionTLS12,
    }
    
    log.Println("TLS configuration is valid")
    return tlsConfig, nil
}
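
To separate TLS problems from SASL/OAuth problems, the returned config can then be used for a plain TLS handshake against the broker; with a SASL_SSL listener the TLS handshake completes before any SASL exchange, so a failure here is purely a certificate issue. A small sketch, using the bootstrap address from the question:

import (
    "crypto/tls"
    "fmt"
    "log"
)

// Attempt a raw TLS handshake with the broker to isolate certificate issues.
func testBrokerHandshake() error {
    tlsConfig, err := testTLSConfig()
    if err != nil {
        return err
    }

    conn, err := tls.Dial("tcp", "kafka-bootstrap:9095", tlsConfig)
    if err != nil {
        return fmt.Errorf("TLS handshake with broker failed: %w", err)
    }
    defer conn.Close()

    log.Printf("TLS handshake OK, broker certificate subject: %s",
        conn.ConnectionState().PeerCertificates[0].Subject)
    return nil
}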

4. Complete example configuration

Putting all of the settings together:

func createKafkaProducer() (*kafka.Producer, error) {
    config := &kafka.ConfigMap{
        // Basic settings
        "bootstrap.servers": "kafka-bootstrap:9095",
        "client.id":         "golang-oauth-client",
        
        // TLS settings
        "security.protocol": "SASL_SSL",
        "ssl.ca.location":   "/var/tmp/go/ca.pem",
        "ssl.certificate.location": "/var/tmp/go/clicert.pem",
        "ssl.key.location":         "/var/tmp/go/cliprivkey.pem",
        "ssl.key.password":         "password",
        
        // SASL OAuth settings
        "sasl.mechanism": "OAUTHBEARER",
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.scope": "openid",
        "sasl.oauthbearer.client.id": "client-oauth2",
        "sasl.oauthbearer.client.secret": "xxxxxxxxxxxxxxxxxxxxxxxxx",
        "sasl.oauthbearer.token.endpoint.url": "https://xxxxxxxxxxx:8443/auth/realms/master/protocol/openid-connect/token",
        
        // Connection and retry settings
        "socket.keepalive.enable": true,
        "reconnect.backoff.ms":    1000,
        "reconnect.backoff.max.ms": 10000,
        
        // Debug output (remove in production)
        "debug": "security,broker,protocol",
    }
    
    producer, err := kafka.NewProducer(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create producer: %w", err)
    }
    
    return producer, nil
}

Run the certificate verification function before starting the client to confirm the TLS setup is correct. If the problem persists, check whether the Kafka broker is configured with the correct SASL OAUTHBEARER validation handler.
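
You can also rule out the token side on its own by requesting a token directly from the IAM endpoint, outside of Kafka. This sketch reuses the clientcredentials settings from section 2 and assumes the IAM server's certificate chains to the same ca.pem (adjust the CA if it does not):

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "log"
    "net/http"
    "os"
    "time"

    "golang.org/x/oauth2"
    "golang.org/x/oauth2/clientcredentials"
)

// Fetch a token straight from the IAM endpoint to verify the credentials and
// the endpoint's TLS certificate, independent of the Kafka client.
func testTokenEndpoint() error {
    // Assumption: the IAM server's certificate chains to the same CA file
    caCert, err := os.ReadFile("/var/tmp/go/ca.pem")
    if err != nil {
        return fmt.Errorf("failed to read CA cert: %w", err)
    }
    caPool := x509.NewCertPool()
    if !caPool.AppendCertsFromPEM(caCert) {
        return fmt.Errorf("failed to parse CA certificate")
    }

    httpClient := &http.Client{
        Transport: &http.Transport{
            TLSClientConfig: &tls.Config{RootCAs: caPool, MinVersion: tls.VersionTLS12},
        },
        Timeout: 10 * time.Second,
    }

    cfg := &clientcredentials.Config{
        ClientID:     "client-oauth2",
        ClientSecret: "xxxxxxxxxxxxxxxxxxxxxxxxx",
        TokenURL:     "https://xxxxxxxxxxx:8443/auth/realms/master/protocol/openid-connect/token",
        Scopes:       []string{"openid"},
    }

    // The oauth2 package picks up the custom HTTP client from the context
    ctx := context.WithValue(context.Background(), oauth2.HTTPClient, httpClient)
    token, err := cfg.Token(ctx)
    if err != nil {
        return fmt.Errorf("token request failed: %w", err)
    }

    log.Printf("token obtained, expires at %s", token.Expiry)
    return nil
}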
