Rust AWS MSK IAM SASL认证库aws-msk-iam-sasl-signer的使用,实现Kafka安全连接的IAM身份验证

Rust AWS MSK IAM SASL认证库aws-msk-iam-sasl-signer的使用,实现Kafka安全连接的IAM身份验证

aws-msk-iam-sasl-signer-rs是一个用于Rust的AWS MSK IAM SASL签名库。它是aws-msk-iam-sasl-signer-go的Rust版本实现。

安装

在您的Cargo.toml中添加以下依赖:

[dependencies]
aws-msk-iam-sasl-signer = "1.0.0"

示例

以下是使用aws-msk-iam-sasl-signer库实现Kafka生产者与AWS MSK服务通过IAM身份验证连接的完整示例:

use aws_msk_iam_sasl_signer::generate_auth_token;
use rdkafka::{
    config::ClientConfig,
    producer::{FutureProducer, FutureRecord},
};

#[tokio::main]
async fn main() {
    // 1. 配置AWS凭证和MSK集群信息
    let region = "us-west-2"; // 替换为您的AWS区域
    let access_key = "YOUR_ACCESS_KEY"; // 替换为您的AWS访问密钥
    let secret_key = "YOUR_SECRET_KEY"; // 替换为您的AWS秘密密钥
    let msk_brokers = "your-msk-bootstrap-brokers:9098"; // 替换为您的MSK集群地址

    // 2. 生成IAM SASL认证令牌
    let auth_token = generate_auth_token(region, access_key, secret_key)
        .expect("Failed to generate auth token");

    // 3. 配置Kafka生产者
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", msk_brokers)
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "AWS_MSK_IAM")
        .set("sasl.aws_msk_iam.token", auth_token)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // 4. 发送测试消息
    let topic = "test-topic";
    let payload = "Hello, AWS MSK with IAM auth!";
    
    let record = FutureRecord::to(topic)
        .payload(payload)
        .key("some_key");

    match producer.send(record, 0).await {
        Ok((partition, offset)) => println!(
            "Message sent successfully to partition {} at offset {}",
            partition, offset
        ),
        Err((e, _)) => eprintln!("Error sending message: {:?}", e),
    }
}

以下是消费者示例:

use aws_msk_iam_sasl_signer::generate_auth_token;
use rdkafka::{
    config::ClientConfig,
    consumer::{Consumer, StreamConsumer},
    Message,
};

#[tokio::main]
async fn main() {
    // 1. 配置AWS凭证和MSK集群信息
    let region = "us-west-2"; // 替换为您的AWS区域
    let access_key = "YOUR_ACCESS_KEY"; // 替换为您的AWS访问密钥
    let secret_key = "YOUR_SECRET_KEY"; // 替换为您的AWS秘密密钥
    let msk_brokers = "your-msk-bootstrap-brokers:9098"; // 替换为您的MSK集群地址

    // 2. 生成IAM SASL认证令牌
    let auth_token = generate_auth_token(region, access_key, secret_key)
        .expect("Failed to generate auth token");

    // 3. 配置Kafka消费者
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", msk_brokers)
        .set("group.id", "test-group")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "AWS_MSK_IAM")
        .set("sasl.aws_msk_iam.token", auth_token)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .create()
        .expect("Consumer creation error");

    // 4. 订阅主题并消费消息
    let topic = "test-topic";
    consumer
        .subscribe(&[topic])
        .expect("Failed to subscribe to topic");

    loop {
        match consumer.recv().await {
            Ok(message) => {
                let payload = message.payload_view::<str>().unwrap().unwrap();
                println!("Received message: {}", payload);
            }
            Err(e) => eprintln!("Error receiving message: {:?}", e),
        }
    }
}

许可证

该库遵循Apache 2.0许可证。


1 回复

Rust AWS MSK IAM SASL认证库 aws-msk-iam-sasl-signer 使用指南

介绍

aws-msk-iam-sasl-signer 是一个 Rust 库,用于通过 IAM 身份验证实现与 AWS MSK (Managed Streaming for Kafka) 的安全连接。该库实现了 SASL/OAUTHBEARER 机制,允许使用 AWS IAM 凭证对 Kafka 客户端进行身份验证。

主要特性

  • 支持 AWS IAM 身份验证
  • 生成 SASL/OAUTHBEARER 令牌
  • 与主流 Rust Kafka 客户端库兼容
  • 自动处理凭证刷新

完整示例代码

use std::time::{Duration, SystemTime};
use aws_msk_iam_sasl_signer::generate_auth_token;
use rusoto_core::Region;
use rusoto_credential::{ChainProvider, EnvironmentProvider};
use rdkafka::{
    config::{ClientConfig, RDKafkaLogLevel},
    consumer::{stream_consumer::StreamConsumer, Consumer},
    producer::{FutureProducer, FutureRecord},
    Message,
};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 生产消息示例
    produce_message().await?;
    
    // 消费消息示例
    tokio::spawn(async {
        if let Err(e) = consume_messages().await {
            eprintln!("Consumer error: {}", e);
        }
    });
    
    // 等待一段时间让消费者处理消息
    sleep(Duration::from_secs(10)).await;
    
    Ok(())
}

async fn produce_message() -> Result<(), Box<dyn std::error::Error>> {
    // 使用环境变量凭证 (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    let credentials_provider = EnvironmentProvider::default();
    
    // 生成认证令牌 - 替换为你的MSK区域
    let auth_token = generate_auth_token(
        &credentials_provider,
        Region::UsWest2,
    ).await?;
    
    // 打印令牌信息
    println!("Generated auth token, expires at: {:?}", auth_token.expiry_time);

    // 配置Kafka生产者
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "your-msk-bootstrap-servers:9098") // 替换为你的MSK地址
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "OAUTHBEARER")
        .set("sasl.oauthbearer.token", &auth_token.token)
        .set("sasl.oauthbearer.method", "OAUTHBEARER")
        .set("message.timeout.ms", "5000")
        .create()?;

    // 发送测试消息
    for i in 0..5 {
        let record = FutureRecord::to("test-topic")
            .payload(&format!("Test message {}", i))
            .key(&format!("key-{}", i));
        
        match producer.send(record, 0).await {
            Ok(_) => println!("Message {} sent successfully", i),
            Err((e, _)) => println!("Failed to send message {}: {}", i, e),
        }
    }
    
    Ok(())
}

async fn consume_messages() -> Result<(), Box<dyn std::error::Error>> {
    // 使用默认凭证链 (环境变量 -> IAM角色等)
    let credentials_provider = ChainProvider::new();
    
    // 生成认证令牌 - 替换为你的MSK区域
    let auth_token = generate_auth_token(
        &credentials_provider,
        Region::UsWest2,
    ).await?;
    
    // 检查令牌是否有效
    if auth_token.expiry_time < SystemTime::now() {
        return Err("Token already expired".into());
    }

    // 配置Kafka消费者
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "your-msk-bootstrap-servers:9098") // 替换为你的MSK地址
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "OAUTHBEARER")
        .set("sasl.oauthbearer.token", &auth_token.token)
        .set("sasl.oauthbearer.method", "OAUTHBEARER")
        .set("group.id", "rust-msk-consumer-group")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "earliest")
        .set_log_level(RDKafkaLogLevel::Info)
        .create()?;

    // 订阅主题
    consumer.subscribe(&["test-topic"])?;
    println!("Consumer subscribed to test-topic");

    // 消费消息循环
    loop {
        match consumer.recv().await {
            Ok(message) => {
                let payload = match message.payload() {
                    Some(p) => String::from_utf8_lossy(p),
                    None => continue,
                };
                println!(
                    "Received message [{}:{}:{}]: {}",
                    message.topic(),
                    message.partition(),
                    message.offset(),
                    payload
                );
            }
            Err(e) => eprintln!("Consumer error: {}", e),
        }
    }
}

注意事项

  1. 确保你的 IAM 用户/角色有访问 MSK 集群的权限
  2. MSK 集群必须配置为允许 IAM 认证
  3. 使用正确的 bootstrap servers 地址和端口(通常是 9098)
  4. 在生产环境中考虑使用更安全的凭证管理方式

IAM 策略示例

你的 IAM 用户或角色需要类似以下的权限:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid"
            ]
        }
    ]
}

这个完整示例展示了如何:

  1. 使用环境变量和默认凭证链获取AWS凭证
  2. 生成MSK IAM认证令牌
  3. 配置Kafka生产者和消费者
  4. 处理消息生产和消费
  5. 包含基本的错误处理和日志输出

要运行此示例,需要先设置好AWS凭证和正确的MSK集群信息。

回到顶部