Rust SSE流处理库sse-stream的使用,实现高效服务器推送事件(Server-Sent Events)数据流处理

Rust SSE流处理库sse-stream的使用,实现高效服务器推送事件(Server-Sent Events)数据流处理

解码示例

# use sse_stream::SseStream;
# use http_body_util::Full;
# use bytes::Bytes;
# use futures_util::StreamExt;
const SSE_BODY: &str =
r#"
retry: 1000
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

data: Here's a system message of some kind that will get used
data: to accomplish some task.
"#;

let body = Full::<Bytes>::from(SSE_BODY);
let mut sse_body = SseStream::new(body);
async {
    while let Some(sse) = sse_body.next().await {
        println!("{:?}", sse.unwrap());
    }
};

编码示例

# use std::convert::Infallible;
# use futures_util::StreamExt;
# use sse_stream::{Sse, SseBody};

let stream = futures_util::stream::iter([
    Sse::default().event("1").data("....."),
    Sse::default().event("2").data("....."),
    Sse::default().event("3").data("....."),
])
.map(Result::<Sse, Infallible>::Ok);
let body = SseBody::new(stream);

完整示例代码

服务器端实现

use std::convert::Infallible;
use futures_util::StreamExt;
use sse_stream::{Sse, SseBody};
use hyper::{Body, Response, Server};
use hyper::service::{make_service_fn, service_fn};

async fn handle_sse() -> Result<Response<Body>, Infallible> {
    // 创建SSE事件流
    let stream = futures_util::stream::iter([
        Sse::default().event("message").data("Hello from server!"),
        Sse::default().event("update").data("Server time: 2023-01-01"),
        Sse::default().event("close").data("Connection will close"),
    ])
    .map(Result::<Sse, Infallible>::Ok);
    
    // 创建SSE响应体
    let body = SseBody::new(stream);
    
    // 构建HTTP响应
    Ok(Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("Cache-Control", "no-cache")
        .body(Body::wrap_stream(body))
        .unwrap())
}

#[tokio::main]
async fn main() {
    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(|_req| handle_sse()))
    });

    let addr = ([127, 0, 0, 1], 3000).into();
    let server = Server::bind(&addr).serve(make_svc);

    println!("SSE server running at http://{}", addr);
    
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

客户端实现

use sse_stream::SseStream;
use http_body_util::Full;
use bytes::Bytes;
use futures_util::StreamExt;
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client.get("http://localhost:3000")
        .header("Accept", "text/event-stream")
        .send()
        .await?;
    
    let body = response.bytes_stream();
    let mut sse_stream = SseStream::new(body);
    
    while let Some(event) = sse_stream.next().await {
        match event {
            Ok(sse) => {
                println!("Received SSE event:");
                println!("  Event: {:?}", sse.event);
                println!("  Data: {:?}", sse.data);
                println!("  ID: {:?}", sse.id);
                println!("  Retry: {:?}", sse.retry);
            }
            Err(e) => eprintln!("Error receiving event: {}", e),
        }
    }
    
    Ok(())
}

安装

在项目目录中运行以下Cargo命令:

cargo add sse-stream

或者在Cargo.toml中添加以下行:

sse-stream = "0.2.1"

特性

  • 支持SSE协议的编码和解码
  • 与hyper等HTTP库良好集成
  • 异步流处理
  • 轻量级实现

许可证

MIT OR Apache-2.0


1 回复

Rust SSE流处理库sse-stream的使用指南

简介

sse-stream是一个用于处理服务器推送事件(Server-Sent Events, SSE)的Rust库,它提供了高效的方式来处理SSE数据流。SSE是一种允许服务器通过HTTP连接向客户端推送数据的协议,常用于实时更新、通知和事件驱动的Web应用。

主要特性

  • 轻量级且高效
  • 支持异步处理
  • 符合SSE规范
  • 易于集成到现有Rust项目中

安装

在Cargo.toml中添加依赖:

[dependencies]
sse-stream = "0.2"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 创建SSE流

use sse_stream::SseStream;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = SseStream::new();
    
    // 添加事件
    stream.push("message", "Hello, world!", None);
    stream.push("update", "Data updated", Some("123"));
    
    // 写入到任意AsyncWrite实现
    let mut buffer = Vec::new();
    stream.write_all(&mut buffer).await?;
    
    println!("{:?}", String::from_utf8(buffer).unwrap());
    Ok(())
}

2. 从HTTP响应读取SSE流

use sse_stream::SseReader;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 假设我们从某个HTTP响应获取了SSE流
    let sse_data = b"event: message\ndata: Hello\n\n";
    let mut reader = SseReader::new(&sse_data[..]);
    
    while let Some(event) = reader.next().await? {
        println!("Event: {}, Data: {}", event.event, event.data);
    }
    
    Ok(())
}

高级用法

与HTTP服务器集成

use axum::{
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use sse_stream::SseStream;
use std::convert::Infallible;
use tokio_stream::wrappers::ReceiverStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn sse_handler() -> Sse<ReceiverStream<Result<Event, Infallible>>> {
    let (tx, rx) = mpsc::channel(10);
    
    tokio::spawn(async move {
        let mut stream = SseStream::new();
        
        for i in 0..5 {
            stream.push("message", &format!("Count: {}", i), None);
            let event = Event::default().data(format!("Count: {}", i));
            tx.send(Ok(event)).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });
    
    Sse::new(ReceiverStream::new(rx))
}

自定义重连行为

use sse_stream::SseReader;
use std::time::Duration;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut retries = 0;
    let max_retries = 3;
    
    loop {
        match connect_and_process().await {
            Ok(_) => break,
            Err(e) if retries < max_retries => {
                retries += 1;
                println!("Error: {}. Retrying in 5 seconds...", e);
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
            Err(e) => {
                println!("Failed after {} retries: {}", max_retries, e);
                break;
            }
        }
    }
    
    Ok(())
}

async fn connect_and_process() -> std::io::Result<()> {
    // 模拟从网络获取SSE流
    let sse_data = b"event: status\ndata: connected\n\n";
    let mut reader = SseReader::new(&sse_data[..]);
    
    while let Some(event) = reader.next().await? {
        println!("Received: {} - {}", event.event, event.data);
    }
    
    Ok(())
}

完整示例

下面是一个完整的SSE服务器和客户端交互示例:

服务器端代码

use axum::{
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use std::convert::Infallible;
use tokio_stream::{wrappers::ReceiverStream, StreamExt as _};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建路由
    let app = Router::new().route("/events", get(sse_handler));

    // 启动服务器
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn sse_handler() -> Sse<ReceiverStream<Result<Event, Infallible>>> {
    // 创建通道用于发送事件
    let (tx, rx) = mpsc::channel(10);
    
    // 在后台任务中生成事件
    tokio::spawn(async move {
        let mut count = 0;
        
        loop {
            // 每1秒发送一个事件
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            
            // 创建事件
            let event = Event::default()
                .event("update")
                .data(format!("Event {}", count))
                .id(count.to_string());
            
            // 发送事件
            if tx.send(Ok(event)).await.is_err() {
                break;
            }
            
            count += 1;
        }
    });
    
    // 将接收器转换为SSE流
    Sse::new(ReceiverStream::new(rx))
}

客户端代码

use sse_stream::SseReader;
use reqwest::Client;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建HTTP客户端
    let client = Client::new();
    
    // 发送GET请求获取SSE流
    let response = client
        .get("http://localhost:3000/events")
        .header("Accept", "text/event-stream")
        .send()
        .await?;
    
    // 检查响应状态
    if !response.status().is_success() {
        eprintln!("Server returned error: {}", response.status());
        return Ok(());
    }
    
    // 创建SSE读取器
    let mut reader = SseReader::new(response.bytes_stream());
    
    // 处理事件
    while let Some(event) = reader.next().await? {
        println!(
            "收到事件 - 类型: {}, 数据: {}, ID: {}",
            event.event,
            event.data,
            event.id.unwrap_or_default()
        );
    }
    
    Ok(())
}

注意事项

  1. 确保正确处理连接断开和重连逻辑
  2. 对于生产环境,考虑添加心跳机制保持连接活跃
  3. 注意处理跨域问题(CORS)如果前端与API不在同一域名下
  4. 考虑使用适当的背压机制处理高频率事件

sse-stream库为Rust开发者提供了处理SSE协议的简洁高效方式,无论是作为客户端接收事件还是服务器端推送事件,都能很好地满足需求。

回到顶部