Using the Rust AWS data sync crate aws-sdk-datasync: efficient data transfer and management between cloud and on-premises storage

DataSync is an online data movement service that simplifies data migration and helps you transfer file or object data quickly, easily, and securely to, from, and between Amazon Web Services storage services.

Getting Started

To use aws-sdk-datasync, your Rust project needs Tokio as a dependency to run async code. Add the following to your Cargo.toml:

[dependencies]
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-datasync = "1.85.0"
tokio = { version = "1", features = ["full"] }

Then create a client in your code as follows:

use aws_sdk_datasync as datasync;

#[::tokio::main]
async fn main() -> Result<(), datasync::Error> {
    let config = aws_config::load_from_env().await;
    let client = aws_sdk_datasync::Client::new(&config);

    // ... make some calls with the client

    Ok(())
}

Complete Example

The following complete example shows how to create a data transfer task with aws-sdk-datasync:

use aws_sdk_datasync as datasync;
use datasync::types::{Ec2Config, S3Config};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load the AWS configuration
    let config = aws_config::load_from_env().await;
    let client = datasync::Client::new(&config);

    // Create the S3 source location. S3Config requires the ARN of an IAM role
    // that DataSync can use to access the bucket (the ARNs here are placeholders).
    let source_location = client
        .create_location_s3()
        .s3_bucket_arn("arn:aws:s3:::my-source-bucket")
        .s3_config(
            S3Config::builder()
                .bucket_access_role_arn("arn:aws:iam::123456789012:role/my-datasync-role")
                .build()?,
        )
        .subdirectory("/path/to/data")
        .send()
        .await?;

    // Create the EFS destination location. CreateLocationEfs also requires an
    // Ec2Config describing the subnet and security groups used to mount the file system.
    let destination_location = client
        .create_location_efs()
        .efs_filesystem_arn("arn:aws:elasticfilesystem:us-west-2:123456789012:file-system/fs-01234567")
        .ec2_config(
            Ec2Config::builder()
                .subnet_arn("arn:aws:ec2:us-west-2:123456789012:subnet/subnet-01234567")
                .security_group_arns("arn:aws:ec2:us-west-2:123456789012:security-group/sg-01234567")
                .build()?,
        )
        .subdirectory("/target/path")
        .send()
        .await?;

    // Create the data transfer task
    let task = client
        .create_task()
        .source_location_arn(source_location.location_arn().unwrap())
        .destination_location_arn(destination_location.location_arn().unwrap())
        .name("MyDataSyncTask")
        .send()
        .await?;

    // Start the task
    client
        .start_task_execution()
        .task_arn(task.task_arn().unwrap())
        .send()
        .await?;

    println!("DataSync task started successfully");

    Ok(())
}

Extended Example

The following more complete example adds task-status monitoring and error handling:

use aws_sdk_datasync as datasync;
use datasync::types::{Ec2Config, S3Config, TaskExecutionStatus};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load the AWS configuration
    let config = aws_config::load_from_env().await;
    let client = datasync::Client::new(&config);

    // Create the S3 source location (bucket and IAM role ARNs are placeholders)
    println!("Creating S3 source location...");
    let source_location = client
        .create_location_s3()
        .s3_bucket_arn("arn:aws:s3:::my-source-bucket")
        .s3_config(
            S3Config::builder()
                .bucket_access_role_arn("arn:aws:iam::123456789012:role/my-datasync-role")
                .build()?,
        )
        .subdirectory("/path/to/data")
        .send()
        .await?;
    let source_arn = source_location.location_arn().unwrap();

    // Create the EFS destination location; CreateLocationEfs requires an Ec2Config
    // describing the subnet and security groups used to mount the file system
    println!("Creating EFS destination location...");
    let destination_location = client
        .create_location_efs()
        .efs_filesystem_arn("arn:aws:elasticfilesystem:us-west-2:123456789012:file-system/fs-01234567")
        .ec2_config(
            Ec2Config::builder()
                .subnet_arn("arn:aws:ec2:us-west-2:123456789012:subnet/subnet-01234567")
                .security_group_arns("arn:aws:ec2:us-west-2:123456789012:security-group/sg-01234567")
                .build()?,
        )
        .subdirectory("/target/path")
        .send()
        .await?;
    let destination_arn = destination_location.location_arn().unwrap();

    // Create the data transfer task
    println!("Creating data sync task...");
    let task = client
        .create_task()
        .source_location_arn(source_arn)
        .destination_location_arn(destination_arn)
        .name("MyDataSyncTask")
        .send()
        .await?;
    let task_arn = task.task_arn().unwrap();

    // Start the task
    println!("Starting task execution...");
    let execution = client
        .start_task_execution()
        .task_arn(task_arn)
        .send()
        .await?;
    let execution_arn = execution.task_execution_arn().unwrap();

    // Poll the execution status until it reaches a terminal state
    println!("Monitoring task execution...");
    loop {
        let status = client
            .describe_task_execution()
            .task_execution_arn(execution_arn)
            .send()
            .await?;

        match status.status() {
            Some(TaskExecutionStatus::Success) => {
                println!("Task completed successfully");
                break;
            }
            Some(TaskExecutionStatus::Error) => {
                println!("Task failed");
                // The error code and detail live on the execution's result detail
                if let Some(result) = status.result() {
                    if let Some(error) = result.error_code() {
                        println!("Error code: {}", error);
                    }
                    if let Some(detail) = result.error_detail() {
                        println!("Error detail: {}", detail);
                    }
                }
                return Err("DataSync task failed".into());
            }
            _ => {
                println!("Task is still running...");
                sleep(Duration::from_secs(10)).await;
            }
        }
    }

    Ok(())
}

Usage Notes

  • The SDK provides one crate per AWS service
  • You need to add Tokio as a dependency to run the async code
  • The client documentation lists every callable method with its inputs and outputs

Getting Help

  • GitHub Discussions - for ideas, RFCs, and general questions
  • GitHub Issues - for bug reports and feature requests
  • Generated documentation (latest release)
  • Usage examples

License

This project is licensed under Apache-2.0.


1 Reply

A Guide to Using the Rust AWS Data Sync Crate aws-sdk-datasync

Introduction

aws-sdk-datasync is AWS's official Rust SDK crate for interacting with the AWS DataSync service. AWS DataSync is an online data transfer service that simplifies, automates, and accelerates data transfer between on-premises storage and AWS storage services.

Key Features

  • Efficiently transfer data between on-premises storage systems and AWS storage services (such as S3, EFS, and FSx)
  • Automatically handle network optimization, encryption, and integrity verification
  • Support incremental transfers that copy only what has changed
  • Provide task monitoring and management
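The incremental-transfer idea can be sketched in plain Rust: compare source and destination file metadata and copy only what is new or changed. This is a conceptual illustration (the helper files_to_sync and the metadata shape are invented here), not DataSync's actual change-detection logic.

```rust
use std::collections::HashMap;

/// File metadata used to decide whether a file changed: (size in bytes, mtime).
type Meta = (u64, u64);

/// Return the source files that are missing from, or differ at, the destination.
/// A conceptual sketch of incremental transfer; DataSync's real change
/// detection is internal to the service.
fn files_to_sync(source: &HashMap<String, Meta>, dest: &HashMap<String, Meta>) -> Vec<String> {
    let mut out: Vec<String> = source
        .iter()
        .filter(|(name, meta)| dest.get(*name) != Some(*meta))
        .map(|(name, _)| name.clone())
        .collect();
    out.sort();
    out
}

fn main() {
    let source = HashMap::from([
        ("a.jpg".to_string(), (100u64, 1u64)),
        ("b.jpg".to_string(), (200, 1)),
        ("c.jpg".to_string(), (300, 2)),
    ]);
    let dest = HashMap::from([
        ("a.jpg".to_string(), (100u64, 1u64)), // unchanged
        ("b.jpg".to_string(), (250, 1)),       // size differs
    ]);
    // b.jpg changed, c.jpg is new; a.jpg is unchanged and skipped
    println!("{:?}", files_to_sync(&source, &dest)); // ["b.jpg", "c.jpg"]
}
```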

Basic Usage

Adding the dependencies

[dependencies]
aws-config = "0.55"
aws-sdk-datasync = "0.22"
tokio = { version = "1", features = ["full"] }

Note that these examples pin the older 0.x releases of the SDK, in which generated types live under datasync::model; in the 1.x releases shown above they were moved to datasync::types.

Creating a DataSync client

use aws_sdk_datasync as datasync;

async fn create_client() -> datasync::Client {
    let config = aws_config::load_from_env().await;
    datasync::Client::new(&config)
}

Listing existing locations

async fn list_locations(client: &datasync::Client) -> Result<(), datasync::Error> {
    let resp = client.list_locations().send().await?;
    
    if let Some(locations) = resp.locations() {
        for location in locations {
            println!("Location ARN: {}", location.location_arn().unwrap_or("unknown"));
            println!("Type: {}", location.location_uri().unwrap_or("unknown"));
        }
    }
    
    Ok(())
}

Starting a task execution

async fn start_task_execution(
    client: &datasync::Client,
    task_arn: &str,
) -> Result<(), datasync::Error> {
    let resp = client
        .start_task_execution()
        .task_arn(task_arn)
        .send()
        .await?;
    
    println!(
        "Started task execution: {}",
        resp.task_execution_arn().unwrap_or("unknown")
    );
    
    Ok(())
}

Monitoring task status

async fn describe_task_execution(
    client: &datasync::Client,
    task_execution_arn: &str,
) -> Result<(), datasync::Error> {
    let resp = client
        .describe_task_execution()
        .task_execution_arn(task_execution_arn)
        .send()
        .await?;
    
    println!("Status: {:?}", resp.status());
    println!("Bytes transferred: {:?}", resp.bytes_transferred());
    println!("Files transferred: {:?}", resp.files_transferred());
    
    Ok(())
}
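The describe-then-sleep pattern behind describe_task_execution generalizes to a small polling helper. A std-only sketch, with an illustrative Phase enum standing in for the SDK's TaskExecutionStatus (poll_until_done is an invented helper, not SDK API):

```rust
use std::thread::sleep;
use std::time::Duration;

/// Terminal vs. non-terminal states of a long-running job (illustrative).
#[derive(Debug, PartialEq)]
enum Phase {
    Running,
    Success,
    Error,
}

/// Call `check` until it reports a terminal phase or `max_polls` attempts are
/// exhausted, sleeping `interval` between attempts. Returns the last phase seen.
fn poll_until_done(mut check: impl FnMut() -> Phase, interval: Duration, max_polls: u32) -> Phase {
    for _ in 0..max_polls {
        match check() {
            Phase::Running => sleep(interval),
            done => return done,
        }
    }
    Phase::Running
}

fn main() {
    // Simulate a job that finishes after three status checks.
    let mut calls = 0;
    let phase = poll_until_done(
        || {
            calls += 1;
            if calls < 3 { Phase::Running } else { Phase::Success }
        },
        Duration::from_millis(1),
        10,
    );
    println!("{:?}", phase); // Success
}
```

In real code the closure would wrap the describe_task_execution call and the sleep would be tokio::time::sleep, but the control flow is the same.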

Complete example: syncing from S3 to an on-premises NFS share

use aws_sdk_datasync as datasync;
use datasync::model::{Options, TaskSchedule, VerifyMode};

#[tokio::main]
async fn main() -> Result<(), datasync::Error> {
    let client = create_client().await;

    // Assume we already have ARNs for the source (S3) and destination (NFS) locations
    let source_location_arn = "arn:aws:datasync:us-east-1:123456789012:location/loc-1234567890abcdef0";
    let destination_location_arn = "arn:aws:datasync:us-east-1:123456789012:location/loc-abcdef1234567890";

    // Create the task
    let task = client
        .create_task()
        .source_location_arn(source_location_arn)
        .destination_location_arn(destination_location_arn)
        .name("S3-to-NFS-Sync")
        .options(
            Options::builder()
                .verify_mode(VerifyMode::PointInTimeConsistent)
                .build(),
        )
        .schedule(
            TaskSchedule::builder()
                .schedule_expression("cron(0 0 * * ? *)") // runs daily at midnight
                .build(),
        )
        .send()
        .await?;

    let task_arn = task.task_arn().unwrap();
    println!("Created task with ARN: {}", task_arn);

    // Run the task immediately
    start_task_execution(&client, task_arn).await?;

    Ok(())
}

async fn create_client() -> datasync::Client {
    let config = aws_config::load_from_env().await;
    datasync::Client::new(&config)
}

Advanced Features

Using filters

use datasync::model::{FilterRule, FilterType};

// Sync only .jpg files
let filter = FilterRule::builder()
    .filter_type(FilterType::SimplePattern)
    .value("*.jpg")
    .build();

client
    .create_task()
    // ...other parameters
    .includes(filter)
    .send()
    .await?;
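For intuition about what a SimplePattern like *.jpg selects, here is a rough std-only sketch of matching a single pattern containing one * wildcard (matches_simple_pattern is invented for illustration; it is not the service's actual matcher, which also supports |-separated pattern lists):

```rust
/// Very rough sketch of matching one simple pattern with a single `*`
/// wildcard, e.g. "*.jpg" or "/photos/*". Not DataSync's actual matcher.
fn matches_simple_pattern(pattern: &str, path: &str) -> bool {
    match pattern.split_once('*') {
        Some((prefix, suffix)) => {
            // The path must be long enough that prefix and suffix don't overlap
            path.len() >= prefix.len() + suffix.len()
                && path.starts_with(prefix)
                && path.ends_with(suffix)
        }
        None => pattern == path, // no wildcard: exact match
    }
}

fn main() {
    println!("{}", matches_simple_pattern("*.jpg", "/photos/cat.jpg")); // true
    println!("{}", matches_simple_pattern("*.jpg", "/docs/readme.txt")); // false
    println!("{}", matches_simple_pattern("/photos/*", "/photos/cat.jpg")); // true
}
```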

Configuring transfer options

use datasync::model::{Options, OverwriteMode, VerifyMode};

let options = Options::builder()
    .bytes_per_second(1024 * 1024) // throttle to 1 MB/s
    .verify_mode(VerifyMode::OnlyFilesTransferred)
    .overwrite_mode(OverwriteMode::Always)
    .build();

Complete Demo

The following complete example covers the whole flow of managing a DataSync task: creating, executing, and monitoring it:

use aws_sdk_datasync as datasync;
use datasync::model::{FilterRule, FilterType, Options, TaskExecutionStatus, VerifyMode};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the DataSync client
    let client = datasync::Client::new(&aws_config::load_from_env().await);

    // 1. List all available locations
    println!("Listing available locations...");
    let locations = client.list_locations().send().await?;
    for loc in locations.locations().unwrap_or_default() {
        println!("- {}: {}",
            loc.location_arn().unwrap_or("unknown"),
            loc.location_uri().unwrap_or("unknown")
        );
    }

    // 2. Create the data transfer task (example ARNs; replace with real ones)
    let source_arn = "arn:aws:datasync:us-east-1:123456789012:location/loc-src-123";
    let dest_arn = "arn:aws:datasync:us-east-1:123456789012:location/loc-dst-456";

    println!("Creating sync task...");
    let task = client
        .create_task()
        .source_location_arn(source_arn)
        .destination_location_arn(dest_arn)
        .name("Example-Sync-Task")
        .options(
            Options::builder()
                .bytes_per_second(1024 * 1024) // throttle to 1 MB/s
                .verify_mode(VerifyMode::PointInTimeConsistent)
                .build()
        )
        .includes(
            FilterRule::builder()
                .filter_type(FilterType::SimplePattern)
                .value("*.jpg") // sync only .jpg files
                .build()
        )
        .send()
        .await?;

    let task_arn = task.task_arn().unwrap();
    println!("Created task: {}", task_arn);

    // 3. Start a task execution
    println!("Starting task execution...");
    let execution = client
        .start_task_execution()
        .task_arn(task_arn)
        .send()
        .await?;

    let execution_arn = execution.task_execution_arn().unwrap();
    println!("Started execution: {}", execution_arn);

    // 4. Monitor the execution status
    loop {
        let status = client
            .describe_task_execution()
            .task_execution_arn(execution_arn)
            .send()
            .await?;

        println!("Current status: {:?}", status.status());
        println!(
            "Transferred {} bytes ({} bytes written), {} files",
            status.bytes_transferred().unwrap_or(0),
            status.bytes_written().unwrap_or(0),
            status.files_transferred().unwrap_or(0)
        );

        match status.status() {
            Some(TaskExecutionStatus::Success) => break,
            Some(TaskExecutionStatus::Error) => {
                println!("Task failed!");
                break;
            }
            _ => sleep(Duration::from_secs(5)).await, // check again in 5 seconds
        }
    }

    Ok(())
}

Best Practices

  1. Set sensible bandwidth limits: use bytes_per_second in Options to avoid starving other applications on the network
  2. Incremental transfers: DataSync transfers incrementally by default, syncing only files that have changed
  3. Monitor tasks: check task status and transfer statistics regularly
  4. Error handling: implement retry logic to handle transient network problems
  5. Secure configuration: make sure the IAM role has only the minimum required permissions
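Point 4 above, retry logic for transient failures, can be sketched with plain exponential backoff. This is a std-only, synchronous sketch (retry_with_backoff is an illustrative helper; real code would wrap the SDK call and sleep asynchronously with tokio::time::sleep):

```rust
use std::thread::sleep;
use std::time::Duration;

/// Retry `op` up to `max_attempts` times, doubling the delay after each
/// failure (exponential backoff). Returns the first success or the last error.
fn retry_with_backoff<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    max_attempts: u32,
    initial_delay: Duration,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                delay *= 2; // back off exponentially
                attempt += 1;
            }
        }
    }
}

fn main() {
    // Simulate a flaky operation that succeeds on the third attempt.
    let mut tries = 0;
    let result: Result<&str, &str> = retry_with_backoff(
        || {
            tries += 1;
            if tries < 3 { Err("transient error") } else { Ok("done") }
        },
        5,
        Duration::from_millis(1),
    );
    println!("{:?} after {} tries", result, tries); // Ok("done") after 3 tries
}
```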

Summary

aws-sdk-datasync gives Rust developers a powerful interface to the AWS DataSync service, making it straightforward to transfer data efficiently and securely between on-premises storage and AWS cloud storage. With well-configured tasks and options, it enables automated, reliable, large-scale data transfer.
