Rust WebSocket流处理库ws_stream_tungstenite的使用,高效实现Tungstenite协议的WebSocket通信与数据流处理

Rust WebSocket流处理库ws_stream_tungstenite的使用,高效实现Tungstenite协议的WebSocket通信与数据流处理

概述

ws_stream_tungstenite是一个Rust库,提供在async-tungstenite WebSocket上的AsyncRead/AsyncWrite/AsyncBufRead功能。它主要支持在非WASM目标(如服务器端)上工作,通过框架编解码器处理字节流。

安装

在Cargo.toml中添加依赖:

[dependencies]
ws_stream_tungstenite = "0.15"

特性

  • tokio_io特性:启用tokio版本的AsyncRead和AsyncWrite特性实现

基本使用示例

use
{
   ws_stream_tungstenite :: { *                  } ,
   futures               :: { StreamExt          } ,
   tracing               :: { *                  } ,
   async_tungstenite     :: { accept_async       } ,
   asynchronous_codec    :: { LinesCodec, Framed } ,
   async_std             :: { net::TcpListener   } ,
};

#[async_std::main]
async fn main() -> Result<(), std::io::Error>
{
   let     socket      = TcpListener::bind( "127.0.0.1:3012" ).await?;
   let mut connections = socket.incoming();

   let tcp = connections.next().await.expect( "1 connection" ).expect( "tcp connect" );
   let s   = accept_async( tcp ).await.expect( "ws handshake" );
   let ws  = WsStream::new( s );

   // ws here is observable with pharos to detect non fatal errors and ping/close events, which cannot
   // be represented in the AsyncRead/Write API. See the events example in the repository.

   let (_sink, mut stream) = Framed::new( ws, LinesCodec {} ).split();


   while let Some( msg ) = stream.next().await
   {
      let msg = match msg
      {
         Err(e) =>
         {
            error!( "Error on server stream: {:?}", e );

            // Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying
            // transport.
            //
            continue;
         }

         Ok(m) => m,
      };


      info!( "server received: {}", msg.trim() );

      // ... do something useful
   }

   // safe to drop the TCP connection

   Ok(())
}

完整WebSocket服务器和客户端示例

WebSocket服务器

use async_std::net::TcpListener;
use async_tungstenite::accept_async;
use futures::StreamExt;
use ws_stream_tungstenite::*;
use asynchronous_codec::{LinesCodec, Framed};
use tracing::{info, error};

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    // 绑定到本地3012端口
    let listener = TcpListener::bind("127.0.0.1:3012").await?;
    info!("WebSocket server listening on ws://127.0.0.1:3012");
    
    // 接受连接
    while let Ok((stream, _)) = listener.accept().await {
        // 异步处理每个连接
        async_std::task::spawn(handle_connection(stream));
    }
    
    Ok(())
}

async fn handle_connection(stream: async_std::net::TcpStream) {
    // 执行WebSocket握手
    let ws_stream = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(e) => {
            error!("Failed to accept WebSocket connection: {}", e);
            return;
        }
    };
    
    // 创建WsStream
    let ws = WsStream::new(ws_stream);
    
    // 使用LinesCodec进行帧处理
    let (mut sink, mut stream) = Framed::new(ws, LinesCodec {}).split();
    
    // 处理收到的消息
    while let Some(msg) = stream.next().await {
        match msg {
            Ok(msg) => {
                info!("Received: {}", msg);
                // 回显消息
                if let Err(e) = sink.send(msg).await {
                    error!("Failed to send message: {}", e);
                    break;
                }
            }
            Err(e) => {
                error!("Error in WebSocket stream: {}", e);
                break;
            }
        }
    }
}

WebSocket客户端

use async_tungstenite::connect_async;
use ws_stream_tungstenite::*;
use futures::{SinkExt, StreamExt};
use asynchronous_codec::{LinesCodec, Framed};
use tracing::{info, error};
use std::time::Duration;
use futures::future::join;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到WebSocket服务器
    let (ws_stream, _) = connect_async("ws://127.0.0.1:3012").await?;
    info!("Connected to WebSocket server");
    
    // 创建WsStream
    let ws = WsStream::new(ws_stream);
    
    // 使用LinesCodec进行帧处理
    let (mut sink, mut stream) = Framed::new(ws, LinesCodec {}).split();
    
    // 发送消息的任务
    let sender = async {
        for i in 1..=5 {
            let msg = format!("Hello, WebSocket! Message {}", i);
            if let Err(e) = sink.send(msg).await {
                error!("Failed to send message: {}", e);
                break;
            }
            async_std::task::sleep(Duration::from_secs(1)).await;
        }
        
        // 关闭连接
        sink.close().await?;
        Ok::<(), Box<dyn std::error::Error>>(())
    };
    
    // 接收消息的任务
    let receiver = async {
        while let Some(msg) = stream.next().await {
            match msg {
                Ok(msg) => info!("Received: {}", msg),
                Err(e) => {
                    error!("Error in WebSocket stream: {}", e);
                    break;
                }
            }
        }
        Ok::<(), Box<dyn std::error::Error>>(())
    };
    
    // 同时运行发送和接收
    join(sender, receiver).await.0?;
    
    Ok(())
}

如何关闭连接

WebSocket RFC规定了关闭握手:

  1. 当端点想要关闭连接时,它发送一个关闭帧,之后不再发送数据
  2. 远端发送关闭帧的确认
  3. 端点既发送又接收了关闭帧后,连接被视为已关闭

使用ws_stream_tungstenite正确关闭连接:

  • 如果是远端发起关闭,只需轮询流直到返回None
  • 如果想发起关闭,在sink上调用close(),然后继续轮询流直到返回None

错误处理

ws_stream_tungstenite只接受二进制消息,如果收到WebSocket文本消息,将被视为协议错误。错误通过pharos返回,应该观察WsStream并至少记录报告的任何错误。

限制

  • 没有API发送Ping消息
  • 收到的文本消息被视为错误

最佳实践

  1. 总是处理WebSocket错误和关闭事件
  2. 为编解码器设置最大消息大小以防止DOS攻击
  3. 使用适当的超时来处理卡住的连接
  4. 考虑使用tokio_io特性以获得更好的tokio集成

1 回复

Rust WebSocket流处理库ws_stream_tungstenite的使用指南

ws_stream_tungstenite是一个基于tungstenite的Rust库,提供了更高级的WebSocket流处理功能,特别适合需要高效处理WebSocket通信和数据流的场景。

主要特性

  • 基于成熟的tungstenite WebSocket实现
  • 提供流式接口处理WebSocket消息
  • 支持异步/等待(async/await)模式
  • 简化了WebSocket通信的复杂性
  • 良好的错误处理和连接管理

安装

在Cargo.toml中添加依赖:

[dependencies]
ws_stream_tungstenite = "0.5"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

客户端连接示例

use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() {
    let url = "ws://echo.websocket.org";
    
    // 建立TCP连接
    let tcp = TcpStream::connect("echo.websocket.org:80").await.unwrap();
    
    // 创建WebSocket连接
    let (ws_stream, _) = tokio_tungstenite::connect_async(url)
        .await
        .unwrap();
    
    // 转换为WsStream
    let mut ws_stream = WsStream::new(ws_stream);
    
    // 发送消息
    ws_stream.send("Hello, WebSocket!".into()).await.unwrap();
    
    // 接收消息
    while let Some(msg) = ws_stream.next().await {
        match msg {
            Ok(msg) => println!("Received: {:?}", msg),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
}

服务器端示例

use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    
    while let Ok((tcp_stream, _)) = listener.accept().await {
        tokio::spawn(handle_connection(tcp_stream));
    }
}

async fn handle_connection(tcp_stream: TcpStream) {
    // 升级为WebSocket连接
    let ws_stream = tokio_tungstenite::accept_async(tcp_stream)
        .await
        .unwrap();
    
    let mut ws_stream = WsStream::new(ws_stream);
    
    // 处理消息
    while let Some(msg) = ws_stream.next().await {
        match msg {
            Ok(msg) => {
                println!("Received: {:?}", msg);
                // 回显消息
                ws_stream.send(msg).await.unwrap();
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }
}

高级用法

处理二进制数据

#[tokio::main]
async fn main() {
    let (mut ws_stream, _) = WsStream::connect("ws://example.com/ws")
        .await
        .unwrap();
    
    // 发送二进制数据
    let binary_data = vec![0x01, 0x02, 0x03, 0x04];
    ws_stream.send(binary_data.into()).await.unwrap();
    
    // 接收二进制数据
    while let Some(msg) = ws_stream.next().await {
        if let Ok(Message::Binary(data)) = msg {
            println!("Received binary data: {:?}", data);
        }
    }
}

使用Sink特性批量发送消息

use futures::SinkExt;

#[tokio::main]
async fn main() {
    let (mut ws_stream, _) = WsStream::connect("ws://example.com/ws")
        .await
        .unwrap();
    
    let messages = vec![
        Message::text("Message 1"),
        Message::text("Message 2"),
        Message::text("Message 3"),
    ];
    
    let mut sink = ws_stream.sink();
    sink.send_all(&mut futures::stream::iter(messages))
        .await
        .unwrap();
}

错误处理

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut ws_stream, _) = WsStream::connect("ws://example.com/ws")
        .await?;
    
    ws_stream.send("Ping".into()).await?;
    
    match ws_stream.next().await {
        Some(Ok(msg)) => println!("Received: {:?}", msg),
        Some extreme(Err(e)) => eprintln!("WebSocket error: {}", e),
        None => println!("Connection closed"),
    }
    
    Ok(())
}

性能提示

  1. 对于高频消息场景,考虑使用Message::Binary而不是Message::Text,因为二进制数据通常处理效率更高
  2. 批量处理消息时使用Sink特性可以提高性能
  3. 合理设置TCP缓冲区大小可以改善吞吐量

完整示例demo

下面是一个完整的客户端和服务器端交互的WebSocket示例:

服务器端完整代码

use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 绑定到本地8080端口
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("WebSocket服务器启动,监听端口8080...");

    while let Ok((tcp_stream, _)) = listener.accept().await {
        tokio::spawn(async move {
            if let Err(e) = handle_connection(tcp_stream).await {
                eprintln!("连接处理错误: {}", e);
            }
        });
    }
    
    Ok(())
}

async fn handle_connection(tcp_stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    // 升级为WebSocket连接
    let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await?;
    let mut ws_stream = WsStream::new(ws_stream);
    
    println!("新客户端连接");

    // 处理消息循环
    while let Some(msg) = ws_stream.next().await {
        match msg {
            Ok(msg) => {
                println!("收到消息: {:?}", msg);
                // 回显消息给客户端
                ws_stream.send(msg).await?;
            }
            Err(e) => {
                eprintln!("错误: {}", e);
                break;
            }
        }
    }
    
    println!("客户端断开连接");
    Ok(())
}

客户端完整代码

use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::TcpStream;
use futures::SinkExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接WebSocket服务器
    let url = "ws://127.0.0.1:8080";
    let tcp = TcpStream::connect("127.0.0.1:8080").await?;
    
    // 创建WebSocket连接
    let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
    let mut ws_stream = WsStream::new(ws_stream);
    
    println!("成功连接到服务器");

    // 创建Sink用于批量发送
    let mut sink = ws_stream.sink();
    
    // 批量发送文本消息
    let text_messages = vec![
        Message::text("第一条消息"),
        Message::text("第二条消息"),
        Message::text("第三条消息"),
    ];
    sink.send_all(&mut futures::stream::iter(text_messages)).await?;

    // 发送二进制消息
    let binary_data = vec![0x01, 0x02, 0x03, 0x04];
    ws_stream.send(binary_data.into()).await?;

    // 接收服务器响应
    while let Some(msg) = ws_stream.next().await {
        match msg {
            Ok(msg) => println!("收到服务器响应: {:?}", msg),
            Err(e) => {
                eprintln!("错误: {}", e);
                break;
            }
        }
    }
    
    Ok(())
}

这个完整示例展示了:

  1. 服务器端启动WebSocket服务并回显接收到的消息
  2. 客户端连接服务器并发送文本和二进制消息
  3. 使用Sink特性批量发送消息
  4. 完整的错误处理机制

您可以将服务器端和客户端代码分别保存为两个文件,先运行服务器端,再运行客户端进行测试。

回到顶部