Rust异步编程库pyo3-asyncio的使用:实现Python与Rust高效协程交互的桥梁
Rust异步编程库pyo3-asyncio的使用:实现Python与Rust高效协程交互的桥梁
简介
PyO3 Asyncio是PyO3生态系统中的一个全新部分,它提供了Rust Futures和Python协程之间的交互功能,并管理它们对应事件循环的生命周期。这个库主要适用于以下场景:
- 需要使用Python异步函数的Rust库
- 为异步Rust库提供Python绑定
快速入门
Rust应用程序示例
使用async-std运行时:
# Cargo.toml依赖
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "async-std-runtime"] }
async-std = "1.9"
// main.rs
use pyo3::prelude::*;
#[pyo3_asyncio::async_std::main]
async fn main() -> PyResult<()> {
let fut = Python::with_gil(|py| {
let asyncio = py.import("asyncio")?;
// 将asyncio.sleep转换为Rust Future
pyo3_asyncio::async_std::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?)
})?;
fut.await?;
Ok(())
}
使用tokio运行时:
# Cargo.toml依赖
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] }
tokio = "1.9"
// main.rs
use pyo极速3::prelude::*;
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
let fut = Python::with_gil(|py| {
let asyncio = py.import("asyncio")?;
// 将asyncio.sleep转换为Rust Future
pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?)
})?;
fut.await?;
Ok(())
}
PyO3原生Rust模块示例
使用async-std运行时:
# Cargo.toml
[lib]
name = "my_async_module"
crate-type = ["cdylib"]
[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-asyncio = { version = "极速0.20", features = ["async-std-runtime"] }
async-std = "1.9"
// lib.rs
use pyo3::{prelude::*, wrap_pyfunction};
#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
pyo3_asyncio::async_std::future_into_py(py, async {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
})
}
#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
Ok(())
}
使用tokio运行时:
// lib.rs
use pyo3::{prelude::*, wrap_pyfunction};
#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
pyo3_asyncio::tokio::future_into_py(py, async {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
})
}
#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
Ok(())
}
核心功能
在Rust中等待Python异步函数
use pyo3::prelude::*;
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
let future = Python::with_gil(|py| -> PyResult<_> {
// 导入包含py_sleep函数的模块
let example = py.import("example")?;
// 像普通函数一样调用py_sleep方法
// 返回一个coroutine
let coroutine = example.call_method0("py_sleep")?;
// 使用tokio运行时将coroutine转换为Rust future
pyo3_asyncio::tokio::into_future(coroutine)
})?;
// 等待future
future.await?;
Ok(())
}
在Python中等待Rust Future
Rust代码:
use pyo3::prelude::*;
async fn rust_sleep() {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
}
#[pyfunction]
fn call_rust_sleep(py: Python) -> PyResult<&PyAny> {
pyo3_asyncio::async_std::future_into_py(p极速y, async move {
rust_sleep().await;
Ok(())
})
}
Python代码:
from example import call_rust_sleep
async def main():
await call_rust_sleep()
完整示例
Rust调用Python异步函数
use pyo3::prelude::*;
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
Python::with_gil(|py| {
// 创建一个简单的Python异步函数
let code = r#"
async def py_sleep(seconds):
import asyncio
await asyncio.sleep(seconds)
return seconds
"#;
py.run(code, None, None)?;
Ok(())
})?;
// 调用Python异步函数并等待结果
let result = Python::with_gil(|py| -> PyResult<_> {
let locals = PyDict::new(py);
let coroutine = py.eval("py_sleep(2)", None, Some(locals))?;
pyo3_asyncio::tokio::into_future(coroutine)
})?;
println!("Sleeping for {} seconds...", result.await?);
Ok(())
}
Python调用Rust异步函数
Rust代码 (lib.rs
):
use pyo3::{prelude::*, wrap_pyfunction};
use std::time::Duration;
#[pyfunction]
fn rust_sleep(py: Python, seconds: u64) -> PyResult<&PyAny> {
pyo3_asyncio::tokio::future_into_py(py, async move {
tokio::time::sleep(Duration::from_secs(seconds)).await;
Ok(seconds)
})
}
#[pymodule]
fn async_rust(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
Ok(())
}
Python代码:
import asyncio
from async_rust import rust_sleep
async def main():
print("Calling Rust async function...")
result = await rust_sleep(3)
print(f"Slept for {result} seconds")
asyncio.run(main())
事件循环管理
PyO3 Asyncio允许Python控制主线程,而Rust的事件循环在后台运行。对于非标准事件循环如uvloop,也提供了支持:
use pyo3::{prelude::*, types::PyType};
fn main() -> PyResult<()> {
pyo3::prepare_freethreaded_python();
Python::with_gil(|py| {
let uvloop = py.import("uvloop")?;
uvloop.call_method0("install")?;
// 存储引用用于断言
let uvloop = PyObject::from(uvloop);
pyo3_asyncio::async_std::run(py, async move {
// 验证我们是否在uvloop.Loop上
Python::with_gil(|py| -> PyResult<()> {
assert!(uvloop
.as_ref(py)
.getattr("Loop")?
.downcast::<PyType>()
.unwrap()
.is_instance(pyo3_asyncio::async_std::get_current_loop(py)?)?);
Ok(())
})?;
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
})
})
}
总结
PyO3 Asyncio提供了强大的工具来实现Python和Rust之间的异步交互。通过这个库,你可以:
- 在Rust中调用和等待Python的异步函数
- 在Python中调用和等待Rust的异步函数
- 管理两种语言事件循环的生命周期
- 支持多种运行时(tokio、async-std)和非标准事件循环(如uvloop)
这个库是PyO3生态系统的重要组成部分,特别适合需要高性能异步交互的混合Python/Rust应用程序。
1 回复
以下是基于您提供的内容整理的完整示例demo,首先展示内容中的示例,然后提供一个更完整的实现示例:
内容中提供的示例回顾
1. Rust调用Python协程
use pyo3::prelude::*;
use pyo3_asyncio::TaskLocals;
#[pyfunction]
fn call_python_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<&PyAny> {
pyo3_asyncio::into_future_with_locals(
TaskLocals::new(py),
coro.call0(py)?
)
}
2. 将Rust Future暴露为Python协程
use pyo3::prelude::*;
use pyo3_asyncio::TaskLocals;
async fn rust_async_function() -> String {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
"Hello from Rust!".to_string()
}
#[pyfunction]
fn get_rust_coroutine(py: Python<'_>) -> PyResult<&PyAny> {
pyo3_asyncio::async_std::future_into_py(py, async {
Ok(Python::with_gil(|py| {
rust_async_function().await.to_object(py)
}))
})
}
完整示例demo
下面是一个完整的双向交互示例,包含Rust模块定义和Python调用代码:
Rust端代码 (lib.rs)
use pyo3::{prelude::*, types::PyDict};
use pyo3_asyncio::{tokio::future_into_py, into_future};
// 异步Rust函数
async fn fetch_data_from_rust() -> String {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
"Data from Rust".to_string()
}
// 调用Python协程并返回结果的Rust函数
#[pyfunction]
fn call_python_and_rust(py: Python<'_>, python_coro: PyObject) -> PyResult<Py<PyAny>> {
// 将Rust Future转换为Python协程
future_into_py(py, async move {
// 1. 先调用Python协程
let py_result: i32 = into_future(python_coro).await?.extract()?;
println!("Received from Python: {}", py_result);
// 2. 然后执行Rust异步操作
let rust_result = fetch_data_from_rust().await;
println!("Generated in Rust: {}", rust_result);
// 3. 返回组合结果
Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item("python_result", py_result)?;
dict.set_item("rust_result", rust_result)?;
Ok(dict.to_object(py))
})
})
}
// 定义Python模块
#[pymodule]
fn async_interop(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(call_python_and_rust, m)?)?;
Ok(())
}
Python端代码 (main.py)
import asyncio
from async_interop import call_python_and_rust
async def python_coroutine():
await asyncio.sleep(0.5)
return 42
async def main():
# 调用混合函数
result = await call_python_and_rust(python_coroutine())
print("Final result:", result)
if __name__ == "__main__":
asyncio.run(main())
Cargo.toml 配置
[lib]
name = "async_interop"
crate-type = ["cdylib"]
[dependencies]
pyo3 = { version = "0.18", features = ["extension-module"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
tokio = { version = "1.0", features = ["full"] }
构建和运行步骤
- 使用
maturin develop
或maturin build
构建Python模块 - 运行
python main.py
查看结果
这个完整示例展示了:
- Rust调用Python协程并等待结果
- Python调用Rust异步函数
- 双向数据传递
- 混合错误处理
- 使用Tokio运行时
输出应该类似:
Received from Python: 42
Generated in Rust: Data from Rust
Final result: {'python_result': 42, 'rust_result': 'Data from Rust'}