Rust异步编程库pyo3-asyncio的使用:实现Python与Rust高效协程交互的桥梁

Rust异步编程库pyo3-asyncio的使用:实现Python与Rust高效协程交互的桥梁

简介

PyO3 Asyncio是PyO3生态系统中的一个全新部分,它提供了Rust Futures和Python协程之间的交互功能,并管理它们对应事件循环的生命周期。这个库主要适用于以下场景:

  1. 需要使用Python异步函数的Rust库
  2. 为异步Rust库提供Python绑定

快速入门

Rust应用程序示例

使用async-std运行时:

# Cargo.toml依赖
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "async-std-runtime"] }
async-std = "1.9"
// main.rs
use pyo3::prelude::*;

#[pyo3_asyncio::async_std::main]
async fn main() -> PyResult<()> {
    let fut = Python::with_gil(|py| {
        let asyncio = py.import("asyncio")?;
        // 将asyncio.sleep转换为Rust Future
        pyo3_asyncio::async_std::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?)
    })?;

    fut.await?;

    Ok(())
}

使用tokio运行时:

# Cargo.toml依赖
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] }
tokio = "1.9"
// main.rs
use pyo极速3::prelude::*;

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
    let fut = Python::with_gil(|py| {
        let asyncio = py.import("asyncio")?;
        // 将asyncio.sleep转换为Rust Future
        pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?)
    })?;

    fut.await?;

    Ok(())
}

PyO3原生Rust模块示例

使用async-std运行时:

# Cargo.toml
[lib]
name = "my_async_module"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-asyncio = { version = "极速0.20", features = ["async-std-runtime"] }
async-std = "1.9"
// lib.rs
use pyo3::{prelude::*, wrap_pyfunction};

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::async_std::future_into_py(py, async {
        async_std::task::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    })
}

#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
    Ok(())
}

使用tokio运行时:

// lib.rs
use pyo3::{prelude::*, wrap_pyfunction};

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    })
}

#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
    Ok(())
}

核心功能

在Rust中等待Python异步函数

use pyo3::prelude::*;

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
    let future = Python::with_gil(|py| -> PyResult<_> {
        // 导入包含py_sleep函数的模块
        let example = py.import("example")?;

        // 像普通函数一样调用py_sleep方法
        // 返回一个coroutine
        let coroutine = example.call_method0("py_sleep")?;

        // 使用tokio运行时将coroutine转换为Rust future
        pyo3_asyncio::tokio::into_future(coroutine)
    })?;

    // 等待future
    future.await?;

    Ok(())
}

在Python中等待Rust Future

Rust代码:

use pyo3::prelude::*;

async fn rust_sleep() {
    async_std::task::sleep(std::time::Duration::from_secs(1)).await;
}

#[pyfunction]
fn call_rust_sleep(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::async_std::future_into_py(p极速y, async move {
        rust_sleep().await;
        Ok(())
    })
}

Python代码:

from example import call_rust_sleep

async def main():
    await call_rust_sleep()

完整示例

Rust调用Python异步函数

use pyo3::prelude::*;

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
    Python::with_gil(|py| {
        // 创建一个简单的Python异步函数
        let code = r#"
async def py_sleep(seconds):
    import asyncio
    await asyncio.sleep(seconds)
    return seconds
"#;
        py.run(code, None, None)?;
        Ok(())
    })?;

    // 调用Python异步函数并等待结果
    let result = Python::with_gil(|py| -> PyResult<_> {
        let locals = PyDict::new(py);
        let coroutine = py.eval("py_sleep(2)", None, Some(locals))?;
        pyo3_asyncio::tokio::into_future(coroutine)
    })?;

    println!("Sleeping for {} seconds...", result.await?);
    Ok(())
}

Python调用Rust异步函数

Rust代码 (lib.rs):

use pyo3::{prelude::*, wrap_pyfunction};
use std::time::Duration;

#[pyfunction]
fn rust_sleep(py: Python, seconds: u64) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async move {
        tokio::time::sleep(Duration::from_secs(seconds)).await;
        Ok(seconds)
    })
}

#[pymodule]
fn async_rust(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
    Ok(())
}

Python代码:

import asyncio
from async_rust import rust_sleep

async def main():
    print("Calling Rust async function...")
    result = await rust_sleep(3)
    print(f"Slept for {result} seconds")

asyncio.run(main())

事件循环管理

PyO3 Asyncio允许Python控制主线程,而Rust的事件循环在后台运行。对于非标准事件循环如uvloop,也提供了支持:

use pyo3::{prelude::*, types::PyType};

fn main() -> PyResult<()> {
    pyo3::prepare_freethreaded_python();

    Python::with_gil(|py| {
        let uvloop = py.import("uvloop")?;
        uvloop.call_method0("install")?;

        // 存储引用用于断言
        let uvloop = PyObject::from(uvloop);

        pyo3_asyncio::async_std::run(py, async move {
            // 验证我们是否在uvloop.Loop上
            Python::with_gil(|py| -> PyResult<()> {
                assert!(uvloop
                    .as_ref(py)
                    .getattr("Loop")?
                    .downcast::<PyType>()
                    .unwrap()
                    .is_instance(pyo3_asyncio::async_std::get_current_loop(py)?)?);
                Ok(())
            })?;

            async_std::task::sleep(std::time::Duration::from_secs(1)).await;

            Ok(())
        })
    })
}

总结

PyO3 Asyncio提供了强大的工具来实现Python和Rust之间的异步交互。通过这个库,你可以:

  1. 在Rust中调用和等待Python的异步函数
  2. 在Python中调用和等待Rust的异步函数
  3. 管理两种语言事件循环的生命周期
  4. 支持多种运行时(tokio、async-std)和非标准事件循环(如uvloop)

这个库是PyO3生态系统的重要组成部分,特别适合需要高性能异步交互的混合Python/Rust应用程序。


1 回复

以下是基于您提供的内容整理的完整示例demo,首先展示内容中的示例,然后提供一个更完整的实现示例:

内容中提供的示例回顾

1. Rust调用Python协程

use pyo3::prelude::*;
use pyo3_asyncio::TaskLocals;

#[pyfunction]
fn call_python_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<&PyAny> {
    pyo3_asyncio::into_future_with_locals(
        TaskLocals::new(py),
        coro.call0(py)?
    )
}

2. 将Rust Future暴露为Python协程

use pyo3::prelude::*;
use pyo3_asyncio::TaskLocals;

async fn rust_async_function() -> String {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    "Hello from Rust!".to_string()
}

#[pyfunction]
fn get_rust_coroutine(py: Python<'_>) -> PyResult<&PyAny> {
    pyo3_asyncio::async_std::future_into_py(py, async {
        Ok(Python::with_gil(|py| {
            rust_async_function().await.to_object(py)
        }))
    })
}

完整示例demo

下面是一个完整的双向交互示例,包含Rust模块定义和Python调用代码:

Rust端代码 (lib.rs)

use pyo3::{prelude::*, types::PyDict};
use pyo3_asyncio::{tokio::future_into_py, into_future};

// 异步Rust函数
async fn fetch_data_from_rust() -> String {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    "Data from Rust".to_string()
}

// 调用Python协程并返回结果的Rust函数
#[pyfunction]
fn call_python_and_rust(py: Python<'_>, python_coro: PyObject) -> PyResult<Py<PyAny>> {
    // 将Rust Future转换为Python协程
    future_into_py(py, async move {
        // 1. 先调用Python协程
        let py_result: i32 = into_future(python_coro).await?.extract()?;
        println!("Received from Python: {}", py_result);

        // 2. 然后执行Rust异步操作
        let rust_result = fetch_data_from_rust().await;
        println!("Generated in Rust: {}", rust_result);

        // 3. 返回组合结果
        Python::with_gil(|py| {
            let dict = PyDict::new(py);
            dict.set_item("python_result", py_result)?;
            dict.set_item("rust_result", rust_result)?;
            Ok(dict.to_object(py))
        })
    })
}

// 定义Python模块
#[pymodule]
fn async_interop(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(call_python_and_rust, m)?)?;
    Ok(())
}

Python端代码 (main.py)

import asyncio
from async_interop import call_python_and_rust

async def python_coroutine():
    await asyncio.sleep(0.5)
    return 42

async def main():
    # 调用混合函数
    result = await call_python_and_rust(python_coroutine())
    print("Final result:", result)

if __name__ == "__main__":
    asyncio.run(main())

Cargo.toml 配置

[lib]
name = "async_interop"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.18", features = ["extension-module"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
tokio = { version = "1.0", features = ["full"] }

构建和运行步骤

  1. 使用 maturin developmaturin build 构建Python模块
  2. 运行 python main.py 查看结果

这个完整示例展示了:

  • Rust调用Python协程并等待结果
  • Python调用Rust异步函数
  • 双向数据传递
  • 混合错误处理
  • 使用Tokio运行时

输出应该类似:

Received from Python: 42
Generated in Rust: Data from Rust
Final result: {'python_result': 42, 'rust_result': 'Data from Rust'}
回到顶部