Using aws-sdk-glue, the Rust SDK crate for AWS Glue: managing and automating AWS data ETL workflows

aws-sdk-glue

Defines the public endpoint for the Glue service.

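Getting started

Examples are available for many services and operations; see the examples folder on GitHub.

The SDK provides one crate per AWS service. You must add Tokio as a dependency in your Rust project to execute asynchronous code. To add aws-sdk-glue to your project, add the following to your Cargo.toml file: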

[dependencies]
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-glue = "1.117.0"
tokio = { version = "1", features = ["full"] }

Then, in code, a client can be created as follows:

use aws_sdk_glue as glue;

#[::tokio::main]
async fn main() -> Result<(), glue::Error> {
    let config = aws_config::load_from_env().await;
    let client = aws_sdk_glue::Client::new(&config);

    // ... make some calls with the client

    Ok(())
}

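For information on which calls can be made, and the inputs and outputs of each call, see the client documentation.

Using the SDK

Until the SDK is released, we will be adding information about using the SDK to the Developer Guide. To suggest additional sections for the guide, open an issue and describe what you are trying to do.

Getting help

  • GitHub discussions - for ideas, RFCs, and general questions
  • GitHub issues - for bug reports and feature requests
  • Generated docs (latest version)
  • Usage examples

License

This project is licensed under the Apache-2.0 License.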

Complete example code

use aws_sdk_glue as glue;
use aws_sdk_glue::types::{JobCommand, WorkerType};

#[::tokio::main]
async fn main() -> Result<(), glue::Error> {
    // Load configuration from the environment
    let config = aws_config::load_from_env().await;

    // Create the Glue client
    let client = aws_sdk_glue::Client::new(&config);

    // Configure the ETL job command
    let job_command = JobCommand::builder()
        .name("glueetl")
        .python_version("3")
        .script_location("s3://your-bucket/scripts/etl_script.py")
        .build();

    // Create the Glue job
    let create_job_response = client
        .create_job()
        .name("my-etl-job")
        .role("AWSGlueServiceRole")
        .command(job_command)
        .glue_version("3.0")
        .worker_type(WorkerType::G1X) // the setter takes the WorkerType enum, not a string
        .number_of_workers(10)
        .send()
        .await?;

    println!("Created job: {:?}", create_job_response);

    // Start a job run
    let start_job_response = client
        .start_job_run()
        .job_name("my-etl-job")
        .send()
        .await?;

    println!("Started job run: {:?}", start_job_response);

    // Fetch the state of the job run
    let get_job_run_response = client
        .get_job_run()
        .job_name("my-etl-job")
        .run_id(start_job_response.job_run_id().unwrap_or_default())
        .send()
        .await?;

    println!("Job run status: {:?}", get_job_run_response.job_run().and_then(|r| r.job_run_state()));

    Ok(())
}

Building on the example above, here is a more complete example of Glue service operations:

use aws_sdk_glue as glue;
use aws_sdk_glue::types::{JobCommand, JobRunState, WorkerType};
use std::time::Duration;

#[::tokio::main]
async fn main() -> Result<(), glue::Error> {
    // Load AWS configuration (region, credentials, and so on) from the environment
    let config = aws_config::load_from_env().await;

    // Create the Glue client instance
    let client = glue::Client::new(&config);

    // Configure the ETL job command
    let job_command = JobCommand::builder()
        .name("glueetl") // job type: Glue ETL
        .python_version("3") // use Python 3
        .script_location("s3://your-bucket/scripts/etl_script.py") // location of the ETL script
        .build();

    // Create the Glue ETL job
    let create_response = client
        .create_job()
        .name("my-etl-job") // job name
        .role("AWSGlueServiceRole") // IAM role the job runs as
        .command(job_command) // job command configuration
        .glue_version("3.0") // Glue version
        .worker_type(WorkerType::G1X) // worker type (an enum, not a string)
        .number_of_workers(10) // number of workers
        .send()
        .await?;

    println!("Created job: {:?}", create_response);

    // Start a job run
    let start_response = client
        .start_job_run()
        .job_name("my-etl-job")
        .send()
        .await?;

    let job_run_id = start_response.job_run_id().unwrap_or_default();
    println!("Started job run, ID: {}", job_run_id);

    // Poll until the run is no longer in progress
    loop {
        // Wait a while before checking the state again
        tokio::time::sleep(Duration::from_secs(30)).await;

        let status_response = client
            .get_job_run()
            .job_name("my-etl-job")
            .run_id(job_run_id)
            .send()
            .await?;

        // job_run_state() returns Option<&JobRunState>
        let job_state = status_response.job_run().and_then(|run| run.job_run_state());
        println!("Current job state: {:?}", job_state);

        let in_progress = matches!(
            job_state,
            Some(
                JobRunState::Starting
                    | JobRunState::Running
                    | JobRunState::Stopping
                    | JobRunState::Waiting
            )
        );
        if !in_progress {
            break;
        }
    }

    // Fetch the final details of the job run
    let final_response = client
        .get_job_run()
        .job_name("my-etl-job")
        .run_id(job_run_id)
        .send()
        .await?;

    println!("Final job run result: {:?}", final_response.job_run());

    // Cleanup: delete the job (optional); a failure here is deliberately ignored
    let _ = client
        .delete_job()
        .job_name("my-etl-job")
        .send()
        .await;

    Ok(())
}
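
A polling loop needs to know which run states are final. The sketch below is a std-only helper that sorts the state strings listed in the AWS Glue API reference (STARTING, RUNNING, STOPPING, STOPPED, SUCCEEDED, FAILED, TIMEOUT, ERROR, WAITING, EXPIRED) into coarse phases. RunPhase, classify_state, and is_terminal are names made up for this illustration; they are not part of aws-sdk-glue.

```rust
/// Coarse phase of a Glue job run (hypothetical helper type, not an SDK type).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum RunPhase {
    InProgress,
    Succeeded,
    Unsuccessful,
    Unrecognized,
}

/// Maps a job run state string to a coarse phase.
pub fn classify_state(state: &str) -> RunPhase {
    match state {
        "STARTING" | "RUNNING" | "STOPPING" | "WAITING" => RunPhase::InProgress,
        "SUCCEEDED" => RunPhase::Succeeded,
        "FAILED" | "STOPPED" | "TIMEOUT" | "ERROR" | "EXPIRED" => RunPhase::Unsuccessful,
        // Anything else is reported as-is so the caller can decide what to do.
        _ => RunPhase::Unrecognized,
    }
}

/// True when the run has ended, whether it succeeded or not.
pub fn is_terminal(state: &str) -> bool {
    matches!(
        classify_state(state),
        RunPhase::Succeeded | RunPhase::Unsuccessful
    )
}
```

With the SDK, the string could come from JobRunState::as_str(), so a loop would stop once is_terminal(state.as_str()) is true. Treating unrecognized strings as non-terminal is a design choice; it is best paired with an upper bound on polling time so that an unexpected state cannot keep the loop alive forever.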

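Introduction

aws-sdk-glue is the official AWS SDK crate for Rust for interacting with the AWS Glue service. It lets developers manage data ETL (extract, transform, load) workflows programmatically, including creating, running, and monitoring Glue jobs and managing the Data Catalog and crawlers.

Main features

  • Create and manage Glue jobs
  • Monitor job run status
  • Manage the Data Catalog and tables
  • Operate crawlers
  • Manage triggers and workflows

Installation

Add the dependencies to Cargo.toml: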

[dependencies]
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-glue = "1.117.0"
tokio = { version = "1", features = ["full"] }

Basic usage

1. Creating a Glue client

use aws_config::BehaviorVersion;
use aws_sdk_glue::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load region and credentials from the default provider chain
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    // Build the Glue client from the shared configuration
    let _client = Client::new(&config);

    Ok(())
}

2. Creating a Glue job

use aws_sdk_glue::types::{JobCommand, WorkerType};
use aws_sdk_glue::Client;

async fn create_glue_job(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    // Describe the script the job runs
    let job_command = JobCommand::builder()
        .name("glueetl")
        .script_location("s3://my-bucket/scripts/my_etl_script.py")
        .build();

    let response = client
        .create_job()
        .name("my-etl-job")
        .role("arn:aws:iam::123456789012:role/GlueServiceRole")
        .command(job_command)
        .glue_version("3.0")
        .number_of_workers(10)
        .worker_type(WorkerType::G1X) // the setter takes the WorkerType enum, not a string
        .send()
        .await?;

    println!("Created job: {:?}", response);
    Ok(())
}

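3. Starting a job run

async fn start_job_run(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    let response = client
        .start_job_run()
        .job_name("my-etl-job")
        // Each call adds one key/value pair to the job's argument map
        .arguments("--input_path", "s3://my-bucket/input/")
        .arguments("--output_path", "s3://my-bucket/output/")
        .send()
        .await?;

    // Return an error instead of panicking if the response carries no run ID
    let run_id = response
        .job_run_id()
        .ok_or("StartJobRun response had no run ID")?;
    println!("Job run started with ID: {}", run_id);
    Ok(())
}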
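
Glue job parameters are conventionally passed with a leading "--", as in the calls above. When the parameters come from configuration rather than literals, a small helper can normalize the keys first. This is a std-only sketch; job_arguments is a hypothetical name, not an SDK function.

```rust
use std::collections::HashMap;

/// Builds a Glue-style argument map, adding the leading "--" when it is missing.
/// (Hypothetical helper for illustration; not part of aws-sdk-glue.)
pub fn job_arguments(pairs: &[(&str, &str)]) -> HashMap<String, String> {
    pairs
        .iter()
        .map(|(key, value)| {
            let key = if key.starts_with("--") {
                key.to_string()
            } else {
                format!("--{key}")
            };
            (key, value.to_string())
        })
        .collect()
}
```

The resulting map could then be handed to the request builder in one call, for example through set_arguments(Some(args)) on start_job_run(), assuming the setter follows the SDK's usual set_* pattern.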

4. Monitoring job status

async fn get_job_run_status(client: &Client, job_name: &str, run_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    let response = client
        .get_job_run()
        .job_name(job_name)
        .run_id(run_id)
        .send()
        .await?;
    
    if let Some(job_run) = response.job_run() {
        println!("Job status: {:?}", job_run.job_run_state());
        println!("Started at: {:?}", job_run.started_on());
        println!("Completed at: {:?}", job_run.completed_on());
    }
    
    Ok(())
}

5. Creating a crawler

use aws_sdk_glue::types::{CrawlerTargets, S3Target};
use aws_sdk_glue::Client;

async fn create_crawler(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    // The S3 location the crawler scans
    let s3_target = S3Target::builder()
        .path("s3://my-bucket/data/")
        .build();

    let targets = CrawlerTargets::builder()
        .s3_targets(s3_target)
        .build();

    // The response carries no data of interest, so only errors are propagated
    client
        .create_crawler()
        .name("my-data-crawler")
        .role("arn:aws:iam::123456789012:role/GlueServiceRole")
        .database_name("my_database")
        .targets(targets)
        .send()
        .await?;

    println!("Crawler created successfully");
    Ok(())
}
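
A malformed target path only surfaces once the request reaches the service, so it can help to check the string locally first. The following is a minimal std-only sketch that splits an "s3://bucket/prefix" path; split_s3_uri is a hypothetical helper, and it checks only the overall shape, not the full S3 bucket naming rules.

```rust
/// Splits "s3://bucket/prefix" into (bucket, prefix).
/// Returns None when the scheme is not s3:// or the bucket part is empty.
/// (Hypothetical helper for illustration; not part of aws-sdk-glue.)
pub fn split_s3_uri(uri: &str) -> Option<(&str, &str)> {
    let rest = uri.strip_prefix("s3://")?;
    let (bucket, prefix) = match rest.split_once('/') {
        Some((bucket, prefix)) => (bucket, prefix),
        // No slash after the bucket: the prefix is empty.
        None => (rest, ""),
    };
    if bucket.is_empty() {
        None
    } else {
        Some((bucket, prefix))
    }
}
```

A caller could run this on the path before building the S3Target and return an error early when it yields None.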

Complete example: automating the ETL flow

use aws_config::BehaviorVersion;
use aws_sdk_glue::{types::JobRunState, Client};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    // Start the job
    let run_response = client
        .start_job_run()
        .job_name("my-etl-job")
        .send()
        .await?;

    let run_id = run_response
        .job_run_id()
        .ok_or("StartJobRun response had no run ID")?;

    // Poll the job run state
    loop {
        let status_response = client
            .get_job_run()
            .job_name("my-etl-job")
            .run_id(run_id)
            .send()
            .await?;

        if let Some(job_run) = status_response.job_run() {
            match job_run.job_run_state() {
                Some(JobRunState::Succeeded) => {
                    println!("ETL job completed successfully!");
                    break;
                }
                // Timeout and Error also end a run; without them the loop would poll forever
                Some(
                    JobRunState::Failed
                    | JobRunState::Stopped
                    | JobRunState::Timeout
                    | JobRunState::Error,
                ) => {
                    println!("ETL job failed, timed out, or was stopped");
                    break;
                }
                _ => {
                    println!("Job still running...");
                    sleep(Duration::from_secs(30)).await;
                }
            }
        } else {
            // Stop instead of spinning without a delay when no run is returned
            println!("No job run information found");
            break;
        }
    }

    Ok(())
}
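
The loop above polls without an upper bound. One way to cap it, sketched here with only the standard library, is a helper that runs a check a limited number of times. poll_until and PollOutcome are hypothetical names, and the helper blocks the thread with std::thread::sleep; inside a Tokio task the same shape would await tokio::time::sleep instead.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Result of a bounded polling loop (hypothetical type for illustration).
#[derive(Debug, PartialEq, Eq)]
pub enum PollOutcome<T> {
    /// The check produced a value before the attempts ran out.
    Done(T),
    /// Every attempt returned None.
    GaveUp { attempts: u32 },
}

/// Calls `check` up to `max_attempts` times, sleeping `interval` between calls.
/// `check` receives the 1-based attempt number and returns Some(value) to stop.
pub fn poll_until<T>(
    max_attempts: u32,
    interval: Duration,
    mut check: impl FnMut(u32) -> Option<T>,
) -> PollOutcome<T> {
    for attempt in 1..=max_attempts {
        if let Some(value) = check(attempt) {
            return PollOutcome::Done(value);
        }
        // No need to wait after the last attempt.
        if attempt < max_attempts {
            sleep(interval);
        }
    }
    PollOutcome::GaveUp { attempts: max_attempts }
}
```

For a job expected to finish within an hour, 120 attempts at a 30-second interval would be one possible setting; the right numbers depend on the workload.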

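Notes

  1. Make sure AWS credentials and the region are configured correctly
  2. Handle errors and retry logic appropriately
  3. Watch API call frequency to avoid throttling
  4. Use IAM roles and permissions that fit the task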
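
For items 2 and 3, a common approach to retrying throttled calls is exponential backoff with a cap. The function below is a minimal sketch of the delay calculation only; backoff_delay is a hypothetical helper, and production code usually adds random jitter, which is left out here to keep the result deterministic.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `cap`.
/// (Hypothetical helper for illustration; not part of aws-sdk-glue.)
pub fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // Saturate instead of overflowing for very large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(cap, |delay| delay.min(cap))
}
```

With a base of one second and a cap of 30 seconds, the delays grow as 1 s, 2 s, 4 s, 8 s, 16 s and then stay at 30 s.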

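Best practices

  • Use async programming to take full advantage of Rust's performance
  • Implement proper error handling and logging
  • Consider using the AWS SDK's retry mechanism
  • Use environment variables or AWS Secrets Manager for sensitive information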
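
As a sketch of the last point, deployment-specific values can be read through a lookup function instead of being hard-coded. The variable names GLUE_JOB_NAME and GLUE_ROLE_ARN, the EtlSettings struct, and settings_from are all assumptions made for this example. Passing the lookup as a closure keeps the function easy to test without touching the real process environment.

```rust
/// Settings an ETL launcher might need (hypothetical struct for illustration).
#[derive(Debug, PartialEq, Eq)]
pub struct EtlSettings {
    pub job_name: String,
    pub role_arn: String,
}

/// Reads the settings through `lookup`, failing on the first missing or empty value.
/// The variable names are assumptions for this example, not an AWS convention.
pub fn settings_from(lookup: impl Fn(&str) -> Option<String>) -> Result<EtlSettings, String> {
    let required = |key: &str| {
        lookup(key)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| format!("missing environment variable {key}"))
    };
    Ok(EtlSettings {
        job_name: required("GLUE_JOB_NAME")?,
        role_arn: required("GLUE_ROLE_ARN")?,
    })
}
```

In a real program the lookup would typically be |key| std::env::var(key).ok(). Actual secrets such as passwords are usually better kept in a secrets store than in plain environment variables.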

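The SDK gives Rust developers the tools to manage and automate AWS Glue data ETL workflows. Combined with Rust's performance and safety guarantees, it can be used to build efficient, reliable data processing pipelines.

Complete example code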

use aws_config::BehaviorVersion;
use aws_sdk_glue::types::{CrawlerTargets, JobCommand, JobRunState, S3Target, WorkerType};
use aws_sdk_glue::Client;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the Glue client
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    // Create the Glue job
    let job_command = JobCommand::builder()
        .name("glueetl")
        .script_location("s3://my-bucket/scripts/my_etl_script.py")
        .build();

    let create_job_response = client
        .create_job()
        .name("my-etl-job")
        .role("arn:aws:iam::123456789012:role/GlueServiceRole")
        .command(job_command)
        .glue_version("3.0")
        .number_of_workers(10)
        .worker_type(WorkerType::G1X) // the setter takes the WorkerType enum, not a string
        .send()
        .await?;

    println!("Created job: {:?}", create_job_response);

    // Create the crawler
    let s3_target = S3Target::builder()
        .path("s3://my-bucket/data/")
        .build();

    let targets = CrawlerTargets::builder()
        .s3_targets(s3_target)
        .build();

    let crawler_response = client
        .create_crawler()
        .name("my-data-crawler")
        .role("arn:aws:iam::123456789012:role/GlueServiceRole")
        .database_name("my_database")
        .targets(targets)
        .send()
        .await?;

    println!("Crawler created successfully: {:?}", crawler_response);

    // Start a job run
    let run_response = client
        .start_job_run()
        .job_name("my-etl-job")
        .arguments("--input_path", "s3://my-bucket/input/")
        .arguments("--output_path", "s3://my-bucket/output/")
        .send()
        .await?;

    // Return an error instead of panicking if the response carries no run ID
    let run_id = run_response
        .job_run_id()
        .ok_or("StartJobRun response had no run ID")?;
    println!("Job run started with ID: {}", run_id);

    // Monitor the job run state
    loop {
        let status_response = client
            .get_job_run()
            .job_name("my-etl-job")
            .run_id(run_id)
            .send()
            .await?;

        if let Some(job_run) = status_response.job_run() {
            println!("Current job status: {:?}", job_run.job_run_state());
            println!("Started at: {:?}", job_run.started_on());

            match job_run.job_run_state() {
                Some(JobRunState::Succeeded) => {
                    println!("ETL job completed successfully!");
                    println!("Completed at: {:?}", job_run.completed_on());
                    break;
                }
                // Timeout and Error also end a run; without them the loop would poll forever
                Some(
                    JobRunState::Failed
                    | JobRunState::Stopped
                    | JobRunState::Timeout
                    | JobRunState::Error,
                ) => {
                    println!("ETL job failed, timed out, or was stopped");
                    println!("Completed at: {:?}", job_run.completed_on());
                    break;
                }
                _ => {
                    println!("Job still running... checking again in 30 seconds");
                    sleep(Duration::from_secs(30)).await;
                }
            }
        } else {
            println!("No job run information found");
            break;
        }
    }

    Ok(())
}