Rust GraphQL订阅库juniper_subscriptions的使用,实现实时数据推送与WebSocket通信
Rust GraphQL订阅库juniper_subscriptions的使用,实现实时数据推送与WebSocket通信
juniper_subscriptions crate
这个库提供了SubscriptionCoordinator
和SubscriptionConnection
的实现,用于juniper,一个Rust的GraphQL库。
文档
关于这个crate的文档
示例
以下是一个工作示例代码,展示了一个带有GraphQL订阅处理程序的warp服务器。
许可证
这个项目使用BSD 2-Clause License。
完整示例代码
use futures::{Stream, StreamExt};
use juniper::{
graphql_object, graphql_subscription, EmptyMutation, EmptySubscription,
FieldError, RootNode, SubscriptionCoordinator,
};
use juniper_subscriptions::Coordinator;
use std::pin::Pin;
use tokio::sync::broadcast::{self, Sender};
use warp::{Filter, Reply};
// 定义GraphQL上下文
struct Context {
event_sender: Sender<String>,
}
impl juniper::Context for Context {}
// 定义查询根
struct Query;
#[graphql_object(context = Context)]
impl Query {
fn hello(&self) -> &str {
"Hello, world!"
}
}
// 定义订阅根
struct Subscription;
type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
#[graphql_subscription(context = Context)]
impl Subscription {
async fn events(context: &Context) -> StringStream {
let mut receiver = context.event_sender.subscribe();
let stream = async_stream::stream! {
while let Ok(msg) = receiver.recv().await {
yield Ok(msg);
}
};
Box::pin(stream)
}
}
// 主函数
#[tokio::main]
async fn main() {
// 创建事件广播通道
let (event_sender, _) = broadcast::channel(16);
// 创建GraphQL Schema
let schema = RootNode::new(
Query,
EmptyMutation::new(),
Subscription,
);
// 创建订阅协调器
let coordinator = Coordinator::new(schema);
// 创建warp过滤器
let ctx = warp::any().map(move || Context {
event_sender: event_sender.clone(),
});
let graphql_filter = juniper_warp::make_graphql_filter(schema, ctx.boxed());
// 设置路由
let routes = warp::path("graphql")
.and(warp::post())
.and(graphql_filter)
.or(warp::path("subscriptions")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let coordinator = coordinator.clone();
ws.on_upgrade(move |websocket| {
juniper_warp::subscriptions::serve(
"subscriptions",
coordinator,
websocket,
)
})
}));
// 启动事件发送任务
tokio::spawn(async move {
let mut counter = 0;
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
counter += 1;
let _ = event_sender.send(format!("Event {}", counter));
}
});
// 启动服务器
warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}
示例说明
-
依赖项:
- juniper: 提供GraphQL核心功能
- juniper_subscriptions: 处理订阅功能
- warp: 作为Web服务器
- tokio: 异步运行时
- futures: 提供Stream支持
-
主要组件:
- Query: 定义常规GraphQL查询
- Subscription: 定义订阅类型和订阅字段
- Context: 包含事件广播通道用于推送消息
- Coordinator: 管理订阅连接
-
工作流程:
- 创建事件广播通道
- 定义GraphQL Schema
- 设置HTTP和WebSocket路由
- 启动后台任务定期发送事件
- 客户端可以通过WebSocket订阅事件
-
使用方式:
- 常规GraphQL查询通过POST /graphql
- 订阅通过WebSocket /subscriptions
这个示例展示了如何使用juniper_subscriptions实现一个完整的GraphQL订阅服务器,包括实时数据推送和WebSocket通信功能。
1 回复
以下是基于提供内容整理的完整示例demo:
// Cargo.toml
// [dependencies]
// juniper = "0.15"
// juniper_subscriptions = "0.15"
// tokio = { version = "1.0", features = ["full"] }
// warp = "0.3"
// futures = "0.3"
use juniper::{FieldResult, RootNode};
use juniper_subscriptions::Coordinator;
use warp::Filter;
use std::sync::Arc;
use futures::StreamExt;
use tokio::time::interval;
use std::time::Duration;
// 定义查询类型
struct Query;
#[juniper::graphql_object]
impl Query {
// 简单示例查询
fn dummy(&self) -> bool {
true
}
}
// 定义订阅类型
struct Subscription;
#[juniper::graphql_subscription]
impl Subscription {
// 计数器订阅,每秒递增
async fn ticker(&self, count: i32) -> impl futures::Stream<Item = FieldResult<i32>> {
let mut count = count;
interval(Duration::from_secs(1))
.map(move |_| {
count += 1;
FieldResult::Ok(count)
})
.take(10) // 限制发送10次
}
}
// 定义Schema类型
type Schema = RootNode<'static, Query, juniper::EmptyMutation<()>, Subscription>;
fn schema() -> Schema {
Schema::new(Query, juniper::EmptyMutation::new(), Subscription)
}
#[tokio::main]
async fn main() {
// 创建Schema和Coordinator
let schema = schema();
let coordinator = Arc::new(Coordinator::new(schema));
// 设置WebSocket路由
let graphql_subscription = warp::path("subscriptions")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let coordinator = coordinator.clone();
ws.on_upgrade(move |websocket| {
// 处理WebSocket升级
juniper_subscriptions::serve_ws(websocket, coordinator)
})
});
// 启动服务器
println!("GraphQL subscription server running on ws://localhost:8080/subscriptions");
warp::serve(graphql_subscription)
.run(([127, 0, 0, 1], 8080))
.await;
}
对应的客户端JavaScript代码:
import { ApolloClient, InMemoryCache } from '@apollo/client/core';
import { WebSocketLink } from '@apollo/client/link/ws';
import { SubscriptionClient } from 'subscriptions-transport-ws';
// 创建WebSocket链接
const wsLink = new WebSocketLink({
uri: 'ws://localhost:8080/subscriptions',
options: {
reconnect: true
}
});
// 创建Apollo客户端
const client = new ApolloClient({
link: wsLink,
cache: new InMemoryCache()
});
// 定义订阅查询
const TICKER_SUBSCRIPTION = gql`
subscription Ticker($count: Int!) {
ticker(count: $count)
}
`;
// 执行订阅
const observable = client.subscribe({
query: TICKER_SUBSCRIPTION,
variables: { count: 0 }
});
// 处理订阅结果
const subscription = observable.subscribe({
next(response) {
console.log('收到数据:', response.data.ticker);
},
error(err) {
console.error('订阅错误:', err);
},
complete() {
console.log('订阅完成');
}
});
// 5秒后取消订阅
setTimeout(() => {
subscription.unsubscribe();
}, 5000);
这个完整示例包含:
- Rust服务端代码,实现了GraphQL订阅端点
- 客户端JavaScript代码,使用Apollo Client连接订阅
- 每秒递增的计数器订阅示例
- 基本的WebSocket通信实现
要运行这个示例:
- 将Rust代码保存为main.rs
- 添加对应的Cargo.toml依赖
- 运行服务端:cargo run
- 在浏览器或Node.js环境中运行客户端代码