Rust数据加载与批处理库dataloader的使用,高效实现GraphQL批量查询与缓存优化

Rust数据加载与批处理库dataloader的使用,高效实现GraphQL批量查询与缓存优化

Dataloader简介

Rust实现的DataLoader库,基于Facebook的DataLoader概念,使用async-await特性实现高效的批量数据加载和缓存。

主要特性

  • 支持带缓存的批量数据加载
  • 支持不带缓存的批量数据加载
  • 异步处理机制
  • 可配置的批量大小

安装与配置

运行时选择

默认使用async-std运行时:

[dependencies]
dataloader = "0.18"

使用Tokio运行时:

[dependencies]
dataloader = { version = "0.18", default-features = false, features = ["runtime-tokio"]}
futures = "0.3"

基础使用示例

use dataloader::cached::Loader;
use dataloader::BatchFn;
use futures::executor::block_on;
use futures::future::ready;
use std::collections::HashMap;
use std::thread;

// 定义批量加载函数
struct MyLoadFn;

impl BatchFn<usize, usize> for MyLoadFn {
    async fn load(&mut self, keys: &[usize]) -> HashMap<usize, usize> {
        println!("批量加载键: {:?}", keys);
        keys.iter()
            .map(|v| (v.clone(), v.clone()))
            .collect::<HashMap<_, _>>()
    }
}

fn main() {
    let mut i = 0;
    while i < 2 {
        let a = MyLoadFn;
        // 创建Loader实例,设置最大批量大小为4
        let loader = Loader::new(a).with_max_batch_size(4);

        // 线程1中加载数据
        let l1 = loader.clone();
        let h1 = thread::spawn(move || {
            let r1 = l1.load(1);
            let r2 = l1.load(2);
            let r3 = l1.load(3);
            let r4 = l1.load_many(vec![2, 3, 4, 5, 6, 7, 8]);
            
            // 等待所有加载完成
            let f = futures::future::join4(r1, r2, r3, r4);
            println!("线程1结果: {:?}", block_on(f));
        });

        // 线程2中加载数据
        let l2 = loader.clone();
        let h2 = thread::spawn(move || {
            let r1 = l2.load(1);
            let r2 = l2.load(2);
            let r3 = l2.load(3);
            let r4 = l2.load(4);
            
            // 等待所有加载完成
            let f = futures::future::join4(r1, r2, r3, r4);
            println!("线程2结果: {:?}", block_on(f));
        });

        h1.join().unwrap();
        h2.join().unwrap();
        i += 1;
    }
}

GraphQL集成完整示例

use async_graphql::{Context, FieldResult, Object};
use dataloader::{BatchFn, Loader};
use futures::executor::block_on;
use std::collections::HashMap;
use std::sync::Arc;

// 用户数据模型
#[derive(Clone)]
struct User {
    id: i32,
    name: String,
}

// 用户数据加载器
struct UserLoader;

#[async_trait::async_trait]
impl BatchFn<i32, User> for UserLoader {
    async fn load(&mut self, keys: &[i32]) -> HashMap<i32, User> {
        println!("批量加载用户ID: {:?}", keys);
        keys.iter()
            .map(|id| {
                (
                    *id,
                    User {
                        id: *id,
                        name: format!("用户 {}", id),
                    },
                )
            })
            .collect()
    }
}

// GraphQL查询定义
struct Query;

#[Object]
impl Query {
    // 查询单个用户
    async fn user(&self, ctx: &Context<'_>, id: i32) -> FieldResult<User> {
        let loader = ctx.data::<Loader<i32, User>>()?;
        Ok(loader.load(id).await)
    }

    // 批量查询多个用户
    async fn users(&self, ctx: &Context<'_>, ids: Vec<i32>) -> FieldResult<Vec<User>> {
        let loader = ctx.data::<Loader<i32, User>>()?;
        Ok(loader.load_many(ids).await)
    }
}

fn main() {
    // 创建数据加载器实例,设置最大批量大小为10
    let loader = Loader::new(UserLoader).with_max_batch_size(10);

    // 构建GraphQL Schema
    let schema = async_graphql::Schema::build(Query, EmptyMutation, EmptySubscription)
        .data(loader)
        .finish();

    // 执行GraphQL查询
    block_on(async {
        // 查询单个用户
        let res = schema.execute("{ user(id: 1) { id name } }").await;
        println!("单用户查询结果: {:?}", res);
        
        // 批量查询多个用户
        let res = schema.execute("{ users(ids: [1,2,3]) { id name } }").await;
        println!("多用户查询结果: {:?}", res);
    });
}

许可证

本项目采用以下任一许可证:

  • Apache License, Version 2.0
  • MIT license

1 回复

Rust数据加载与批处理库dataloader的使用:高效实现GraphQL批量查询与缓存优化

完整示例Demo

下面是一个完整的dataloader使用示例,展示如何在实际项目中实现批量数据加载和缓存:

use async_trait::async_trait;
use dataloader::{
    BatchFn, 
    cached::Loader,
    LoaderBuilder
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

// 1. 定义用户结构体
#[derive(Debug, Clone)]
struct User {
    id: i32,
    name: String,
    email: String,
}

// 2. 实现BatchFn trait的数据加载器
struct UserBatchLoader;

#[async_trait]
impl BatchFn<i32, Arc<User>> for UserBatchLoader {
    async fn load(&self, keys: &[i32]) -> HashMap<i32, Arc<User>> {
        println!("批处理加载用户IDs: {:?}", keys);
        
        // 模拟数据库查询延迟
        sleep(Duration::from_millis(100)).await;
        
        keys.iter()
            .map(|id| {
                let user = Arc::new(User {
                    id: *id,
                    name: format!("用户-{}", id),
                    email: format!("user{}@example.com", id),
                });
                (*id, user)
            })
            .collect()
    }
}

#[tokio::main]
async fn main() {
    // 3. 创建带缓存的加载器
    let user_loader = LoaderBuilder::new(UserBatchLoader)
        .with_max_batch_size(10)
        .with_delay(Duration::from_millis(50))
        .build_cached();
    
    // 4. 模拟并发请求
    let futures = (1..=5).map(|id| {
        let loader = user_loader.clone();
        tokio::spawn(async move {
            loader.load(id).await
        })
    });
    
    // 5. 等待所有请求完成
    let users = futures::future::join_all(futures).await;
    
    // 6. 打印结果
    for user in users {
        match user {
            Ok(user) => println!("加载的用户: {:?}", user),
            Err(e) => println!("加载错误: {}", e),
        }
    }
    
    // 7. 测试缓存
    println!("\n测试缓存...");
    let cached_user = user_loader.load(3).await;
    println!("缓存用户: {:?}", cached_user);
    
    // 8. 测试批量加载
    println!("\n测试批量加载...");
    let batch_futures = vec![
        user_loader.load(10),
        user_loader.load(20),
        user_loader.load(10),  // 重复的ID会被去重
    ];
    
    let batch_users = futures::future::join_all(batch_futures).await;
    for user in batch_users {
        println!("批量加载用户: {:?}", user);
    }
}

代码说明

  1. 用户结构体:定义了包含id、name和email的User结构体
  2. 批处理加载器:实现了BatchFn trait,模拟从数据库批量加载用户数据
  3. 加载器配置:创建带缓存的加载器,设置每批最多10个请求,最大延迟50毫秒
  4. 并发请求:模拟5个并发用户请求
  5. 结果处理:使用futures::join_all等待所有请求完成
  6. 缓存测试:验证相同ID的第二次加载是否从缓存获取
  7. 批量测试:演示批量加载和请求去重功能

运行效果

  1. 首次运行会显示批处理加载过程
  2. 重复加载相同ID会直接从缓存获取
  3. 批量请求会自动合并为单个批处理操作
  4. 重复的请求键会自动去重

这个完整示例展示了dataloader在实际项目中的典型用法,包括批处理、缓存和并发请求处理等功能。

回到顶部