Rust GraphQL订阅库juniper_subscriptions的使用,实现实时数据推送与WebSocket通信

Rust GraphQL订阅库juniper_subscriptions的使用,实现实时数据推送与WebSocket通信

juniper_subscriptions crate

Crates.io Documentation CI Rust 1.73+

这个库提供了SubscriptionCoordinatorSubscriptionConnection的实现,用于juniper,一个Rust的GraphQL库。

文档

关于这个crate的文档

示例

以下是一个工作示例代码,展示了一个带有GraphQL订阅处理程序的warp服务器。

许可证

这个项目使用BSD 2-Clause License。

完整示例代码

use futures::{Stream, StreamExt};
use juniper::{
    graphql_object, graphql_subscription, EmptyMutation, EmptySubscription, 
    FieldError, RootNode, SubscriptionCoordinator,
};
use juniper_subscriptions::Coordinator;
use std::pin::Pin;
use tokio::sync::broadcast::{self, Sender};
use warp::{Filter, Reply};

// 定义GraphQL上下文
struct Context {
    event_sender: Sender<String>,
}

impl juniper::Context for Context {}

// 定义查询根
struct Query;

#[graphql_object(context = Context)]
impl Query {
    fn hello(&self) -> &str {
        "Hello, world!"
    }
}

// 定义订阅根
struct Subscription;

type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;

#[graphql_subscription(context = Context)]
impl Subscription {
    async fn events(context: &Context) -> StringStream {
        let mut receiver = context.event_sender.subscribe();
        let stream = async_stream::stream! {
            while let Ok(msg) = receiver.recv().await {
                yield Ok(msg);
            }
        };
        Box::pin(stream)
    }
}

// 主函数
#[tokio::main]
async fn main() {
    // 创建事件广播通道
    let (event_sender, _) = broadcast::channel(16);
    
    // 创建GraphQL Schema
    let schema = RootNode::new(
        Query,
        EmptyMutation::new(),
        Subscription,
    );
    
    // 创建订阅协调器
    let coordinator = Coordinator::new(schema);
    
    // 创建warp过滤器
    let ctx = warp::any().map(move || Context {
        event_sender: event_sender.clone(),
    });
    
    let graphql_filter = juniper_warp::make_graphql_filter(schema, ctx.boxed());
    
    // 设置路由
    let routes = warp::path("graphql")
        .and(warp::post())
        .and(graphql_filter)
        .or(warp::path("subscriptions")
            .and(warp::ws())
            .map(move |ws: warp::ws::Ws| {
                let coordinator = coordinator.clone();
                ws.on_upgrade(move |websocket| {
                    juniper_warp::subscriptions::serve(
                        "subscriptions",
                        coordinator,
                        websocket,
                    )
                })
            }));
    
    // 启动事件发送任务
    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            counter += 1;
            let _ = event_sender.send(format!("Event {}", counter));
        }
    });
    
    // 启动服务器
    warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}

示例说明

  1. 依赖项:

    • juniper: 提供GraphQL核心功能
    • juniper_subscriptions: 处理订阅功能
    • warp: 作为Web服务器
    • tokio: 异步运行时
    • futures: 提供Stream支持
  2. 主要组件:

    • Query: 定义常规GraphQL查询
    • Subscription: 定义订阅类型和订阅字段
    • Context: 包含事件广播通道用于推送消息
    • Coordinator: 管理订阅连接
  3. 工作流程:

    • 创建事件广播通道
    • 定义GraphQL Schema
    • 设置HTTP和WebSocket路由
    • 启动后台任务定期发送事件
    • 客户端可以通过WebSocket订阅事件
  4. 使用方式:

    • 常规GraphQL查询通过POST /graphql
    • 订阅通过WebSocket /subscriptions

这个示例展示了如何使用juniper_subscriptions实现一个完整的GraphQL订阅服务器,包括实时数据推送和WebSocket通信功能。


1 回复

以下是基于提供内容整理的完整示例demo:

// Cargo.toml
// [dependencies]
// juniper = "0.15"
// juniper_subscriptions = "0.15"
// tokio = { version = "1.0", features = ["full"] }
// warp = "0.3"
// futures = "0.3"

use juniper::{FieldResult, RootNode};
use juniper_subscriptions::Coordinator;
use warp::Filter;
use std::sync::Arc;
use futures::StreamExt;
use tokio::time::interval;
use std::time::Duration;

// 定义查询类型
struct Query;

#[juniper::graphql_object]
impl Query {
    // 简单示例查询
    fn dummy(&self) -> bool {
        true
    }
}

// 定义订阅类型
struct Subscription;

#[juniper::graphql_subscription]
impl Subscription {
    // 计数器订阅,每秒递增
    async fn ticker(&self, count: i32) -> impl futures::Stream<Item = FieldResult<i32>> {
        let mut count = count;
        interval(Duration::from_secs(1))
            .map(move |_| {
                count += 1;
                FieldResult::Ok(count)
            })
            .take(10) // 限制发送10次
    }
}

// 定义Schema类型
type Schema = RootNode<'static, Query, juniper::EmptyMutation<()>, Subscription>;

fn schema() -> Schema {
    Schema::new(Query, juniper::EmptyMutation::new(), Subscription)
}

#[tokio::main]
async fn main() {
    // 创建Schema和Coordinator
    let schema = schema();
    let coordinator = Arc::new(Coordinator::new(schema));
    
    // 设置WebSocket路由
    let graphql_subscription = warp::path("subscriptions")
        .and(warp::ws())
        .map(move |ws: warp::ws::Ws| {
            let coordinator = coordinator.clone();
            ws.on_upgrade(move |websocket| {
                // 处理WebSocket升级
                juniper_subscriptions::serve_ws(websocket, coordinator)
            })
        });
    
    // 启动服务器
    println!("GraphQL subscription server running on ws://localhost:8080/subscriptions");
    warp::serve(graphql_subscription)
        .run(([127, 0, 0, 1], 8080))
        .await;
}

对应的客户端JavaScript代码:

import { ApolloClient, InMemoryCache } from '@apollo/client/core';
import { WebSocketLink } from '@apollo/client/link/ws';
import { SubscriptionClient } from 'subscriptions-transport-ws';

// 创建WebSocket链接
const wsLink = new WebSocketLink({
  uri: 'ws://localhost:8080/subscriptions',
  options: {
    reconnect: true
  }
});

// 创建Apollo客户端
const client = new ApolloClient({
  link: wsLink,
  cache: new InMemoryCache()
});

// 定义订阅查询
const TICKER_SUBSCRIPTION = gql`
  subscription Ticker($count: Int!) {
    ticker(count: $count)
  }
`;

// 执行订阅
const observable = client.subscribe({
  query: TICKER_SUBSCRIPTION,
  variables: { count: 0 }
});

// 处理订阅结果
const subscription = observable.subscribe({
  next(response) {
    console.log('收到数据:', response.data.ticker);
  },
  error(err) {
    console.error('订阅错误:', err);
  },
  complete() {
    console.log('订阅完成');
  }
});

// 5秒后取消订阅
setTimeout(() => {
  subscription.unsubscribe();
}, 5000);

这个完整示例包含:

  1. Rust服务端代码,实现了GraphQL订阅端点
  2. 客户端JavaScript代码,使用Apollo Client连接订阅
  3. 每秒递增的计数器订阅示例
  4. 基本的WebSocket通信实现

要运行这个示例:

  1. 将Rust代码保存为main.rs
  2. 添加对应的Cargo.toml依赖
  3. 运行服务端:cargo run
  4. 在浏览器或Node.js环境中运行客户端代码
回到顶部