Using the Rust AWS Glue SDK crate aws-sdk-glue: Managing and Automating AWS Data ETL Workflows
aws-sdk-glue
Defines the public endpoint for the Glue service.
Getting Started
Examples are available for many services and operations; check out the examples folder in GitHub.
The SDK provides one crate per AWS service. You must add Tokio as a dependency in your Rust project to run asynchronous code. To add aws-sdk-glue to your project, add the following to your Cargo.toml file:
[dependencies]
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-glue = "1.117.0"
tokio = { version = "1", features = ["full"] }
Then in your code, a client can be created as follows:
use aws_sdk_glue as glue;
#[::tokio::main]
async fn main() -> Result<(), glue::Error> {
let config = aws_config::load_from_env().await;
let client = aws_sdk_glue::Client::new(&config);
// ... make some calls with the client
Ok(())
}
For information on the calls you can make, and the inputs and outputs for each of those calls, see the client documentation.
Using the SDK
Until the SDK is released, we will be adding information about using the SDK to the Developer Guide. Feel free to suggest additional sections for the guide by opening an issue and describing what you are trying to do.
Getting Help
- GitHub discussions - For ideas, RFCs and general questions
- GitHub issues - For bug reports and feature requests
- Generated Docs (latest version)
- Usage examples
License
This project is licensed under the Apache-2.0 License.
Complete Example Code
use aws_sdk_glue as glue;
use aws_sdk_glue::types::JobCommand;
#[::tokio::main]
async fn main() -> Result<(), glue::Error> {
// Load configuration from the environment
let config = aws_config::load_from_env().await;
// Create a Glue client
let client = aws_sdk_glue::Client::new(&config);
// Build the ETL job command configuration
let job_command = JobCommand::builder()
.name("glueetl")
.python_version("3")
.script_location("s3://your-bucket/scripts/etl_script.py")
.build();
// Create the Glue job
let create_job_response = client
.create_job()
.name("my-etl-job")
.role("AWSGlueServiceRole")
.command(job_command)
.glue_version("3.0")
.worker_type("G.1X")
.number_of_workers(10)
.send()
.await?;
println!("Created job: {:?}", create_job_response);
// Start a job run
let start_job_response = client
.start_job_run()
.job_name("my-etl-job")
.send()
.await?;
println!("Started job run: {:?}", start_job_response);
// Get the job run status
let get_job_run_response = client
.get_job_run()
.job_name("my-etl-job")
.run_id(start_job_response.job_run_id().unwrap_or_default())
.send()
.await?;
println!("Job run status: {:?}", get_job_run_response.job_run().and_then(|r| r.job_run_state()));
Ok(())
}
Building on the example above, here is a more complete example of Glue service operations:
use aws_sdk_glue as glue;
use aws_sdk_glue::types::{JobCommand, JobRunState};
use std::time::Duration;
#[::tokio::main]
async fn main() -> Result<(), glue::Error> {
// Load AWS configuration from the environment (region, credentials, etc.)
let config = aws_config::load_from_env().await;
// Create a Glue client instance
let client = glue::Client::new(&config);
// Configure the ETL job command
let job_command = JobCommand::builder()
.name("glueetl") // job type: Glue ETL
.python_version("3") // use Python 3
.script_location("s3://your-bucket/scripts/etl_script.py") // ETL script location
.build();
// Create the Glue ETL job
let create_response = client
.create_job()
.name("my-etl-job") // job name
.role("AWSGlueServiceRole") // required IAM role
.command(job_command) // job command configuration
.glue_version("3.0") // Glue version
.worker_type("G.1X") // worker type
.number_of_workers(10) // number of workers
.send()
.await?;
println!("Successfully created job: {:?}", create_response);
// Start a job run
let start_response = client
.start_job_run()
.job_name("my-etl-job")
.send()
.await?;
let job_run_id = start_response.job_run_id().unwrap_or_default();
println!("已启动作业运行,ID: {}", job_run_id);
// Wait for the job to finish, polling its state
let mut job_state = JobRunState::Running;
while job_state == JobRunState::Running || job_state == JobRunState::Starting {
// Wait a while before checking the state again
tokio::time::sleep(Duration::from_secs(30)).await;
let status_response = client
.get_job_run()
.job_name("my-etl-job")
.run_id(job_run_id)
.send()
.await?;
if let Some(state) = status_response.job_run().and_then(|r| r.job_run_state()) {
job_state = state.clone();
println!("Current job state: {:?}", job_state);
}
}
// Fetch the final job run details
let final_response = client
.get_job_run()
.job_name("my-etl-job")
.run_id(job_run_id)
.send()
.await?;
println!("作业运行最终结果: {:?}", final_response.job_run());
// 清理:删除作业(可选)
let _ = client
.delete_job()
.job_name("my-etl-job")
.send()
.await;
Ok(())
}
1 Reply
Introduction
aws-sdk-glue
is the official AWS SDK for Rust crate for interacting with the AWS Glue service. It lets developers manage data ETL (extract, transform, load) workflows programmatically, including creating, running, and monitoring Glue jobs and managing the Data Catalog and crawlers.
Main Features
- Create and manage Glue jobs
- Monitor job run status
- Manage the Data Catalog and tables (see the sketch after this list)
- Operate data crawlers
- Manage triggers and workflows
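Jobs and crawlers are covered by the examples later in this reply. For the Data Catalog side, here is a minimal sketch that lists the tables of a catalog database with get_tables; it assumes a client built as shown below, and the database name my_database is just a placeholder.
use aws_sdk_glue::Client;

// Sketch: list the tables registered in a Glue Data Catalog database.
// "my_database" is a placeholder; substitute an existing database name.
async fn list_catalog_tables(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    let response = client
        .get_tables()
        .database_name("my_database")
        .send()
        .await?;
    for table in response.table_list() {
        println!("Catalog table: {:?}", table.name());
    }
    Ok(())
}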
Installation
Add the dependencies in Cargo.toml:
[dependencies]
aws-sdk-glue = "1.117.0"
aws-config = "1.1.7"
tokio = { version = "1", features = ["full"] }
Basic Usage
1. Create a Glue client
use aws_sdk_glue::Client;
use aws_config::BehaviorVersion;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let client = Client::new(&config);
Ok(())
}
2. Create a Glue job
use aws_sdk_glue::types::JobCommand;
async fn create_glue_job(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
let job_command = JobCommand::builder()
.name("glueetl")
.script_location("s3://my-bucket/scripts/my_etl_script.py")
.build();
let response = client
.create_job()
.name("my-etl-job")
.role("arn:aws:iam::123456789012:role/GlueServiceRole")
.command(job_command)
.glue_version("3.0")
.number_of_workers(10)
.worker_type("G.1X")
.send()
.await?;
println!("Created job: {:?}", response);
Ok(())
}
3. Start a job run
async fn start_job_run(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
let response = client
.start_job_run()
.job_name("my-etl-job")
.arguments("--input_path", "s3://my-bucket/input/")
.arguments("--output_path", "s3://my-bucket/output/")
.send()
.await?;
println!("Job run started with ID: {}", response.job_run_id().unwrap());
Ok(())
}
4. Monitor job status
async fn get_job_run_status(client: &Client, job_name: &str, run_id: &str) -> Result<(), Box<dyn std::error::Error>> {
let response = client
.get_job_run()
.job_name(job_name)
.run_id(run_id)
.send()
.await?;
if let Some(job_run) = response.job_run() {
println!("Job status: {:?}", job_run.job_run_state());
println!("Started at: {:?}", job_run.started_on());
println!("Completed at: {:?}", job_run.completed_on());
}
Ok(())
}
5. Create a data crawler
use aws_sdk_glue::types::{CrawlerTargets, S3Target};
async fn create_crawler(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
let s3_target = S3Target::builder()
.path("s3://my-bucket/data/")
.build();
let targets = CrawlerTargets::builder()
.s3_targets(s3_target)
.build();
let response = client
.create_crawler()
.name("my-data-crawler")
.role("arn:aws:iam::123456789012:role/GlueServiceRole")
.database_name("my_database")
.targets(targets)
.send()
.await?;
println!("Crawler created successfully");
Ok(())
}
Complete Example: Automating an ETL Workflow
use aws_sdk_glue::{Client, types::JobRunState};
use aws_config::BehaviorVersion;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let client = Client::new(&config);
// Start the job
let run_response = client
.start_job_run()
.job_name("my-etl-job")
.send()
.await?;
let run_id = run_response.job_run_id().unwrap();
// Poll the job run state
loop {
let status_response = client
.get_job_run()
.job_name("my-etl-job")
.run_id(run_id)
.send()
.await?;
if let Some(job_run) = status_response.job_run() {
match job_run.job_run_state() {
Some(JobRunState::Succeeded) => {
println!("ETL job completed successfully!");
break;
}
Some(JobRunState::Failed) | Some(JobRunState::Stopped) => {
println!("ETL job failed or was stopped");
break;
}
_ => {
println!("Job still running...");
sleep(Duration::from_secs(30)).await;
}
}
} else {
println!("No job run information found");
break;
}
}
Ok(())
}
Notes
- Make sure AWS credentials and the region are configured correctly
- Handle errors and add retry logic where appropriate
- Watch API call rates to avoid throttling (see the retry configuration sketch after this list)
- Use suitably scoped IAM roles and permissions
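On the throttling point, the SDK's shared configuration accepts a retry policy when the client is built. The sketch below is a minimal illustration; the maximum of 5 attempts is an arbitrary example value.
use aws_config::{retry::RetryConfig, BehaviorVersion};
use aws_sdk_glue::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the shared AWS config with a standard retry policy and a higher
    // attempt count to smooth over transient throttling from the Glue APIs.
    let config = aws_config::defaults(BehaviorVersion::latest())
        .retry_config(RetryConfig::standard().with_max_attempts(5))
        .load()
        .await;
    let client = Client::new(&config);
    // ... make Glue calls with `client` as in the examples above
    let _ = client;
    Ok(())
}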
Best Practices
- Use asynchronous programming to take full advantage of Rust's performance
- Implement proper error handling and logging
- Consider relying on the AWS SDK's built-in retry mechanism
- Keep sensitive values in environment variables or AWS Secrets Manager (a small environment-variable sketch follows this list)
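As a minimal sketch of the environment-variable approach, the snippet below reads the output bucket from the environment instead of hard-coding it in the job arguments; the variable name ETL_OUTPUT_BUCKET is hypothetical.
use aws_sdk_glue::Client;

// Sketch: read the output bucket from an environment variable (the name
// ETL_OUTPUT_BUCKET is hypothetical) rather than hard-coding it in source.
async fn start_run_with_env(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    let bucket = std::env::var("ETL_OUTPUT_BUCKET")?;
    let response = client
        .start_job_run()
        .job_name("my-etl-job")
        .arguments("--output_path", format!("s3://{bucket}/output/"))
        .send()
        .await?;
    println!("Started run: {:?}", response.job_run_id());
    Ok(())
}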
This SDK gives Rust developers a solid toolkit for managing and automating AWS Glue data ETL workflows; combined with Rust's performance and safety guarantees, it can be used to build efficient, reliable data-processing pipelines.
Complete Example Code
use aws_sdk_glue::{Client, types::{JobCommand, CrawlerTargets, S3Target, JobRunState}};
use aws_config::BehaviorVersion;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create the Glue client
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let client = Client::new(&config);
// Create the Glue job
let job_command = JobCommand::builder()
.name("glueetl")
.script_location("s3://my-bucket/scripts/my_etl_script.py")
.build();
let create_job_response = client
.create_job()
.name("my-etl-job")
.role("arn:aws:iam::123456789012:role/GlueServiceRole")
.command(job_command)
.glue_version("3.0")
.number_of_workers(10)
.worker_type("G.1X")
.send()
.await?;
println!("Created job: {:?}", create_job_response);
// Create a data crawler
let s3_target = S3Target::builder()
.path("s3://my-bucket/data/")
.build();
let targets = CrawlerTargets::builder()
.s3_targets(s3_target)
.build();
let crawler_response = client
.create_crawler()
.name("my-data-crawler")
.role("arn:aws:iam::123456789012:role/GlueServiceRole")
.database_name("my_database")
.targets(targets)
.send()
.await?;
println!("Crawler created successfully: {:?}", crawler_response);
// Start a job run
let run_response = client
.start_job_run()
.job_name("my-etl-job")
.arguments("--input_path", "s3://my-bucket/input/")
.arguments("--output_path", "s3://my-bucket/output/")
.send()
.await?;
let run_id = run_response.job_run_id().unwrap();
println!("Job run started with ID: {}", run_id);
// Monitor the job status
loop {
let status_response = client
.get_job_run()
.job_name("my-etl-job")
.run_id(run_id)
.send()
.await?;
if let Some(job_run) = status_response.job_run() {
println!("Current job status: {:?}", job_run.job_run_state());
println!("Started at: {:?}", job_run.started_on());
match job_run.job_run_state() {
Some(JobRunState::Succeeded) => {
println!("ETL job completed successfully!");
println!("Completed at: {:?}", job_run.completed_on());
break;
}
Some(JobRunState::Failed) | Some(JobRunState::Stopped) => {
println!("ETL job failed or was stopped");
println!("Completed at: {:?}", job_run.completed_on());
break;
}
_ => {
println!("Job still running... checking again in 30 seconds");
sleep(Duration::from_secs(30)).await;
}
}
} else {
println!("No job run information found");
break;
}
}
Ok(())
}