Rust异步编程库pyo3-asyncio-macros的使用:实现Python与Rust高效协程交互的宏工具

Rust异步编程库pyo3-asyncio-macros的使用:实现Python与Rust高效协程交互的宏工具

快速入门示例

Rust应用程序示例

使用async-std运行时:

# Cargo.toml dependencies
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "async-std-runtime"] }
async-std = "1.9"
//! main.rs
use pyo3::prelude::*;

#[pyo3_asyncio::async_std::main]
async fn main() -> PyResult<()> {
    let fut = Python::with_gil(|py| {
        let asyncio = py.import("asyncio")?;
        // 将asyncio.sleep转换为Rust Future
        pyo3_asyncio::async_std::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?)
    })?;

    fut.await?;

    Ok(())
}

使用tokio运行时:

# Cargo.toml dependencies
[dependencies]
pyo3 = { version = "0.20" }
pyo3-asyncio = { version = "0.20", features = ["attributes", "tokio-runtime"] }
tokio = "1.9"
//! main.rs
use pyo3::prelude::*;

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
    let fut = Python::with_gil(|py| {
        let asyncio = py.import("asyncio")?;
        // 将asyncio.sleep转换为Rust Future
        pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into极速(py),))?)
    })?;

    fut.await?;

    Ok(())
}

PyO3原生Rust模块示例

使用async-std运行时:

# Cargo.toml
[lib]
name = "my_async_module"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-asyncio = { version = "0.20", features = ["async-std-runtime"] }
async-std = "1.9"
//! lib.rs
use pyo3::{prelude::*, wrap_pyfunction};

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::async_std::future_into_py(py, async {
        async_std::task::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    })
}

#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
    Ok(())
}

使用tokio运行时:

//! lib.rs
use pyo3::{prelude::*, wrap_pyfunction};

#[pyfunction]
fn rust_sleep(py: Python) -> PyResult极速&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Ok(())
    })
}

#[pymodule]
fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(rust_sleep, m)?)?;
    Ok(())
}

完整示例

在Rust中调用Python异步函数

use pyo3::prelude::*;

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
    let future = Python::with_gil(|py| -> PyResult<_> {
        // 导入包含py_sleep函数的模块
        let example = py.import("example")?;

        // 像普通函数一样调用py_sleep方法
        // 返回一个coroutine对象
        let coroutine = example.call_method0("py_sleep")?;

        // 使用tokio运行时将coroutine转换为Rust Future
        pyo3_asyncio::tokio::into_future(coroutine)
    })?;

    // 等待future完成
    future.await?;

    Ok(())
}

在Python中调用Rust异步函数

use pyo3::prelude::*;

async fn rust_sleep() {
    async_std::task::sleep(std::time::Duration::from_secs(1)).await;
}

#[pyfunction]
fn call_rust_sleep(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::async_std::future_into_py(py, async move {
        rust_sleep().await;
        Ok(())
    })
}

然后在Python中调用:

from example import call_rust_sleep

async def rust_sleep():
    await call_rust_sleep()

关键特性

  1. 双向转换:支持Python协程和Rust Future之间的双向转换
  2. 多运行时支持:支持tokio和async-std两种主要Rust异步运行时
  3. 事件循环管理:自动处理Python asyncio事件循环的生命周期
  4. 原生模块支持:可以创建包含异步函数的Python扩展模块

使用场景

  • 在Rust应用程序中调用Python异步代码
  • 为Rust异步库创建Python绑定
  • 构建混合Python/Rust的异步应用程序

注意事项

  1. Python需要控制主线程,因此Rust的运行时需要在后台运行
  2. 使用非标准事件循环(如uvloop)时需要特殊处理
  3. 测试异步代码需要特殊的测试框架设置

通过pyo3-asyncio-macros,开发者可以轻松地在Python和Rust之间构建高效的异步互操作,充分利用两种语言的优势。


1 回复

Rust异步编程库pyo3-asyncio-macros使用指南

介绍

pyo3-asyncio-macros是一个Rust宏工具库,用于简化Python和Rust之间的异步协程交互。它构建在pyo3tokio之上,提供了便捷的宏来桥接Python的asyncio和Rust的异步运行时。

该库主要解决以下问题:

  • 在Rust中调用Python协程
  • 在Python中调用Rust异步函数
  • 自动处理两种语言运行时之间的转换

安装

在Cargo.toml中添加依赖:

[dependencies]
pyo3 = { version = "0.18", features = ["extension-module"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
pyo3-asyncio-macros = "0.18"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 从Rust调用Python协程

use pyo3::prelude::*;
use pyo3_asyncio_macros::pyo3_asyncio;

#[pyfunction]
fn call_python_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<PyObject> {
    pyo3_asyncio::tokio::into_future(coro)
}

#[pymodule]
fn my_module(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(call_python_coroutine, m)?)?;
    Ok(())
}

2. 从Python调用Rust异步函数

use pyo3::prelude::*;
use pyo3_asyncio_macros::async_fn;

#[async_fn]
async fn rust_async_function(arg: i32) -> PyResult<i32> {
    tokio::time::sleep(std::time::Duration::from_secs(1).await;
    Ok(arg * 2)
}

#[pymodule]
fn my_module(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(rust_async_function, m)?)?;
    Ok(())
}

高级用法

1. 同时处理Python和Rust协程

use pyo3::prelude::*;
use pyo3_asyncio_macros::{async_fn, pyo3_asyncio};

#[async_fn]
async fn process_data(py: Python<'_>, coro: PyObject) -> PyResult<String> {
    // 调用Python协程并等待结果
    let result: i32 = pyo3_asyncio::tokio::into_future(coro).await?.extract(py)?;
    
    // 执行Rust异步操作
    let processed = tokio::task::spawn_blocking(move || {
        // 这里是CPU密集型计算
        result.to_string().repeat(3)
    }).await?;
    
    Ok(processed)
}

2. 错误处理

use pyo3::prelude::*;
use pyo3_asyncio_macros::async_fn;

#[async_fn]
async fn fallible_operation(arg: i32) -> PyResult<i32> {
    if arg < 0 {
        return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
            "Argument must be positive"
        ));
    }
    
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Ok(arg * 2)
}

Python端使用示例

import asyncio
import my_module  # 假设编译后的模块名为my_module

async def python_coroutine(x):
    await asyncio.sleep(1)
    return x + 10

async def main():
    # 调用Rust异步函数
    result = await my_module.rust_async_function(42)
    print(f"Rust async result: {result}")
    
    # 从Rust调用Python协程
    result = await my_module.call_python_coroutine(python_coroutine(5))
    print(f"Python coroutine result from Rust: {result}")
    
    # 复杂处理
    processed = await my_module.process_data(python_coroutine(3))
    print(f"Processed data: {processed}")

asyncio.run(main())

完整示例demo

下面是一个完整的Rust模块示例,展示了如何实现双向的异步交互:

// src/lib.rs
use pyo3::prelude::*;
use pyo3_asyncio_macros::{async_fn, pyo3_asyncio};
use tokio::time;

#[pyfunction]
fn call_python_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<PyObject> {
    // 将Python协程转换为Rust Future
    pyo3_asyncio::tokio::into_future(coro)
}

#[async_fn]
async fn rust_async_function(arg: i32) -> PyResult<i32> {
    // 模拟异步操作
    time::sleep(time::Duration::from_secs(1)).await;
    Ok(arg * 2)
}

#[async_fn]
async fn process_data(py: Python<'_>, coro: PyObject) -> PyResult<String> {
    // 调用Python协程并获取结果
    let result: i32 = pyo3_asyncio::tokio::into_future(coro).await?.extract(py)?;
    
    // 执行CPU密集型计算
    let processed = tokio::task::spawn_blocking(move || {
        format!("Processed-{}", result * 3)
    }).await?;
    
    Ok(processed)
}

#[pymodule]
fn async_utils(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(call_python_coroutine, m)?)?;
    m.add_function(wrap_pyfunction!(rust_async_function, m)?)?;
    m.add_function(wrap_pyfunction!(process_data, m)?)?;
    Ok(())
}

对应的Python测试代码:

# test_async.py
import asyncio
import async_utils  # 编译后的模块名

async def python_task(x):
    print(f"Python coroutine started with {x}")
    await asyncio.sleep(0.5)
    return x + 100

async def main():
    # 测试Rust异步函数
    rust_result = await async_utils.rust_async_function(21)
    print(f"Rust async result: {rust_result}")
    
    # 测试从Rust调用Python协程
    py_result = await async_utils.call_python_coroutine(python_task(7))
    print(f"Python coroutine result from Rust: {py_result}")
    
    # 测试复杂处理流程
    processed = await async_utils.process_data(python_task(3))
    print(f"Processed data: {processed}")

if __name__ == "__main__":
    asyncio.run(main())

注意事项

  1. 确保在Python端正确初始化了Rust模块
  2. 对于长时间运行的Rust异步任务,考虑使用tokio::task::spawn_blocking来避免阻塞事件循环
  3. 错误需要在Rust和Python之间正确转换
  4. 注意Python GIL的生命周期管理,特别是在跨await点时

pyo3-asyncio-macros通过提供这些宏简化了异步交互的样板代码,使得Python和Rust之间的异步协作更加直观和高效。

回到顶部