Rust WebSocket流处理库ws_stream_tungstenite的使用,高效实现Tungstenite协议的WebSocket通信与数据流处理
Rust WebSocket流处理库ws_stream_tungstenite的使用,高效实现Tungstenite协议的WebSocket通信与数据流处理
概述
ws_stream_tungstenite是一个Rust库,提供在async-tungstenite WebSocket上的AsyncRead/AsyncWrite/AsyncBufRead功能。它主要支持在非WASM目标(如服务器端)上工作,通过框架编解码器处理字节流。
安装
在Cargo.toml中添加依赖:
[dependencies]
ws_stream_tungstenite = "0.15"
特性
tokio_io
特性:启用tokio版本的AsyncRead和AsyncWrite特性实现
基本使用示例
use
{
ws_stream_tungstenite :: { * } ,
futures :: { StreamExt } ,
tracing :: { * } ,
async_tungstenite :: { accept_async } ,
asynchronous_codec :: { LinesCodec, Framed } ,
async_std :: { net::TcpListener } ,
};
#[async_std::main]
async fn main() -> Result<(), std::io::Error>
{
let socket = TcpListener::bind( "127.0.0.1:3012" ).await?;
let mut connections = socket.incoming();
let tcp = connections.next().await.expect( "1 connection" ).expect( "tcp connect" );
let s = accept_async( tcp ).await.expect( "ws handshake" );
let ws = WsStream::new( s );
// ws here is observable with pharos to detect non fatal errors and ping/close events, which cannot
// be represented in the AsyncRead/Write API. See the events example in the repository.
let (_sink, mut stream) = Framed::new( ws, LinesCodec {} ).split();
while let Some( msg ) = stream.next().await
{
let msg = match msg
{
Err(e) =>
{
error!( "Error on server stream: {:?}", e );
// Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying
// transport.
//
continue;
}
Ok(m) => m,
};
info!( "server received: {}", msg.trim() );
// ... do something useful
}
// safe to drop the TCP connection
Ok(())
}
完整WebSocket服务器和客户端示例
WebSocket服务器
use async_std::net::TcpListener;
use async_tungstenite::accept_async;
use futures::StreamExt;
use ws_stream_tungstenite::*;
use asynchronous_codec::{LinesCodec, Framed};
use tracing::{info, error};
#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
// 绑定到本地3012端口
let listener = TcpListener::bind("127.0.0.1:3012").await?;
info!("WebSocket server listening on ws://127.0.0.1:3012");
// 接受连接
while let Ok((stream, _)) = listener.accept().await {
// 异步处理每个连接
async_std::task::spawn(handle_connection(stream));
}
Ok(())
}
async fn handle_connection(stream: async_std::net::TcpStream) {
// 执行WebSocket握手
let ws_stream = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
error!("Failed to accept WebSocket connection: {}", e);
return;
}
};
// 创建WsStream
let ws = WsStream::new(ws_stream);
// 使用LinesCodec进行帧处理
let (mut sink, mut stream) = Framed::new(ws, LinesCodec {}).split();
// 处理收到的消息
while let Some(msg) = stream.next().await {
match msg {
Ok(msg) => {
info!("Received: {}", msg);
// 回显消息
if let Err(e) = sink.send(msg).await {
error!("Failed to send message: {}", e);
break;
}
}
Err(e) => {
error!("Error in WebSocket stream: {}", e);
break;
}
}
}
}
WebSocket客户端
use async_tungstenite::connect_async;
use ws_stream_tungstenite::*;
use futures::{SinkExt, StreamExt};
use asynchronous_codec::{LinesCodec, Framed};
use tracing::{info, error};
use std::time::Duration;
use futures::future::join;
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 连接到WebSocket服务器
let (ws_stream, _) = connect_async("ws://127.0.0.1:3012").await?;
info!("Connected to WebSocket server");
// 创建WsStream
let ws = WsStream::new(ws_stream);
// 使用LinesCodec进行帧处理
let (mut sink, mut stream) = Framed::new(ws, LinesCodec {}).split();
// 发送消息的任务
let sender = async {
for i in 1..=5 {
let msg = format!("Hello, WebSocket! Message {}", i);
if let Err(e) = sink.send(msg).await {
error!("Failed to send message: {}", e);
break;
}
async_std::task::sleep(Duration::from_secs(1)).await;
}
// 关闭连接
sink.close().await?;
Ok::<(), Box<dyn std::error::Error>>(())
};
// 接收消息的任务
let receiver = async {
while let Some(msg) = stream.next().await {
match msg {
Ok(msg) => info!("Received: {}", msg),
Err(e) => {
error!("Error in WebSocket stream: {}", e);
break;
}
}
}
Ok::<(), Box<dyn std::error::Error>>(())
};
// 同时运行发送和接收
join(sender, receiver).await.0?;
Ok(())
}
如何关闭连接
WebSocket RFC规定了关闭握手:
- 当端点想要关闭连接时,它发送一个关闭帧,之后不再发送数据
- 远端发送关闭帧的确认
- 端点既发送又接收了关闭帧后,连接被视为已关闭
使用ws_stream_tungstenite正确关闭连接:
- 如果是远端发起关闭,只需轮询流直到返回None
- 如果想发起关闭,在sink上调用close(),然后继续轮询流直到返回None
错误处理
ws_stream_tungstenite只接受二进制消息,如果收到WebSocket文本消息,将被视为协议错误。错误通过pharos返回,应该观察WsStream并至少记录报告的任何错误。
限制
- 没有API发送Ping消息
- 收到的文本消息被视为错误
最佳实践
- 总是处理WebSocket错误和关闭事件
- 为编解码器设置最大消息大小以防止DOS攻击
- 使用适当的超时来处理卡住的连接
- 考虑使用tokio_io特性以获得更好的tokio集成
1 回复
Rust WebSocket流处理库ws_stream_tungstenite的使用指南
ws_stream_tungstenite
是一个基于tungstenite
的Rust库,提供了更高级的WebSocket流处理功能,特别适合需要高效处理WebSocket通信和数据流的场景。
主要特性
- 基于成熟的
tungstenite
WebSocket实现 - 提供流式接口处理WebSocket消息
- 支持异步/等待(async/await)模式
- 简化了WebSocket通信的复杂性
- 良好的错误处理和连接管理
安装
在Cargo.toml中添加依赖:
[dependencies]
ws_stream_tungstenite = "0.5"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
客户端连接示例
use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::TcpStream;
#[tokio::main]
async fn main() {
let url = "ws://echo.websocket.org";
// 建立TCP连接
let tcp = TcpStream::connect("echo.websocket.org:80").await.unwrap();
// 创建WebSocket连接
let (ws_stream, _) = tokio_tungstenite::connect_async(url)
.await
.unwrap();
// 转换为WsStream
let mut ws_stream = WsStream::new(ws_stream);
// 发送消息
ws_stream.send("Hello, WebSocket!".into()).await.unwrap();
// 接收消息
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(msg) => println!("Received: {:?}", msg),
Err(e) => eprintln!("Error: {}", e),
}
}
}
服务器端示例
use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
while let Ok((tcp_stream, _)) = listener.accept().await {
tokio::spawn(handle_connection(tcp_stream));
}
}
async fn handle_connection(tcp_stream: TcpStream) {
// 升级为WebSocket连接
let ws_stream = tokio_tungstenite::accept_async(tcp_stream)
.await
.unwrap();
let mut ws_stream = WsStream::new(ws_stream);
// 处理消息
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(msg) => {
println!("Received: {:?}", msg);
// 回显消息
ws_stream.send(msg).await.unwrap();
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
}
}
高级用法
处理二进制数据
#[tokio::main]
async fn main() {
let (mut ws_stream, _) = WsStream::connect("ws://example.com/ws")
.await
.unwrap();
// 发送二进制数据
let binary_data = vec![0x01, 0x02, 0x03, 0x04];
ws_stream.send(binary_data.into()).await.unwrap();
// 接收二进制数据
while let Some(msg) = ws_stream.next().await {
if let Ok(Message::Binary(data)) = msg {
println!("Received binary data: {:?}", data);
}
}
}
使用Sink特性批量发送消息
use futures::SinkExt;
#[tokio::main]
async fn main() {
let (mut ws_stream, _) = WsStream::connect("ws://example.com/ws")
.await
.unwrap();
let messages = vec![
Message::text("Message 1"),
Message::text("Message 2"),
Message::text("Message 3"),
];
let mut sink = ws_stream.sink();
sink.send_all(&mut futures::stream::iter(messages))
.await
.unwrap();
}
错误处理
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut ws_stream, _) = WsStream::connect("ws://example.com/ws")
.await?;
ws_stream.send("Ping".into()).await?;
match ws_stream.next().await {
Some(Ok(msg)) => println!("Received: {:?}", msg),
Some extreme(Err(e)) => eprintln!("WebSocket error: {}", e),
None => println!("Connection closed"),
}
Ok(())
}
性能提示
- 对于高频消息场景,考虑使用
Message::Binary
而不是Message::Text
,因为二进制数据通常处理效率更高 - 批量处理消息时使用
Sink
特性可以提高性能 - 合理设置TCP缓冲区大小可以改善吞吐量
完整示例demo
下面是一个完整的客户端和服务器端交互的WebSocket示例:
服务器端完整代码
use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 绑定到本地8080端口
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("WebSocket服务器启动,监听端口8080...");
while let Ok((tcp_stream, _)) = listener.accept().await {
tokio::spawn(async move {
if let Err(e) = handle_connection(tcp_stream).await {
eprintln!("连接处理错误: {}", e);
}
});
}
Ok(())
}
async fn handle_connection(tcp_stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
// 升级为WebSocket连接
let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await?;
let mut ws_stream = WsStream::new(ws_stream);
println!("新客户端连接");
// 处理消息循环
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(msg) => {
println!("收到消息: {:?}", msg);
// 回显消息给客户端
ws_stream.send(msg).await?;
}
Err(e) => {
eprintln!("错误: {}", e);
break;
}
}
}
println!("客户端断开连接");
Ok(())
}
客户端完整代码
use ws_stream_tungstenite::*;
use futures::prelude::*;
use tokio::net::TcpStream;
use futures::SinkExt;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 连接WebSocket服务器
let url = "ws://127.0.0.1:8080";
let tcp = TcpStream::connect("127.0.0.1:8080").await?;
// 创建WebSocket连接
let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
let mut ws_stream = WsStream::new(ws_stream);
println!("成功连接到服务器");
// 创建Sink用于批量发送
let mut sink = ws_stream.sink();
// 批量发送文本消息
let text_messages = vec![
Message::text("第一条消息"),
Message::text("第二条消息"),
Message::text("第三条消息"),
];
sink.send_all(&mut futures::stream::iter(text_messages)).await?;
// 发送二进制消息
let binary_data = vec![0x01, 0x02, 0x03, 0x04];
ws_stream.send(binary_data.into()).await?;
// 接收服务器响应
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(msg) => println!("收到服务器响应: {:?}", msg),
Err(e) => {
eprintln!("错误: {}", e);
break;
}
}
}
Ok(())
}
这个完整示例展示了:
- 服务器端启动WebSocket服务并回显接收到的消息
- 客户端连接服务器并发送文本和二进制消息
- 使用Sink特性批量发送消息
- 完整的错误处理机制
您可以将服务器端和客户端代码分别保存为两个文件,先运行服务器端,再运行客户端进行测试。