Rust异步运行时库tokio-core的使用,tokio-core为Rust提供高效、可靠的异步I/O和事件驱动编程支持

Rust异步运行时库tokio-core的使用

弃用通知

这个crate已被弃用,建议使用Tokio替代。

tokio-core介绍

tokio-core是Rust中基于futuresmio构建的异步I/O和事件循环抽象的核心库。

使用方法

首先,在您的Cargo.toml中添加以下依赖:

[dependencies]
tokio-core = "0.1"

接下来,在您的crate中添加以下引用:

extern crate tokio_core;

完整示例代码

以下是一个使用tokio-core创建简单TCP服务器的完整示例:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;
use futures::Stream;

fn main() {
    // 创建事件循环
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 绑定TCP监听端口
    let listener = TcpListener::bind(&"127.0.0.1:8080".parse().unwrap(), &handle).unwrap();
    
    println!("Server running on 127.0.0.1:8080");
    
    // 处理传入连接
    let server = listener.incoming().for_each(|(socket, _)| {
        // 这里处理每个连接
        println!("New connection from: {}", socket.peer_addr().unwrap());
        
        // 关闭连接
        drop(socket);
        Ok(())
    });
    
    // 运行服务器
    core.run(server).unwrap();
}

另一个示例:简单的TCP客户端

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use futures::Future;

fn main() {
    // 创建事件循环
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 连接服务器
    let addr = "127.0.0.1:8080".parse().unwrap();
    let tcp = TcpStream::connect(&addr, &handle);
    
    // 处理连接
    let client = tcp.and_then(|socket| {
        println!("Connected to server!");
        
        // 关闭连接
        drop(socket);
        Ok(())
    });
    
    // 运行客户端
    core.run(client).unwrap();
}

完整示例demo

以下是一个增强版的TCP服务器示例,增加了简单的消息处理功能:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use tokio_core::net::{TcpListener, TcpStream};
use futures::{Future, Stream};
use std::io::{self, Read, Write};

fn main() {
    // 创建事件循环
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 绑定TCP监听端口
    let listener = TcpListener::bind(&"127.0.0.1:8080".parse().unwrap(), &handle).unwrap();
    
    println!("Server running on 127.0.0.1:8080");
    
    // 处理传入连接
    let server = listener.incoming().for_each(|(socket, _)| {
        // 获取客户端地址
        let addr = socket.peer_addr().unwrap();
        println!("New connection from: {}", addr);
        
        // 创建读写缓冲区
        let (reader, writer) = socket.split();
        
        // 读取客户端数据
        let read_future = tokio_core::io::read(reader, vec![0; 1024])
            .and_then(move |(reader, buf, n)| {
                if n == 0 {
                    println!("Connection closed by: {}", addr);
                    return Ok(());
                }
                
                let received = String::from_utf8_lossy(&buf[..n]);
                println!("Received from {}: {}", addr, received);
                
                // 回显消息
                tokio_core::io::write_all(writer, buf)
                    .map(|_| ())
                    .map_err(|e| e.into())
            });
        
        // 处理错误
        handle.spawn(read_future.map_err(|e| {
            println!("Error with connection from {}: {}", addr, e);
        }));
        
        Ok(())
    });
    
    // 运行服务器
    core.run(server).unwrap();
}

许可证

本项目采用以下任一许可证:

  • Apache License, Version 2.0
  • MIT license

贡献

除非您明确声明,否则根据Apache-2.0许可证提交的任何贡献都将按上述双重许可,不附加任何额外条款或条件。


1 回复

Rust异步运行时库tokio-core的使用指南

tokio-core简介

tokio-core是Rust生态中一个轻量级的异步I/O库,为开发者提供了高效、可靠的事件驱动编程支持。它是tokio生态系统的基础组件,专注于提供核心的异步I/O功能。

tokio-core主要特点:

  • 基于mio库构建,提供跨平台的事件通知系统
  • 实现了Future和Stream抽象
  • 提供TCP/UDP网络支持
  • 轻量级设计,适合构建小型异步应用

基本使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
tokio-core = "0.1"
futures = "0.1"

创建事件循环

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::Future;

fn main() {
    // 创建事件循环
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 创建一个立即完成的Future
    let future = futures::future::ok::<_, ()>(42);
    
    // 运行Future到完成
    let result = core.run(future).unwrap();
    println!("Result: {}", result);
}

TCP服务器示例

use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use futures::Stream;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 绑定到本地地址
    let addr = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&addr, &handle).unwrap();
    
    println!("Listening on {}", addr);
    
    // 接受连接并处理
    let server = listener.incoming().for_each(|(socket, addr)| {
        println!("New connection from {}", addr);
        
        // 这里可以处理socket
        // 例如: tokio_core::io::read_to_end(socket, vec![])
        
        Ok(())
    });
    
    // 运行服务器
    core.run(server).unwrap();
}

TCP客户端示例

use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use futures::Future;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    let addr = "127.0.0.1:8080".parse().unwrap();
    
    // 连接服务器
    let client = TcpStream::connect(&addr, &handle)
        .and_then(|socket| {
            println!("Connected to server!");
            
            // 这里可以写入或读取数据
            // 例如: tokio_core::io::write_all(socket, b"hello world")
            
            Ok(())
        });
    
    // 运行客户端
    core.run(client).unwrap();
}

高级功能

超时处理

use tokio_core::reactor::Core;
use futures::Future;
use std::time::Duration;
use tokio_core::reactor::Timeout;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 创建2秒超时
    let timeout = Timeout::new(Duration::from_secs(2), &handle).unwrap();
    
    // 等待超时
    let result = core.run(timeout).unwrap();
    println!("Timeout occurred at {:?}", result);
}

组合多个Future

use tokio_core::reactor::Core;
use futures::Future;

fn main() {
    let mut core = Core::new().unwrap();
    
    let future1 = futures::future::ok::<i32, ()>(10);
    let future2 = futures::future::ok::<i32, ()>(20);
    
    // 组合两个Future,等待它们都完成
    let combined = future1.join(future2);
    
    let (result1, result2) = core.run(combined).unwrap();
    println!("Results: {} and {}", result1, result2);
}

完整示例:TCP Echo服务器

下面是一个完整的TCP Echo服务器示例,它会将客户端发送的数据原样返回:

use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_core::io::{read_to_end, write_all};
use futures::{Future, Stream};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 绑定到本地8080端口
    let addr = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&addr, &handle).unwrap();
    
    println!("Echo server listening on {}", addr);
    
    // 处理每个连接
    let server = listener.incoming().for_each(|(socket, addr)| {
        println!("New connection from {}", addr);
        
        // 读取客户端数据
        let read_future = read_to_end(socket, Vec::new());
        
        // 处理读取结果并回写
        let echo_future = read_future.and_then(|(socket, data)| {
            println!("Received {} bytes from {}", data.len(), addr);
            write_all(socket, data)
        });
        
        // 处理回写结果
        let connection = echo_future.then(|_| Ok(()));
        
        // 在事件循环中执行这个连接的处理
        handle.spawn(connection);
        
        Ok(())
    });
    
    // 运行服务器
    core.run(server).unwrap();
}

完整示例:带超时的HTTP请求

下面是一个使用tokio-core进行HTTP请求并设置超时的完整示例:

use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Timeout;
use futures::{Future, Stream};
use std::time::Duration;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    
    // 创建5秒超时
    let timeout = Timeout::new(Duration::from_secs(5), &handle).unwrap();
    
    // 创建HTTP请求
    let addr = "example.com:80".parse().unwrap();
    let request = TcpStream::connect(&addr, &handle)
        .and_then(|socket| {
            // 发送HTTP GET请求
            write_all(socket, b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
        })
        .and_then(|(socket, _)| {
            // 读取响应
            read_to_end(socket, Vec::new())
        });
    
    // 组合请求和超时
    let response = request.select2(timeout)
        .then(|res| match res {
            Ok((success, _)) => Ok(success),
            Err((e, _)) => Err(e),
        });
    
    // 运行并处理结果
    match core.run(response) {
        Ok((_, data)) => {
            println!("Received response: {} bytes", data.len());
            println!("{}", String::from_utf8_lossy(&data));
        }
        Err(e) => {
            println!("Error: {}", e);
        }
    }
}

注意事项

  1. tokio-core是tokio生态系统的基础组件,对于更复杂的应用,可以考虑使用更高级的tokio库
  2. 所有I/O操作必须通过事件循环的handle进行
  3. 长时间运行的计算任务应该使用tokio_core::reactor::Timeout或移动到单独的线程
  4. tokio-core已经被更现代的tokio版本取代,新项目建议使用tokio 1.0+

tokio-core为Rust提供了强大的异步编程能力,特别适合需要轻量级异步I/O解决方案的场景。通过结合Future和Stream抽象,可以构建出高效且易于理解的异步代码。

回到顶部