Rust异步运行时库tokio-core的使用,tokio-core为Rust提供高效、可靠的异步I/O和事件驱动编程支持
Rust异步运行时库tokio-core的使用
弃用通知
这个crate已被弃用,建议使用Tokio替代。
tokio-core介绍
tokio-core是Rust中基于futures
和mio
构建的异步I/O和事件循环抽象的核心库。
使用方法
首先,在您的Cargo.toml
中添加以下依赖:
[dependencies]
tokio-core = "0.1"
接下来,在您的crate中添加以下引用:
extern crate tokio_core;
完整示例代码
以下是一个使用tokio-core创建简单TCP服务器的完整示例:
extern crate tokio_core;
extern crate futures;
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;
use futures::Stream;
fn main() {
// 创建事件循环
let mut core = Core::new().unwrap();
let handle = core.handle();
// 绑定TCP监听端口
let listener = TcpListener::bind(&"127.0.0.1:8080".parse().unwrap(), &handle).unwrap();
println!("Server running on 127.0.0.1:8080");
// 处理传入连接
let server = listener.incoming().for_each(|(socket, _)| {
// 这里处理每个连接
println!("New connection from: {}", socket.peer_addr().unwrap());
// 关闭连接
drop(socket);
Ok(())
});
// 运行服务器
core.run(server).unwrap();
}
另一个示例:简单的TCP客户端
extern crate tokio_core;
extern crate futures;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use futures::Future;
fn main() {
// 创建事件循环
let mut core = Core::new().unwrap();
let handle = core.handle();
// 连接服务器
let addr = "127.0.0.1:8080".parse().unwrap();
let tcp = TcpStream::connect(&addr, &handle);
// 处理连接
let client = tcp.and_then(|socket| {
println!("Connected to server!");
// 关闭连接
drop(socket);
Ok(())
});
// 运行客户端
core.run(client).unwrap();
}
完整示例demo
以下是一个增强版的TCP服务器示例,增加了简单的消息处理功能:
extern crate tokio_core;
extern crate futures;
use tokio_core::reactor::Core;
use tokio_core::net::{TcpListener, TcpStream};
use futures::{Future, Stream};
use std::io::{self, Read, Write};
fn main() {
// 创建事件循环
let mut core = Core::new().unwrap();
let handle = core.handle();
// 绑定TCP监听端口
let listener = TcpListener::bind(&"127.0.0.1:8080".parse().unwrap(), &handle).unwrap();
println!("Server running on 127.0.0.1:8080");
// 处理传入连接
let server = listener.incoming().for_each(|(socket, _)| {
// 获取客户端地址
let addr = socket.peer_addr().unwrap();
println!("New connection from: {}", addr);
// 创建读写缓冲区
let (reader, writer) = socket.split();
// 读取客户端数据
let read_future = tokio_core::io::read(reader, vec![0; 1024])
.and_then(move |(reader, buf, n)| {
if n == 0 {
println!("Connection closed by: {}", addr);
return Ok(());
}
let received = String::from_utf8_lossy(&buf[..n]);
println!("Received from {}: {}", addr, received);
// 回显消息
tokio_core::io::write_all(writer, buf)
.map(|_| ())
.map_err(|e| e.into())
});
// 处理错误
handle.spawn(read_future.map_err(|e| {
println!("Error with connection from {}: {}", addr, e);
}));
Ok(())
});
// 运行服务器
core.run(server).unwrap();
}
许可证
本项目采用以下任一许可证:
- Apache License, Version 2.0
- MIT license
贡献
除非您明确声明,否则根据Apache-2.0许可证提交的任何贡献都将按上述双重许可,不附加任何额外条款或条件。
1 回复
Rust异步运行时库tokio-core的使用指南
tokio-core简介
tokio-core是Rust生态中一个轻量级的异步I/O库,为开发者提供了高效、可靠的事件驱动编程支持。它是tokio生态系统的基础组件,专注于提供核心的异步I/O功能。
tokio-core主要特点:
- 基于mio库构建,提供跨平台的事件通知系统
- 实现了Future和Stream抽象
- 提供TCP/UDP网络支持
- 轻量级设计,适合构建小型异步应用
基本使用方法
添加依赖
首先在Cargo.toml中添加依赖:
[dependencies]
tokio-core = "0.1"
futures = "0.1"
创建事件循环
extern crate tokio_core;
extern crate futures;
use tokio_core::reactor::Core;
use futures::Future;
fn main() {
// 创建事件循环
let mut core = Core::new().unwrap();
let handle = core.handle();
// 创建一个立即完成的Future
let future = futures::future::ok::<_, ()>(42);
// 运行Future到完成
let result = core.run(future).unwrap();
println!("Result: {}", result);
}
TCP服务器示例
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use futures::Stream;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
// 绑定到本地地址
let addr = "127.0.0.1:8080".parse().unwrap();
let listener = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on {}", addr);
// 接受连接并处理
let server = listener.incoming().for_each(|(socket, addr)| {
println!("New connection from {}", addr);
// 这里可以处理socket
// 例如: tokio_core::io::read_to_end(socket, vec![])
Ok(())
});
// 运行服务器
core.run(server).unwrap();
}
TCP客户端示例
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use futures::Future;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = "127.0.0.1:8080".parse().unwrap();
// 连接服务器
let client = TcpStream::connect(&addr, &handle)
.and_then(|socket| {
println!("Connected to server!");
// 这里可以写入或读取数据
// 例如: tokio_core::io::write_all(socket, b"hello world")
Ok(())
});
// 运行客户端
core.run(client).unwrap();
}
高级功能
超时处理
use tokio_core::reactor::Core;
use futures::Future;
use std::time::Duration;
use tokio_core::reactor::Timeout;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
// 创建2秒超时
let timeout = Timeout::new(Duration::from_secs(2), &handle).unwrap();
// 等待超时
let result = core.run(timeout).unwrap();
println!("Timeout occurred at {:?}", result);
}
组合多个Future
use tokio_core::reactor::Core;
use futures::Future;
fn main() {
let mut core = Core::new().unwrap();
let future1 = futures::future::ok::<i32, ()>(10);
let future2 = futures::future::ok::<i32, ()>(20);
// 组合两个Future,等待它们都完成
let combined = future1.join(future2);
let (result1, result2) = core.run(combined).unwrap();
println!("Results: {} and {}", result1, result2);
}
完整示例:TCP Echo服务器
下面是一个完整的TCP Echo服务器示例,它会将客户端发送的数据原样返回:
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_core::io::{read_to_end, write_all};
use futures::{Future, Stream};
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
// 绑定到本地8080端口
let addr = "127.0.0.1:8080".parse().unwrap();
let listener = TcpListener::bind(&addr, &handle).unwrap();
println!("Echo server listening on {}", addr);
// 处理每个连接
let server = listener.incoming().for_each(|(socket, addr)| {
println!("New connection from {}", addr);
// 读取客户端数据
let read_future = read_to_end(socket, Vec::new());
// 处理读取结果并回写
let echo_future = read_future.and_then(|(socket, data)| {
println!("Received {} bytes from {}", data.len(), addr);
write_all(socket, data)
});
// 处理回写结果
let connection = echo_future.then(|_| Ok(()));
// 在事件循环中执行这个连接的处理
handle.spawn(connection);
Ok(())
});
// 运行服务器
core.run(server).unwrap();
}
完整示例:带超时的HTTP请求
下面是一个使用tokio-core进行HTTP请求并设置超时的完整示例:
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Timeout;
use futures::{Future, Stream};
use std::time::Duration;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
// 创建5秒超时
let timeout = Timeout::new(Duration::from_secs(5), &handle).unwrap();
// 创建HTTP请求
let addr = "example.com:80".parse().unwrap();
let request = TcpStream::connect(&addr, &handle)
.and_then(|socket| {
// 发送HTTP GET请求
write_all(socket, b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
})
.and_then(|(socket, _)| {
// 读取响应
read_to_end(socket, Vec::new())
});
// 组合请求和超时
let response = request.select2(timeout)
.then(|res| match res {
Ok((success, _)) => Ok(success),
Err((e, _)) => Err(e),
});
// 运行并处理结果
match core.run(response) {
Ok((_, data)) => {
println!("Received response: {} bytes", data.len());
println!("{}", String::from_utf8_lossy(&data));
}
Err(e) => {
println!("Error: {}", e);
}
}
}
注意事项
- tokio-core是tokio生态系统的基础组件,对于更复杂的应用,可以考虑使用更高级的tokio库
- 所有I/O操作必须通过事件循环的handle进行
- 长时间运行的计算任务应该使用
tokio_core::reactor::Timeout
或移动到单独的线程 - tokio-core已经被更现代的tokio版本取代,新项目建议使用tokio 1.0+
tokio-core为Rust提供了强大的异步编程能力,特别适合需要轻量级异步I/O解决方案的场景。通过结合Future和Stream抽象,可以构建出高效且易于理解的异步代码。