Rust GraphQL WebSocket实现库juniper_graphql_ws的使用,支持实时GraphQL订阅和WebSocket协议交互

Rust GraphQL WebSocket实现库juniper_graphql_ws的使用,支持实时GraphQL订阅和WebSocket协议交互

juniper_graphql_ws crate

Crates.io Documentation CI Rust 1.73+

这个crate包含2种协议的实现:

  1. (graphql-transport-ws 特性) 新的graphql-transport-ws GraphQL WebSocket协议,现在被Apollo和graphql-ws npm包使用。

  2. (graphql-ws 特性) 旧的graphql-ws GraphQL WebSocket协议,以前被Apollo和subscriptions-transport-ws npm包使用(已废弃,推荐使用上面提到的新graphql-transport-ws协议)。

使用示例

下面是一个使用juniper_graphql_ws实现GraphQL订阅的完整示例:

use juniper::{
    graphql_object, graphql_subscription, FieldError, RootNode, 
    GraphQLEnum, GraphQLInputObject, GraphQLObject
};
use juniper_graphql_ws::ConnectionConfig;
use tokio::net::TcpListener;
use futures::Stream;

// 定义GraphQL数据结构
#[derive(GraphQLEnum)]
enum Episode {
    NewHope,
    EmpireStrikesBack,
    ReturnOfTheJedi,
}

#[derive(GraphQLObject)]
#[graphql(description = "A humanoid creature in the Star Wars universe")]
struct Human {
    id: String,
    name: String,
    appears_in: Vec<Episode>,
    home_planet: String,
}

#[derive(GraphQLInputObject)]
#[graphql(description = "Input for creating a human")]
struct NewHuman {
    name: String,
    appears_in: Vec<Episode>,
    home_planet: String,
}

// 定义查询
pub struct Query;

#[graphql_object]
impl Query {
    fn human(&self, id: String) -> Option<Human> {
        Some(Human {
            id,
            name: "Luke Skywalker".to_owned(),
            appears_in: vec![Episode::NewHope],
            home_planet: "Tatooine".to_owned(),
        })
    }
}

// 定义订阅
pub struct Subscription;

type HumanStream = std::pin::Pin<Box<dyn Stream<Item = Result<Human, FieldError>> + Send>>;

#[graphql_subscription]
impl Subscription {
    async fn human_created(&self) -> HumanStream {
        let stream = futures::stream::iter(vec![
            Ok(Human {
                id: "1".to_owned(),
                name: "Luke Skywalker".to_owned(),
                appears_in: vec![Episode::NewHope],
                home_planet: "Tatooine".to_owned(),
            })
        ]);
        Box::pin(stream)
    }
}

// 创建Schema
type Schema = RootNode<'static, Query, juniper::EmptyMutation<()>, Subscription>;

#[tokio::main]
async fn main() {
    // 创建Schema
    let schema = Schema::new(
        Query,
        juniper::EmptyMutation::<()>::new(),
        Subscription,
    );

    // 配置WebSocket连接
    let config = ConnectionConfig::new(()).schema(schema);

    // 创建TCP监听器
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    println!("GraphQL WebSocket server listening on ws://127.0.0.1:8080/graphql");

    // 接受连接并处理
    while let Ok((stream, _)) = listener.accept().await {
        let config = config.clone();
        
        tokio::spawn(async move {
            // 使用graphql-transport-ws协议处理连接
            juniper_graphql_ws::serve_graphql_transport_ws(stream, config)
                .await
                .unwrap();
        });
    }
}

安装

在项目目录中运行以下Cargo命令:

cargo add juniper_graphql_ws

或者在Cargo.toml中添加:

juniper_graphql_ws = "0.4.0"

许可证

本项目采用BSD 2-Clause许可证。


1 回复

Rust GraphQL WebSocket实现库juniper_graphql_ws使用指南

概述

juniper_graphql_ws是一个Rust库,用于在WebSocket协议上实现GraphQL订阅功能,特别适合需要实时数据推送的应用场景。它基于Juniper GraphQL框架,提供了完整的GraphQL over WebSocket协议支持。

主要特性

  • 完整的GraphQL over WebSocket协议实现
  • 支持GraphQL订阅(Subscription)操作
  • 与Juniper GraphQL框架无缝集成
  • 异步处理支持(tokio/async-std)
  • 可定制的连接生命周期管理

安装

在Cargo.toml中添加依赖:

[dependencies]
juniper_graphql_ws = "0.5"
tokio = { version = "1.0", features = ["full"] }
juniper = "0.15"

基本使用方法

1. 创建GraphQL Schema

首先定义常规的GraphQL schema:

use juniper::{graphql_object, FieldResult, RootNode};

struct Query;

#[graphql_object]
impl Query {
    fn hello(&self) -> &str {
        "Hello, world!"
    }
}

struct Subscription;

#[graphql_object]
impl Subscription {
    async fn count(&self, from: i32) -> impl futures::Stream<Item = FieldResult<i32>> {
        tokio_stream::iter(from..).map(Ok)
    }
}

type Schema = RootNode<'static, Query, juniper::EmptyMutation<()>, Subscription>;

fn schema() -> Schema {
    Schema::new(Query, juniper::EmptyMutation::new(), Subscription)
}

2. 设置WebSocket服务器

use juniper_graphql_ws::ConnectionConfig;
use tokio::net::TcpListener;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(schema());
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    
    while let Ok((stream, _)) = listener.accept().await {
        let schema = schema.clone();
        
        tokio::spawn(async move {
            let config = ConnectionConfig::new(schema);
            let upgraded = tokio_tungstenite::accept_async(stream).await?;
            
            juniper_graphql_ws::serve_connection(upgraded, config).await
        });
    }
    
    Ok(())
}

客户端连接示例

JavaScript客户端

const client = new WebSocket('ws://localhost:8080/graphql', 'graphql-ws');

client.onopen = () => {
  client.send(JSON.stringify({
    type: 'connection_init'
  }));
  
  // 订阅count字段
  client.send(JSON.stringify({
    id: '1',
    type: 'start',
    payload: {
      query: 'subscription { count(from: 0) }'
    }
  }));
};

client.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'data') {
    console.log('Received:', data.payload.data.count);
  }
};

高级配置

自定义上下文

struct MyContext {
    user_id: String,
}

let config = ConnectionConfig::new(schema)
    .with_context(|| Ok(MyContext { user_id: "123".to_string() }));

认证处理

let config = ConnectionConfig::new(schema)
    .on_connection_init(|value| {
        let token = value.get("token").and_then(|t| t.as_str());
        if token != Some("secret") {
            return Err("Unauthorized".into());
        }
        Ok(())
    });

完整示例Demo

下面是一个完整的Rust GraphQL WebSocket服务器实现,包含查询、变更和订阅功能:

use juniper::{
    graphql_object, graphql_subscription, FieldError, FieldResult, 
    RootNode, EmptyMutation, SubscriptionCoordinator
};
use juniper_graphql_ws::ConnectionConfig;
use tokio::net::TcpListener;
use std::sync::Arc;
use futures::Stream;
use tokio_stream::StreamExt;

// 定义查询类型
struct Query;

#[graphql_object]
impl Query {
    fn api_version() -> &str {
        "1.0"
    }
}

// 定义变更类型
struct Mutation;

#[graphql_object]
impl Mutation {
    fn echo(&self, message: String) -> String {
        message
    }
}

// 定义订阅类型
struct Subscription;

#[graphql_subscription]
impl Subscription {
    async fn ticker(interval: i32) -> impl Stream<Item = Result<i32, FieldError>> {
        let mut counter = 0;
        let interval = std::time::Duration::from_millis(interval as u64);
        
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval))
            .map(move |_| {
                counter += 1;
                Ok(counter)
            })
    }
}

// 定义Schema类型
type Schema = RootNode<'static, Query, Mutation, Subscription>;

fn create_schema() -> Schema {
    Schema::new(Query, Mutation, Subscription)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建并共享schema
    let schema = Arc::new(create_schema());
    
    // 绑定TCP监听器
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("WebSocket服务器运行在 ws://127.0.0.1:8080");
    
    // 处理每个连接
    while let Ok((stream, _)) = listener.accept().await {
        let schema = schema.clone();
        
        tokio::spawn(async move {
            // 配置连接
            let config = ConnectionConfig::new(schema)
                .with_context(|| Ok(())) // 简单上下文
                .on_connection_init(|_| Ok(())); // 连接初始化回调
            
            // 升级到WebSocket连接
            match tokio_tungstenite::accept_async(stream).await {
                Ok(ws_stream) => {
                    // 处理GraphQL WebSocket连接
                    juniper_graphql_ws::serve_connection(ws_stream, config).await
                }
                Err(e) => {
                    eprintln!("WebSocket握手失败: {}", e);
                    Ok(())
                }
            }
        });
    }
    
    Ok(())
}

对应的HTML/JavaScript客户端示例:

<!DOCTYPE html>
<html>
<head>
    <title>GraphQL WebSocket客户端</title>
</head>
<body>
    <script>
        const wsUrl = 'ws://localhost:8080';
        const client = new WebSocket(wsUrl, 'graphql-ws');
        
        // 订阅计数器
        let subscriptionActive = false;
        
        // 连接打开时初始化
        client.onopen = () => {
            console.log('WebSocket连接已建立');
            
            // 发送连接初始化消息
            client.send(JSON.stringify({
                type: 'connection_init'
            }));
            
            // 启动订阅
            startSubscription();
        };
        
        // 处理接收到的消息
        client.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            switch(data.type) {
                case 'connection_ack':
                    console.log('服务器已确认连接');
                    break;
                    
                case 'data':
                    console.log('收到数据:', data.payload.data);
                    break;
                    
                case 'complete':
                    console.log('订阅已完成');
                    break;
                    
                case 'error':
                    console.error('发生错误:', data.payload);
                    break;
            }
        };
        
        // 启动订阅
        function startSubscription() {
            if (subscriptionActive) return;
            
            client.send(JSON.stringify({
                id: '1',
                type: 'start',
                payload: {
                    query: 'subscription { ticker(interval: 1000) }'
                }
            }));
            
            subscriptionActive = true;
        }
        
        // 停止订阅
        function stopSubscription() {
            if (!subscriptionActive) return;
            
            client.send(JSON.stringify({
                id: '1',
                type: 'stop'
            }));
            
            subscriptionActive = false;
        }
    </script>
    
    <div>
        <button onclick="startSubscription()">开始订阅</button>
        <button onclick="stopSubscription()">停止订阅</button>
    </div>
</body>
</html>

注意事项

  1. 确保客户端使用graphql-ws子协议
  2. 生产环境中应考虑添加错误处理和日志记录
  3. 对于大规模部署,可能需要考虑连接管理和限流
  4. 订阅功能会保持连接活跃,需要合理管理资源
  5. 考虑使用TLS加密生产环境的WebSocket连接

juniper_graphql_ws为Rust应用提供了强大的实时GraphQL功能,特别适合需要实时数据更新的应用场景如聊天应用、实时仪表盘等。

回到顶部