
Can't run async function from Rust in Python #123

Open
cj-zhukov opened this issue Apr 7, 2024 · 0 comments


🐛 Bug Reports

Calling an async Rust function from Python fails with this error:

RuntimeError: Cannot run the event loop while another loop is running
sys:1: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was never awaited

What am I doing wrong?

🌍 Environment

  • Your operating system and version: macOS 14.3.1 x86_64
  • Your python version: Python 3.11.7
  • How did you install python (e.g. apt or pyenv)? Did you use a virtualenv?: virtualenv
  • Your Rust version (rustc --version): rustc 1.75.0
  • Your PyO3 version: pyo3 = "0.20.0"
  • Have you tried using latest PyO3 master (replace version = "0.x.y" with git = "https://github.com/awestlake87/pyo3-asyncio")?: no

💥 Reproducing

Cargo.toml

[package]
name = "bar"
version = "0.1.0"
edition = "2021"

[lib]
name = "bar"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.20", features = ["extension-module"] }
pyo3-asyncio = { version = "0.20", features = [
    "tokio-runtime",
    "attributes",
    "pyo3-asyncio-macros",
] }
tokio = { version = "1", features = ["full"] }
datafusion = { version = "36", features = ["pyarrow"] }

lib.rs

use std::sync::Arc;

use pyo3::prelude::*;
use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::prelude::*;

#[pyfunction]
fn run(py: Python<'_>) -> PyResult<PyArrowType<Vec<RecordBatch>>> {
    pyo3::prepare_freethreaded_python();
    pyo3_asyncio::tokio::run(py, async move {
        let batches = dev().await;
        Ok(batches.into())
    })
}

#[pymodule]
fn bar(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(run, m)?)?;
    Ok(())
}

pub async fn dev() -> Vec<RecordBatch> {
    let ctx = SessionContext::new();
    
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]);

    let batch = RecordBatch::try_new(
        schema.into(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
        ],
    ).unwrap();

    let df = ctx.read_batch(batch).unwrap();
    let res = df.collect().await.unwrap();

    res
}

main.py

import asyncio
import bar


async def main():
    batch = await bar.run()
    df = batch.to_pandas()
    print(df)


asyncio.run(main())

Here is an equivalent example without async, which works:
Cargo.toml

[package]
name = "foo"
version = "0.1.0"
edition = "2021"

[lib]
name = "foo"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.20.0"
arrow = { version = "50", features = ["prettyprint", "pyarrow"] }

lib.rs

use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::pyarrow;
use pyo3::prelude::*;

#[pyfunction]
fn run() -> PyResult<pyarrow::PyArrowType<RecordBatch>> {
    let batch = dev();
    Ok(batch.into())
}

#[pymodule]
fn foo(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(run, m)?)?;
    Ok(())
}

pub fn dev() -> RecordBatch {
    let id_array = Int32Array::from(vec![1, 2, 3]);
    let name_array = StringArray::from(vec!["foo", "bar", "baz"]);

    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false)
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(id_array), Arc::new(name_array)],
    )
    .unwrap();

    batch
}

main.py

import foo

def main():
    batch = foo.run()
    df = batch.to_pandas()
    print(df)

main()