Rust AWS MSK IAM SASL认证库aws-msk-iam-sasl-signer的使用,实现Kafka安全连接的IAM身份验证
Rust AWS MSK IAM SASL认证库aws-msk-iam-sasl-signer的使用,实现Kafka安全连接的IAM身份验证
aws-msk-iam-sasl-signer-rs
是一个用于Rust的AWS MSK IAM SASL签名库。它是aws-msk-iam-sasl-signer-go的Rust版本实现。
安装
在您的Cargo.toml
中添加以下依赖:
[dependencies]
aws-msk-iam-sasl-signer = "1.0.0"
示例
以下是使用aws-msk-iam-sasl-signer库实现Kafka生产者与AWS MSK服务通过IAM身份验证连接的完整示例:
use aws_msk_iam_sasl_signer::generate_auth_token;
use rdkafka::{
config::ClientConfig,
producer::{FutureProducer, FutureRecord},
};
#[tokio::main]
async fn main() {
// 1. 配置AWS凭证和MSK集群信息
let region = "us-west-2"; // 替换为您的AWS区域
let access_key = "YOUR_ACCESS_KEY"; // 替换为您的AWS访问密钥
let secret_key = "YOUR_SECRET_KEY"; // 替换为您的AWS秘密密钥
let msk_brokers = "your-msk-bootstrap-brokers:9098"; // 替换为您的MSK集群地址
// 2. 生成IAM SASL认证令牌
let auth_token = generate_auth_token(region, access_key, secret_key)
.expect("Failed to generate auth token");
// 3. 配置Kafka生产者
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", msk_brokers)
.set("security.protocol", "SASL_SSL")
.set("sasl.mechanism", "AWS_MSK_IAM")
.set("sasl.aws_msk_iam.token", auth_token)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
// 4. 发送测试消息
let topic = "test-topic";
let payload = "Hello, AWS MSK with IAM auth!";
let record = FutureRecord::to(topic)
.payload(payload)
.key("some_key");
match producer.send(record, 0).await {
Ok((partition, offset)) => println!(
"Message sent successfully to partition {} at offset {}",
partition, offset
),
Err((e, _)) => eprintln!("Error sending message: {:?}", e),
}
}
以下是消费者示例:
use aws_msk_iam_sasl_signer::generate_auth_token;
use rdkafka::{
config::ClientConfig,
consumer::{Consumer, StreamConsumer},
Message,
};
#[tokio::main]
async fn main() {
// 1. 配置AWS凭证和MSK集群信息
let region = "us-west-2"; // 替换为您的AWS区域
let access_key = "YOUR_ACCESS_KEY"; // 替换为您的AWS访问密钥
let secret_key = "YOUR_SECRET_KEY"; // 替换为您的AWS秘密密钥
let msk_brokers = "your-msk-bootstrap-brokers:9098"; // 替换为您的MSK集群地址
// 2. 生成IAM SASL认证令牌
let auth_token = generate_auth_token(region, access_key, secret_key)
.expect("Failed to generate auth token");
// 3. 配置Kafka消费者
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", msk_brokers)
.set("group.id", "test-group")
.set("security.protocol", "SASL_SSL")
.set("sasl.mechanism", "AWS_MSK_IAM")
.set("sasl.aws_msk_iam.token", auth_token)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.create()
.expect("Consumer creation error");
// 4. 订阅主题并消费消息
let topic = "test-topic";
consumer
.subscribe(&[topic])
.expect("Failed to subscribe to topic");
loop {
match consumer.recv().await {
Ok(message) => {
let payload = message.payload_view::<str>().unwrap().unwrap();
println!("Received message: {}", payload);
}
Err(e) => eprintln!("Error receiving message: {:?}", e),
}
}
}
许可证
该库遵循Apache 2.0许可证。
1 回复
Rust AWS MSK IAM SASL认证库 aws-msk-iam-sasl-signer 使用指南
介绍
aws-msk-iam-sasl-signer
是一个 Rust 库,用于通过 IAM 身份验证实现与 AWS MSK (Managed Streaming for Kafka) 的安全连接。该库实现了 SASL/OAUTHBEARER 机制,允许使用 AWS IAM 凭证对 Kafka 客户端进行身份验证。
主要特性
- 支持 AWS IAM 身份验证
- 生成 SASL/OAUTHBEARER 令牌
- 与主流 Rust Kafka 客户端库兼容
- 自动处理凭证刷新
完整示例代码
use std::time::{Duration, SystemTime};
use aws_msk_iam_sasl_signer::generate_auth_token;
use rusoto_core::Region;
use rusoto_credential::{ChainProvider, EnvironmentProvider};
use rdkafka::{
config::{ClientConfig, RDKafkaLogLevel},
consumer::{stream_consumer::StreamConsumer, Consumer},
producer::{FutureProducer, FutureRecord},
Message,
};
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 生产消息示例
produce_message().await?;
// 消费消息示例
tokio::spawn(async {
if let Err(e) = consume_messages().await {
eprintln!("Consumer error: {}", e);
}
});
// 等待一段时间让消费者处理消息
sleep(Duration::from_secs(10)).await;
Ok(())
}
async fn produce_message() -> Result<(), Box<dyn std::error::Error>> {
// 使用环境变量凭证 (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
let credentials_provider = EnvironmentProvider::default();
// 生成认证令牌 - 替换为你的MSK区域
let auth_token = generate_auth_token(
&credentials_provider,
Region::UsWest2,
).await?;
// 打印令牌信息
println!("Generated auth token, expires at: {:?}", auth_token.expiry_time);
// 配置Kafka生产者
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "your-msk-bootstrap-servers:9098") // 替换为你的MSK地址
.set("security.protocol", "SASL_SSL")
.set("sasl.mechanism", "OAUTHBEARER")
.set("sasl.oauthbearer.token", &auth_token.token)
.set("sasl.oauthbearer.method", "OAUTHBEARER")
.set("message.timeout.ms", "5000")
.create()?;
// 发送测试消息
for i in 0..5 {
let record = FutureRecord::to("test-topic")
.payload(&format!("Test message {}", i))
.key(&format!("key-{}", i));
match producer.send(record, 0).await {
Ok(_) => println!("Message {} sent successfully", i),
Err((e, _)) => println!("Failed to send message {}: {}", i, e),
}
}
Ok(())
}
async fn consume_messages() -> Result<(), Box<dyn std::error::Error>> {
// 使用默认凭证链 (环境变量 -> IAM角色等)
let credentials_provider = ChainProvider::new();
// 生成认证令牌 - 替换为你的MSK区域
let auth_token = generate_auth_token(
&credentials_provider,
Region::UsWest2,
).await?;
// 检查令牌是否有效
if auth_token.expiry_time < SystemTime::now() {
return Err("Token already expired".into());
}
// 配置Kafka消费者
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "your-msk-bootstrap-servers:9098") // 替换为你的MSK地址
.set("security.protocol", "SASL_SSL")
.set("sasl.mechanism", "OAUTHBEARER")
.set("sasl.oauthbearer.token", &auth_token.token)
.set("sasl.oauthbearer.method", "OAUTHBEARER")
.set("group.id", "rust-msk-consumer-group")
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.set("auto.offset.reset", "earliest")
.set_log_level(RDKafkaLogLevel::Info)
.create()?;
// 订阅主题
consumer.subscribe(&["test-topic"])?;
println!("Consumer subscribed to test-topic");
// 消费消息循环
loop {
match consumer.recv().await {
Ok(message) => {
let payload = match message.payload() {
Some(p) => String::from_utf8_lossy(p),
None => continue,
};
println!(
"Received message [{}:{}:{}]: {}",
message.topic(),
message.partition(),
message.offset(),
payload
);
}
Err(e) => eprintln!("Consumer error: {}", e),
}
}
}
注意事项
- 确保你的 IAM 用户/角色有访问 MSK 集群的权限
- MSK 集群必须配置为允许 IAM 认证
- 使用正确的 bootstrap servers 地址和端口(通常是 9098)
- 在生产环境中考虑使用更安全的凭证管理方式
IAM 策略示例
你的 IAM 用户或角色需要类似以下的权限:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid"
]
}
]
}
这个完整示例展示了如何:
- 使用环境变量和默认凭证链获取AWS凭证
- 生成MSK IAM认证令牌
- 配置Kafka生产者和消费者
- 处理消息生产和消费
- 包含基本的错误处理和日志输出
要运行此示例,需要先设置好AWS凭证和正确的MSK集群信息。