Rust GraphQL WebSocket实现库juniper_graphql_ws的使用,支持实时GraphQL订阅和WebSocket协议交互
Rust GraphQL WebSocket实现库juniper_graphql_ws的使用,支持实时GraphQL订阅和WebSocket协议交互
juniper_graphql_ws
crate
这个crate包含2种协议的实现:
-
(
graphql-transport-ws
特性) 新的graphql-transport-ws
GraphQL WebSocket协议,现在被Apollo和graphql-ws
npm包使用。 -
(
graphql-ws
特性) 旧的graphql-ws
GraphQL WebSocket协议,以前被Apollo和subscriptions-transport-ws
npm包使用(已废弃,推荐使用上面提到的新graphql-transport-ws
协议)。
使用示例
下面是一个使用juniper_graphql_ws实现GraphQL订阅的完整示例:
use juniper::{
graphql_object, graphql_subscription, FieldError, RootNode,
GraphQLEnum, GraphQLInputObject, GraphQLObject
};
use juniper_graphql_ws::ConnectionConfig;
use tokio::net::TcpListener;
use futures::Stream;
// 定义GraphQL数据结构
#[derive(GraphQLEnum)]
enum Episode {
NewHope,
EmpireStrikesBack,
ReturnOfTheJedi,
}
#[derive(GraphQLObject)]
#[graphql(description = "A humanoid creature in the Star Wars universe")]
struct Human {
id: String,
name: String,
appears_in: Vec<Episode>,
home_planet: String,
}
#[derive(GraphQLInputObject)]
#[graphql(description = "Input for creating a human")]
struct NewHuman {
name: String,
appears_in: Vec<Episode>,
home_planet: String,
}
// 定义查询
pub struct Query;
#[graphql_object]
impl Query {
fn human(&self, id: String) -> Option<Human> {
Some(Human {
id,
name: "Luke Skywalker".to_owned(),
appears_in: vec![Episode::NewHope],
home_planet: "Tatooine".to_owned(),
})
}
}
// 定义订阅
pub struct Subscription;
type HumanStream = std::pin::Pin<Box<dyn Stream<Item = Result<Human, FieldError>> + Send>>;
#[graphql_subscription]
impl Subscription {
async fn human_created(&self) -> HumanStream {
let stream = futures::stream::iter(vec![
Ok(Human {
id: "1".to_owned(),
name: "Luke Skywalker".to_owned(),
appears_in: vec![Episode::NewHope],
home_planet: "Tatooine".to_owned(),
})
]);
Box::pin(stream)
}
}
// 创建Schema
type Schema = RootNode<'static, Query, juniper::EmptyMutation<()>, Subscription>;
#[tokio::main]
async fn main() {
// 创建Schema
let schema = Schema::new(
Query,
juniper::EmptyMutation::<()>::new(),
Subscription,
);
// 配置WebSocket连接
let config = ConnectionConfig::new(()).schema(schema);
// 创建TCP监听器
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
println!("GraphQL WebSocket server listening on ws://127.0.0.1:8080/graphql");
// 接受连接并处理
while let Ok((stream, _)) = listener.accept().await {
let config = config.clone();
tokio::spawn(async move {
// 使用graphql-transport-ws协议处理连接
juniper_graphql_ws::serve_graphql_transport_ws(stream, config)
.await
.unwrap();
});
}
}
安装
在项目目录中运行以下Cargo命令:
cargo add juniper_graphql_ws
或者在Cargo.toml中添加:
juniper_graphql_ws = "0.4.0"
许可证
本项目采用BSD 2-Clause许可证。
1 回复
Rust GraphQL WebSocket实现库juniper_graphql_ws使用指南
概述
juniper_graphql_ws
是一个Rust库,用于在WebSocket协议上实现GraphQL订阅功能,特别适合需要实时数据推送的应用场景。它基于Juniper GraphQL框架,提供了完整的GraphQL over WebSocket协议支持。
主要特性
- 完整的GraphQL over WebSocket协议实现
- 支持GraphQL订阅(Subscription)操作
- 与Juniper GraphQL框架无缝集成
- 异步处理支持(tokio/async-std)
- 可定制的连接生命周期管理
安装
在Cargo.toml中添加依赖:
[dependencies]
juniper_graphql_ws = "0.5"
tokio = { version = "1.0", features = ["full"] }
juniper = "0.15"
基本使用方法
1. 创建GraphQL Schema
首先定义常规的GraphQL schema:
use juniper::{graphql_object, FieldResult, RootNode};
struct Query;
#[graphql_object]
impl Query {
fn hello(&self) -> &str {
"Hello, world!"
}
}
struct Subscription;
#[graphql_object]
impl Subscription {
async fn count(&self, from: i32) -> impl futures::Stream<Item = FieldResult<i32>> {
tokio_stream::iter(from..).map(Ok)
}
}
type Schema = RootNode<'static, Query, juniper::EmptyMutation<()>, Subscription>;
fn schema() -> Schema {
Schema::new(Query, juniper::EmptyMutation::new(), Subscription)
}
2. 设置WebSocket服务器
use juniper_graphql_ws::ConnectionConfig;
use tokio::net::TcpListener;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(schema());
let listener = TcpListener::bind("127.0.0.1:8080").await?;
while let Ok((stream, _)) = listener.accept().await {
let schema = schema.clone();
tokio::spawn(async move {
let config = ConnectionConfig::new(schema);
let upgraded = tokio_tungstenite::accept_async(stream).await?;
juniper_graphql_ws::serve_connection(upgraded, config).await
});
}
Ok(())
}
客户端连接示例
JavaScript客户端
const client = new WebSocket('ws://localhost:8080/graphql', 'graphql-ws');
client.onopen = () => {
client.send(JSON.stringify({
type: 'connection_init'
}));
// 订阅count字段
client.send(JSON.stringify({
id: '1',
type: 'start',
payload: {
query: 'subscription { count(from: 0) }'
}
}));
};
client.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'data') {
console.log('Received:', data.payload.data.count);
}
};
高级配置
自定义上下文
struct MyContext {
user_id: String,
}
let config = ConnectionConfig::new(schema)
.with_context(|| Ok(MyContext { user_id: "123".to_string() }));
认证处理
let config = ConnectionConfig::new(schema)
.on_connection_init(|value| {
let token = value.get("token").and_then(|t| t.as_str());
if token != Some("secret") {
return Err("Unauthorized".into());
}
Ok(())
});
完整示例Demo
下面是一个完整的Rust GraphQL WebSocket服务器实现,包含查询、变更和订阅功能:
use juniper::{
graphql_object, graphql_subscription, FieldError, FieldResult,
RootNode, EmptyMutation, SubscriptionCoordinator
};
use juniper_graphql_ws::ConnectionConfig;
use tokio::net::TcpListener;
use std::sync::Arc;
use futures::Stream;
use tokio_stream::StreamExt;
// 定义查询类型
struct Query;
#[graphql_object]
impl Query {
fn api_version() -> &str {
"1.0"
}
}
// 定义变更类型
struct Mutation;
#[graphql_object]
impl Mutation {
fn echo(&self, message: String) -> String {
message
}
}
// 定义订阅类型
struct Subscription;
#[graphql_subscription]
impl Subscription {
async fn ticker(interval: i32) -> impl Stream<Item = Result<i32, FieldError>> {
let mut counter = 0;
let interval = std::time::Duration::from_millis(interval as u64);
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval))
.map(move |_| {
counter += 1;
Ok(counter)
})
}
}
// 定义Schema类型
type Schema = RootNode<'static, Query, Mutation, Subscription>;
fn create_schema() -> Schema {
Schema::new(Query, Mutation, Subscription)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建并共享schema
let schema = Arc::new(create_schema());
// 绑定TCP监听器
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("WebSocket服务器运行在 ws://127.0.0.1:8080");
// 处理每个连接
while let Ok((stream, _)) = listener.accept().await {
let schema = schema.clone();
tokio::spawn(async move {
// 配置连接
let config = ConnectionConfig::new(schema)
.with_context(|| Ok(())) // 简单上下文
.on_connection_init(|_| Ok(())); // 连接初始化回调
// 升级到WebSocket连接
match tokio_tungstenite::accept_async(stream).await {
Ok(ws_stream) => {
// 处理GraphQL WebSocket连接
juniper_graphql_ws::serve_connection(ws_stream, config).await
}
Err(e) => {
eprintln!("WebSocket握手失败: {}", e);
Ok(())
}
}
});
}
Ok(())
}
对应的HTML/JavaScript客户端示例:
<!DOCTYPE html>
<html>
<head>
<title>GraphQL WebSocket客户端</title>
</head>
<body>
<script>
const wsUrl = 'ws://localhost:8080';
const client = new WebSocket(wsUrl, 'graphql-ws');
// 订阅计数器
let subscriptionActive = false;
// 连接打开时初始化
client.onopen = () => {
console.log('WebSocket连接已建立');
// 发送连接初始化消息
client.send(JSON.stringify({
type: 'connection_init'
}));
// 启动订阅
startSubscription();
};
// 处理接收到的消息
client.onmessage = (event) => {
const data = JSON.parse(event.data);
switch(data.type) {
case 'connection_ack':
console.log('服务器已确认连接');
break;
case 'data':
console.log('收到数据:', data.payload.data);
break;
case 'complete':
console.log('订阅已完成');
break;
case 'error':
console.error('发生错误:', data.payload);
break;
}
};
// 启动订阅
function startSubscription() {
if (subscriptionActive) return;
client.send(JSON.stringify({
id: '1',
type: 'start',
payload: {
query: 'subscription { ticker(interval: 1000) }'
}
}));
subscriptionActive = true;
}
// 停止订阅
function stopSubscription() {
if (!subscriptionActive) return;
client.send(JSON.stringify({
id: '1',
type: 'stop'
}));
subscriptionActive = false;
}
</script>
<div>
<button onclick="startSubscription()">开始订阅</button>
<button onclick="stopSubscription()">停止订阅</button>
</div>
</body>
</html>
注意事项
- 确保客户端使用
graphql-ws
子协议 - 生产环境中应考虑添加错误处理和日志记录
- 对于大规模部署,可能需要考虑连接管理和限流
- 订阅功能会保持连接活跃,需要合理管理资源
- 考虑使用TLS加密生产环境的WebSocket连接
juniper_graphql_ws
为Rust应用提供了强大的实时GraphQL功能,特别适合需要实时数据更新的应用场景如聊天应用、实时仪表盘等。