Rust数据加载与批处理库dataloader的使用,高效实现GraphQL批量查询与缓存优化
Rust数据加载与批处理库dataloader的使用,高效实现GraphQL批量查询与缓存优化
Dataloader简介
Rust实现的DataLoader库,基于Facebook的DataLoader概念,使用async-await特性实现高效的批量数据加载和缓存。
主要特性
- 支持带缓存的批量数据加载
- 支持不带缓存的批量数据加载
- 异步处理机制
- 可配置的批量大小
安装与配置
运行时选择
默认使用async-std运行时:
[dependencies]
dataloader = "0.18"
使用Tokio运行时:
[dependencies]
dataloader = { version = "0.18", default-features = false, features = ["runtime-tokio"]}
futures = "0.3"
基础使用示例
use dataloader::cached::Loader;
use dataloader::BatchFn;
use futures::executor::block_on;
use futures::future::ready;
use std::collections::HashMap;
use std::thread;
// 定义批量加载函数
struct MyLoadFn;
impl BatchFn<usize, usize> for MyLoadFn {
async fn load(&mut self, keys: &[usize]) -> HashMap<usize, usize> {
println!("批量加载键: {:?}", keys);
keys.iter()
.map(|v| (v.clone(), v.clone()))
.collect::<HashMap<_, _>>()
}
}
fn main() {
let mut i = 0;
while i < 2 {
let a = MyLoadFn;
// 创建Loader实例,设置最大批量大小为4
let loader = Loader::new(a).with_max_batch_size(4);
// 线程1中加载数据
let l1 = loader.clone();
let h1 = thread::spawn(move || {
let r1 = l1.load(1);
let r2 = l1.load(2);
let r3 = l1.load(3);
let r4 = l1.load_many(vec![2, 3, 4, 5, 6, 7, 8]);
// 等待所有加载完成
let f = futures::future::join4(r1, r2, r3, r4);
println!("线程1结果: {:?}", block_on(f));
});
// 线程2中加载数据
let l2 = loader.clone();
let h2 = thread::spawn(move || {
let r1 = l2.load(1);
let r2 = l2.load(2);
let r3 = l2.load(3);
let r4 = l2.load(4);
// 等待所有加载完成
let f = futures::future::join4(r1, r2, r3, r4);
println!("线程2结果: {:?}", block_on(f));
});
h1.join().unwrap();
h2.join().unwrap();
i += 1;
}
}
GraphQL集成完整示例
use async_graphql::{Context, FieldResult, Object};
use dataloader::{BatchFn, Loader};
use futures::executor::block_on;
use std::collections::HashMap;
use std::sync::Arc;
// 用户数据模型
#[derive(Clone)]
struct User {
id: i32,
name: String,
}
// 用户数据加载器
struct UserLoader;
#[async_trait::async_trait]
impl BatchFn<i32, User> for UserLoader {
async fn load(&mut self, keys: &[i32]) -> HashMap<i32, User> {
println!("批量加载用户ID: {:?}", keys);
keys.iter()
.map(|id| {
(
*id,
User {
id: *id,
name: format!("用户 {}", id),
},
)
})
.collect()
}
}
// GraphQL查询定义
struct Query;
#[Object]
impl Query {
// 查询单个用户
async fn user(&self, ctx: &Context<'_>, id: i32) -> FieldResult<User> {
let loader = ctx.data::<Loader<i32, User>>()?;
Ok(loader.load(id).await)
}
// 批量查询多个用户
async fn users(&self, ctx: &Context<'_>, ids: Vec<i32>) -> FieldResult<Vec<User>> {
let loader = ctx.data::<Loader<i32, User>>()?;
Ok(loader.load_many(ids).await)
}
}
fn main() {
// 创建数据加载器实例,设置最大批量大小为10
let loader = Loader::new(UserLoader).with_max_batch_size(10);
// 构建GraphQL Schema
let schema = async_graphql::Schema::build(Query, EmptyMutation, EmptySubscription)
.data(loader)
.finish();
// 执行GraphQL查询
block_on(async {
// 查询单个用户
let res = schema.execute("{ user(id: 1) { id name } }").await;
println!("单用户查询结果: {:?}", res);
// 批量查询多个用户
let res = schema.execute("{ users(ids: [1,2,3]) { id name } }").await;
println!("多用户查询结果: {:?}", res);
});
}
许可证
本项目采用以下任一许可证:
- Apache License, Version 2.0
- MIT license
1 回复
Rust数据加载与批处理库dataloader的使用:高效实现GraphQL批量查询与缓存优化
完整示例Demo
下面是一个完整的dataloader使用示例,展示如何在实际项目中实现批量数据加载和缓存:
use async_trait::async_trait;
use dataloader::{
BatchFn,
cached::Loader,
LoaderBuilder
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
// 1. 定义用户结构体
#[derive(Debug, Clone)]
struct User {
id: i32,
name: String,
email: String,
}
// 2. 实现BatchFn trait的数据加载器
struct UserBatchLoader;
#[async_trait]
impl BatchFn<i32, Arc<User>> for UserBatchLoader {
async fn load(&self, keys: &[i32]) -> HashMap<i32, Arc<User>> {
println!("批处理加载用户IDs: {:?}", keys);
// 模拟数据库查询延迟
sleep(Duration::from_millis(100)).await;
keys.iter()
.map(|id| {
let user = Arc::new(User {
id: *id,
name: format!("用户-{}", id),
email: format!("user{}@example.com", id),
});
(*id, user)
})
.collect()
}
}
#[tokio::main]
async fn main() {
// 3. 创建带缓存的加载器
let user_loader = LoaderBuilder::new(UserBatchLoader)
.with_max_batch_size(10)
.with_delay(Duration::from_millis(50))
.build_cached();
// 4. 模拟并发请求
let futures = (1..=5).map(|id| {
let loader = user_loader.clone();
tokio::spawn(async move {
loader.load(id).await
})
});
// 5. 等待所有请求完成
let users = futures::future::join_all(futures).await;
// 6. 打印结果
for user in users {
match user {
Ok(user) => println!("加载的用户: {:?}", user),
Err(e) => println!("加载错误: {}", e),
}
}
// 7. 测试缓存
println!("\n测试缓存...");
let cached_user = user_loader.load(3).await;
println!("缓存用户: {:?}", cached_user);
// 8. 测试批量加载
println!("\n测试批量加载...");
let batch_futures = vec![
user_loader.load(10),
user_loader.load(20),
user_loader.load(10), // 重复的ID会被去重
];
let batch_users = futures::future::join_all(batch_futures).await;
for user in batch_users {
println!("批量加载用户: {:?}", user);
}
}
代码说明
- 用户结构体:定义了包含id、name和email的User结构体
- 批处理加载器:实现了BatchFn trait,模拟从数据库批量加载用户数据
- 加载器配置:创建带缓存的加载器,设置每批最多10个请求,最大延迟50毫秒
- 并发请求:模拟5个并发用户请求
- 结果处理:使用futures::join_all等待所有请求完成
- 缓存测试:验证相同ID的第二次加载是否从缓存获取
- 批量测试:演示批量加载和请求去重功能
运行效果
- 首次运行会显示批处理加载过程
- 重复加载相同ID会直接从缓存获取
- 批量请求会自动合并为单个批处理操作
- 重复的请求键会自动去重
这个完整示例展示了dataloader在实际项目中的典型用法,包括批处理、缓存和并发请求处理等功能。