Rust异步编程库pyo3-asyncio-macros的使用:实现Python与Rust高效协程交互的宏工具
Rust异步编程库pyo3-asyncio-macros的使用:实现Python与Rust高效协程交互的宏工具
快速入门示例
Rust应用程序示例
使用async-std运行时:
# Cargo.toml dependencies
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "async-std-runtime"] }
async-std = "1.9"
//! main.rs
use pyo3::prelude::*;
#[pyo3_asyncio::async_std::main]
async fn main() -> PyResult<()> {
let fut = Python::with_gil(|py| {
let asyncio = py.import("asyncio")?;
// 将asyncio.sleep转换为Rust Future
pyo3_asyncio::async_std::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?)
})?;
fut.await?;
Ok(())
}
使用tokio运行时:
# Cargo.toml dependencies
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] }
tokio = "1.9"
//! main.rs
use pyo3::prelude::*;
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
let fut = Python::with_gil(|py| {
let asyncio = py.import("asyncio")?;
// 将asyncio.sleep转换为Rust Future
pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into极速(py),))?)
})?;
fut.await?;
Ok(())
}
PyO3原生Rust模块示例
使用async-std运行时:
# Cargo.toml
[lib]
name = "my_async_module"
crate-type = ["cdylib"]
[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-asyncio = { version = "0.20", features = ["async-std-runtime"] }
async-std = "1.9"
//! lib.rs
use pyo3::{prelude::*, wrap_pyfunction};
#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
pyo3_asyncio::async_std::future_into_py(py, async {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
})
}
#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
Ok(())
}
使用tokio运行时:
//! lib.rs
use pyo3::{prelude::*, wrap_pyfunction};
#[pyfunction]
fn rust_sleep(py: Python) -> PyResult极速&PyAny> {
pyo3_asyncio::tokio::future_into_py(py, async {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
})
}
#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
Ok(())
}
完整示例
在Rust中调用Python异步函数
use pyo3::prelude::*;
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
let future = Python::with_gil(|py| -> PyResult<_> {
// 导入包含py_sleep函数的模块
let example = py.import("example")?;
// 像普通函数一样调用py_sleep方法
// 返回一个coroutine对象
let coroutine = example.call_method0("py_sleep")?;
// 使用tokio运行时将coroutine转换为Rust Future
pyo3_asyncio::tokio::into_future(coroutine)
})?;
// 等待future完成
future.await?;
Ok(())
}
在Python中调用Rust异步函数
use pyo3::prelude::*;
async fn rust_sleep() {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
}
#[pyfunction]
fn call_rust_sleep(py: Python) -> PyResult<&PyAny> {
pyo3_asyncio::async_std::future_into_py(py, async move {
rust_sleep().await;
Ok(())
})
}
然后在Python中调用:
from example import call_rust_sleep
async def rust_sleep():
await call_rust_sleep()
关键特性
- 双向转换:支持Python协程和Rust Future之间的双向转换
- 多运行时支持:支持tokio和async-std两种主要Rust异步运行时
- 事件循环管理:自动处理Python asyncio事件循环的生命周期
- 原生模块支持:可以创建包含异步函数的Python扩展模块
使用场景
- 在Rust应用程序中调用Python异步代码
- 为Rust异步库创建Python绑定
- 构建混合Python/Rust的异步应用程序
注意事项
- Python需要控制主线程,因此Rust的运行时需要在后台运行
- 使用非标准事件循环(如uvloop)时需要特殊处理
- 测试异步代码需要特殊的测试框架设置
通过pyo3-asyncio-macros,开发者可以轻松地在Python和Rust之间构建高效的异步互操作,充分利用两种语言的优势。
1 回复
Rust异步编程库pyo3-asyncio-macros使用指南
介绍
pyo3-asyncio-macros
是一个Rust宏工具库,用于简化Python和Rust之间的异步协程交互。它构建在pyo3
和tokio
之上,提供了便捷的宏来桥接Python的asyncio和Rust的异步运行时。
该库主要解决以下问题:
- 在Rust中调用Python协程
- 在Python中调用Rust异步函数
- 自动处理两种语言运行时之间的转换
安装
在Cargo.toml中添加依赖:
[dependencies]
pyo3 = { version = "0.18", features = ["extension-module"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
pyo3-asyncio-macros = "0.18"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
1. 从Rust调用Python协程
use pyo3::prelude::*;
use pyo3_asyncio_macros::pyo3_asyncio;
#[pyfunction]
fn call_python_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<PyObject> {
pyo3_asyncio::tokio::into_future(coro)
}
#[pymodule]
fn my_module(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(call_python_coroutine, m)?)?;
Ok(())
}
2. 从Python调用Rust异步函数
use pyo3::prelude::*;
use pyo3_asyncio_macros::async_fn;
#[async_fn]
async fn rust_async_function(arg: i32) -> PyResult<i32> {
tokio::time::sleep(std::time::Duration::from_secs(1).await;
Ok(arg * 2)
}
#[pymodule]
fn my_module(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(rust_async_function, m)?)?;
Ok(())
}
高级用法
1. 同时处理Python和Rust协程
use pyo3::prelude::*;
use pyo3_asyncio_macros::{async_fn, pyo3_asyncio};
#[async_fn]
async fn process_data(py: Python<'_>, coro: PyObject) -> PyResult<String> {
// 调用Python协程并等待结果
let result: i32 = pyo3_asyncio::tokio::into_future(coro).await?.extract(py)?;
// 执行Rust异步操作
let processed = tokio::task::spawn_blocking(move || {
// 这里是CPU密集型计算
result.to_string().repeat(3)
}).await?;
Ok(processed)
}
2. 错误处理
use pyo3::prelude::*;
use pyo3_asyncio_macros::async_fn;
#[async_fn]
async fn fallible_operation(arg: i32) -> PyResult<i32> {
if arg < 0 {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Argument must be positive"
));
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(arg * 2)
}
Python端使用示例
import asyncio
import my_module # 假设编译后的模块名为my_module
async def python_coroutine(x):
await asyncio.sleep(1)
return x + 10
async def main():
# 调用Rust异步函数
result = await my_module.rust_async_function(42)
print(f"Rust async result: {result}")
# 从Rust调用Python协程
result = await my_module.call_python_coroutine(python_coroutine(5))
print(f"Python coroutine result from Rust: {result}")
# 复杂处理
processed = await my_module.process_data(python_coroutine(3))
print(f"Processed data: {processed}")
asyncio.run(main())
完整示例demo
下面是一个完整的Rust模块示例,展示了如何实现双向的异步交互:
// src/lib.rs
use pyo3::prelude::*;
use pyo3_asyncio_macros::{async_fn, pyo3_asyncio};
use tokio::time;
#[pyfunction]
fn call_python_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<PyObject> {
// 将Python协程转换为Rust Future
pyo3_asyncio::tokio::into_future(coro)
}
#[async_fn]
async fn rust_async_function(arg: i32) -> PyResult<i32> {
// 模拟异步操作
time::sleep(time::Duration::from_secs(1)).await;
Ok(arg * 2)
}
#[async_fn]
async fn process_data(py: Python<'_>, coro: PyObject) -> PyResult<String> {
// 调用Python协程并获取结果
let result: i32 = pyo3_asyncio::tokio::into_future(coro).await?.extract(py)?;
// 执行CPU密集型计算
let processed = tokio::task::spawn_blocking(move || {
format!("Processed-{}", result * 3)
}).await?;
Ok(processed)
}
#[pymodule]
fn async_utils(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(call_python_coroutine, m)?)?;
m.add_function(wrap_pyfunction!(rust_async_function, m)?)?;
m.add_function(wrap_pyfunction!(process_data, m)?)?;
Ok(())
}
对应的Python测试代码:
# test_async.py
import asyncio
import async_utils # 编译后的模块名
async def python_task(x):
print(f"Python coroutine started with {x}")
await asyncio.sleep(0.5)
return x + 100
async def main():
# 测试Rust异步函数
rust_result = await async_utils.rust_async_function(21)
print(f"Rust async result: {rust_result}")
# 测试从Rust调用Python协程
py_result = await async_utils.call_python_coroutine(python_task(7))
print(f"Python coroutine result from Rust: {py_result}")
# 测试复杂处理流程
processed = await async_utils.process_data(python_task(3))
print(f"Processed data: {processed}")
if __name__ == "__main__":
asyncio.run(main())
注意事项
- 确保在Python端正确初始化了Rust模块
- 对于长时间运行的Rust异步任务,考虑使用
tokio::task::spawn_blocking
来避免阻塞事件循环 - 错误需要在Rust和Python之间正确转换
- 注意Python GIL的生命周期管理,特别是在跨await点时
pyo3-asyncio-macros
通过提供这些宏简化了异步交互的样板代码,使得Python和Rust之间的异步协作更加直观和高效。