Rust WebSocket客户端库graphql-ws-client的使用:轻量级GraphQL订阅与实时数据传输

Rust WebSocket客户端库graphql-ws-client的使用:轻量级GraphQL订阅与实时数据传输

概述

graphql-ws-client是一个运行时无关的Rust库,实现了GraphQL-over-WebSocket协议。当前版本主要支持订阅功能,未来将支持查询和突变操作。

该库兼容多种WebSocket实现:

  • async-tungstenite
  • tokio-tungstenite
  • ws-stream-wasm

集成

通过特性标志可与主流GraphQL客户端集成:

  • graphql-client: 启用client-graphql-client特性
  • cynic: 启用client-cynic特性

文档

当前文档资源有限,主要参考:

  1. 库提供的示例代码
  2. docs.rs上的API文档

日志记录

默认在trace级别输出调试日志,可通过no-logging特性完全禁用日志。

安装方法

使用Cargo命令安装:

cargo add graphql-ws-client

或直接在Cargo.toml中添加:

graphql-ws-client = "0.11.1"

完整示例代码

以下是使用tokio-tungstenite的完整订阅示例:

use futures::{SinkExt, StreamExt};
use graphql_ws_client::{Client, Subscription};
use serde_json::json;
use tokio::net::TcpStream;
use tokio_tungstenite::{
    connect_async,
    tungstenite::protocol::Message,
    MaybeTlsStream,
    WebSocketStream,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 建立WebSocket连接
    let (ws_stream, _) = connect_async("ws://localhost:8080/graphql").await?;
    
    // 初始化客户端
    let mut client = Client::new(ws_stream);
    
    // 启动连接
    client.start().await?;
    
    // 构建订阅请求
    let subscription = Subscription::new(
        "subscription { messageAdded { content } }", // GraphQL订阅语句
        None, // 查询变量
        None, // 操作名称
    );
    
    // 发起订阅并获取消息流
    let mut stream = client.subscribe(&subscription).await?;
    
    // 处理实时消息
    while let Some(message) = stream.next().await {
        match message {
            Ok(data) => println!("收到新消息: {:?}", data),
            Err(e) => {
                eprintln!("订阅错误: {:?}", e);
                break;
            }
        }
    }
    
    // 关闭连接
    client.stop().await?;
    
    Ok(())
}

操作流程说明

  1. 使用tokio-tungstenite建立WebSocket连接
  2. 创建graphql-ws-client实例
  3. 调用start()初始化协议握手
  4. 构建包含GraphQL查询的Subscription对象
  5. 通过subscribe()获取消息流
  6. 使用流处理器接收实时数据
  7. 最后调用stop()关闭连接

扩展示例

以下是带变量和错误处理的增强版示例:

use futures::{SinkExt, StreamExt};
use graphql_ws_client::{Client, Subscription};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 带TLS的安全连接
    let (ws_stream, _) = tokio_tungstenite::connect_async(
        "wss://api.example.com/graphql"
    ).await?;

    let mut client = Client::new(ws_stream);
    
    // 更健壮的启动处理
    client.start().await
        .map_err(|e| format!("连接启动失败: {}", e))?;
    
    // 带变量的订阅
    let vars = json!({
        "roomId": "123"
    });
    
    let subscription = Subscription::new(
        r#"
        subscription MessageFeed($roomId: ID!) {
            messageAdded(roomId: $roomId) {
                id
                content
                author
            }
        }
        "#,
        Some(vars),
        Some("MessageFeed"),
    );
    
    let mut stream = client.subscribe(&subscription).await
        .map_err(|e| format!("订阅失败: {}", e))?;
    
    // 使用select宏处理多个异步事件
    loop {
        tokio::select! {
            Some(msg) = stream.next() => match msg {
                Ok(data) => {
                    println!("新消息: {:#?}", data);
                    // 这里可以添加业务处理逻辑
                },
                Err(e) => {
                    eprintln!("消息解析错误: {}", e);
                    break;
                }
            },
            _ = tokio::signal::ctrl_c() => {
                println!("收到终止信号,关闭连接...");
                break;
            }
        }
    }
    
    // 确保连接关闭
    client.stop().await
        .map_err(|e| format!("关闭连接失败: {}", e))?;
    
    Ok(())
}

这个增强版示例展示了:

  • 安全连接处理
  • 带变量的GraphQL查询
  • 更完善的错误处理
  • 信号处理
  • 结构化日志输出

可以根据实际需求调整订阅查询和消息处理逻辑。


1 回复

这是一个完整的聊天室应用示例,展示如何使用graphql-ws-client处理实时消息订阅:

use graphql_ws_client::{Client, graphql::Request};
use futures::{SinkExt, StreamExt};
use tokio::time::{sleep, Duration};
use serde_json::json;

#[tokio::main]
async fn main() {
    // 1. 建立WebSocket连接
    let (mut client, mut connection) = Client::connect("ws://localhost:8080/graphql")
        .await
        .expect("Failed to connect to WebSocket");
    
    // 2. 后台任务处理接收到的消息
    tokio::spawn(async move {
        while let Some(msg) = connection.next().await {
            match msg {
                Ok(msg) => {
                    // 处理收到的消息
                    if let Some(data) = msg.data {
                        println!("新消息: {}", json!(data));
                    }
                },
                Err(e) => eprintln!("连接错误: {}", e),
            }
        }
    });

    // 3. 订阅聊天室消息
    let subscription = r#"
        subscription OnNewMessage($roomId: String!) {
            newMessage(roomId: $roomId) {
                id
                content
                sender
                timestamp
            }
        }
    "#;

    let request = Request::new(subscription)
        .variable("roomId", "main_room");

    let subscription_id = client.subscribe(request)
        .await
        .expect("订阅失败");

    println!("已成功订阅聊天室消息,按Ctrl+C退出");

    // 4. 保持程序运行
    loop {
        sleep(Duration::from_secs(10)).await;
        
        // 发送心跳保持连接
        client.keep_alive().await.expect("心跳发送失败");
        println!("已发送心跳包保持连接");
    }
}

这个示例包含以下功能:

  1. 建立WebSocket连接
  2. 后台任务处理实时消息
  3. 订阅特定聊天室的消息
  4. 添加了心跳机制保持连接
  5. 使用serde_json格式化输出消息内容

要运行这个示例,你需要:

  1. 确保有可用的GraphQL WebSocket服务器
  2. 在Cargo.toml中添加必要的依赖
  3. 根据实际服务器地址修改连接URL

这个示例展示了graphql-ws-client在实时聊天应用中的典型用法,包括连接管理、消息订阅和错误处理。

回到顶部