Using sse-stream, a Rust SSE stream-processing library, for efficient Server-Sent Events (SSE) data stream handling
Decoding example
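use sse_stream::SseStream;
use http_body_util::Full;
use bytes::Bytes;
use futures_util::StreamExt;

// A raw SSE payload. In the SSE wire format a blank line terminates each event.
const SSE_BODY: &str =
r#"
retry: 1000
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

data: Here's a system message of some kind that will get used
data: to accomplish some task.

"#;

// Wrap the payload in an in-memory HTTP body and decode it as a stream of events.
let body = Full::<Bytes>::from(SSE_BODY);
let mut sse_body = SseStream::new(body);
// This only constructs the future; await it or hand it to a runtime to drive the stream.
async {
    while let Some(sse) = sse_body.next().await {
        println!("{:?}", sse.unwrap());
    }
};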
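To make the decoding step more concrete, here is a minimal, std-only sketch of what parsing the SSE wire format involves. It is not the sse-stream implementation: `ParsedEvent` and `parse_sse` are hypothetical names made up for this illustration, and the sketch works on a complete string rather than on an asynchronous stream of chunks.

```rust
/// Hypothetical event type for this sketch; not the crate's `Sse` struct.
#[derive(Debug, Default, PartialEq)]
struct ParsedEvent {
    event: Option<String>,
    data: Option<String>,
    id: Option<String>,
    retry: Option<u64>,
}

/// Parses a complete SSE payload into events (hypothetical helper).
/// An event that is not terminated by a blank line is discarded.
fn parse_sse(input: &str) -> Vec<ParsedEvent> {
    let mut events = Vec::new();
    let mut current = ParsedEvent::default();
    for line in input.lines() {
        if line.is_empty() {
            // A blank line ends the current event; skip events with no fields.
            if current != ParsedEvent::default() {
                events.push(std::mem::take(&mut current));
            }
            continue;
        }
        if line.starts_with(':') {
            // Lines starting with a colon are comments.
            continue;
        }
        // Split "field: value"; a single leading space in the value is dropped.
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "event" => current.event = Some(value.to_string()),
            "data" => match current.data.as_mut() {
                // Consecutive data lines are joined with a newline.
                Some(d) => {
                    d.push('\n');
                    d.push_str(value);
                }
                None => current.data = Some(value.to_string()),
            },
            "id" => current.id = Some(value.to_string()),
            "retry" => {
                // Non-numeric retry values are ignored.
                if let Ok(ms) = value.parse() {
                    current.retry = Some(ms);
                }
            }
            _ => {} // Unknown fields are ignored.
        }
    }
    events
}
```

Calling `parse_sse` on a payload with two blank-line-terminated blocks yields two events, with multi-line `data` fields joined by newlines. A real decoder such as `SseStream` additionally has to cope with events split across network chunks.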
Encoding example
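use std::convert::Infallible;
use futures_util::StreamExt;
use sse_stream::{Sse, SseBody};

// Build a finite stream of three events; each item is wrapped in Ok
// because SseBody consumes a stream of Result<Sse, E>.
let stream = futures_util::stream::iter([
    Sse::default().event("1").data("....."),
    Sse::default().event("2").data("....."),
    Sse::default().event("3").data("....."),
])
.map(Result::<Sse, Infallible>::Ok);

// Wrap the event stream in a body that encodes each event in SSE format.
let body = SseBody::new(stream);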
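For intuition about what encoding produces on the wire, the following std-only sketch serializes one event into SSE text. `encode_sse` is a hypothetical helper written for this illustration; the exact bytes emitted by `SseBody` (field order, spacing) may differ.

```rust
/// Serializes one event into SSE wire format.
/// Hypothetical helper for this sketch; not part of the sse-stream API.
fn encode_sse(
    event: Option<&str>,
    data: Option<&str>,
    id: Option<&str>,
    retry: Option<u64>,
) -> String {
    let mut out = String::new();
    if let Some(event) = event {
        out.push_str("event: ");
        out.push_str(event);
        out.push('\n');
    }
    if let Some(data) = data {
        // Each line of a multi-line payload gets its own "data:" field.
        for line in data.split('\n') {
            out.push_str("data: ");
            out.push_str(line);
            out.push('\n');
        }
    }
    if let Some(id) = id {
        out.push_str("id: ");
        out.push_str(id);
        out.push('\n');
    }
    if let Some(ms) = retry {
        out.push_str("retry: ");
        out.push_str(&ms.to_string());
        out.push('\n');
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}
```

For example, `encode_sse(Some("1"), Some("a\nb"), None, None)` returns `"event: 1\ndata: a\ndata: b\n\n"`. Splitting the payload on newlines matters because a raw newline inside a single `data:` field would otherwise end the field early.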
Complete example code
Server-side implementation
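// Assumes hyper 1.x (features "server", "http1"), hyper-util (feature "tokio"),
// http-body-util, bytes, and tokio. SseBody is used here as an http_body::Body,
// so it is returned as the response body directly instead of via Body::wrap_stream.
use std::convert::Infallible;
use std::net::SocketAddr;

use bytes::Bytes;
use futures_util::StreamExt;
use http_body_util::{combinators::UnsyncBoxBody, BodyExt};
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use sse_stream::{Sse, SseBody};
use tokio::net::TcpListener;

async fn handle_sse(
    _req: Request<Incoming>,
) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, Infallible> {
    // Create the SSE event stream.
    let stream = futures_util::stream::iter([
        Sse::default().event("message").data("Hello from server!"),
        Sse::default().event("update").data("Server time: 2023-01-01"),
        Sse::default().event("close").data("Connection will close"),
    ])
    .map(Result::<Sse, Infallible>::Ok);

    // Create the SSE response body and box it to get a nameable type.
    let body = SseBody::new(stream).boxed_unsync();

    // Build the HTTP response with the SSE headers.
    Ok(Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("Cache-Control", "no-cache")
        .body(body)
        .unwrap())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).await?;
    println!("SSE server running at http://{}", addr);

    loop {
        let (stream, _) = listener.accept().await?;
        // Serve each connection on its own task.
        tokio::spawn(async move {
            let conn = http1::Builder::new()
                .serve_connection(TokioIo::new(stream), service_fn(handle_sse));
            if let Err(e) = conn.await {
                eprintln!("server error: {}", e);
            }
        });
    }
}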
Client implementation
// Requires reqwest with the "stream" feature for `bytes_stream`.
use std::pin::pin;

use futures_util::StreamExt;
use reqwest::Client;
use sse_stream::SseStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client
        .get("http://localhost:3000")
        .header("Accept", "text/event-stream")
        .send()
        .await?;

    // `bytes_stream` yields a stream of byte chunks rather than an HTTP body,
    // so the decoder is built with `from_byte_stream` instead of `SseStream::new`.
    let body = response.bytes_stream();
    // Pin the stream so that `next()` can be called on it.
    let mut sse_stream = pin!(SseStream::from_byte_stream(body));

    while let Some(event) = sse_stream.next().await {
        match event {
            Ok(sse) => {
                println!("Received SSE event:");
                println!("  Event: {:?}", sse.event);
                println!("  Data: {:?}", sse.data);
                println!("  ID: {:?}", sse.id);
                println!("  Retry: {:?}", sse.retry);
            }
            Err(e) => eprintln!("Error receiving event: {}", e),
        }
    }
    Ok(())
}
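The client above prints the `id` and `retry` fields but does nothing with them. In the SSE protocol these fields drive reconnection: the client remembers the last event ID, sends it back in a `Last-Event-ID` header when it reconnects, and waits for the `retry` interval first. The std-only sketch below tracks that state. `ReconnectState` and its methods are hypothetical names for this illustration, not part of sse-stream or reqwest, and the 3-second default delay is an assumption.

```rust
use std::time::Duration;

/// Reconnection state an SSE client might keep between connections.
/// Hypothetical type for this sketch.
struct ReconnectState {
    last_event_id: Option<String>,
    retry: Duration,
}

impl ReconnectState {
    fn new() -> Self {
        Self {
            last_event_id: None,
            // Assumed default; the protocol leaves the initial delay to the client.
            retry: Duration::from_secs(3),
        }
    }

    /// Updates the state from the `id` and `retry` fields of a received event.
    fn observe(&mut self, id: Option<&str>, retry_ms: Option<u64>) {
        if let Some(id) = id {
            // An empty id resets the stored last event ID.
            self.last_event_id = if id.is_empty() {
                None
            } else {
                Some(id.to_string())
            };
        }
        if let Some(ms) = retry_ms {
            self.retry = Duration::from_millis(ms);
        }
    }

    /// Headers to send on the next connection attempt.
    fn reconnect_headers(&self) -> Vec<(&'static str, String)> {
        let mut headers = vec![("Accept", "text/event-stream".to_string())];
        if let Some(id) = &self.last_event_id {
            headers.push(("Last-Event-ID", id.clone()));
        }
        headers
    }
}
```

In the client loop, one would call `observe(sse.id.as_deref(), sse.retry)` for each event and, once the stream ends or fails, sleep for `retry` before issuing a new request with `reconnect_headers()`.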
Installation
Run the following Cargo command in your project directory:
cargo add sse-stream
Or add the following line to the [dependencies] section of Cargo.toml:
sse-stream = "0.2.1"
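Features
- Supports both encoding and decoding of the SSE protocol
- Integrates well with HTTP libraries such as hyper
- Asynchronous stream processing
- Lightweight implementation
License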
MIT OR Apache-2.0