Asynchronous RecordBatchReader equivalent #7228

Open
westonpace opened this issue Mar 3, 2025 · 1 comment
Labels
enhancement Any new improvement worthy of a entry in the changelog

Comments

@westonpace
Member

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently we use Arrow as part of our public API in Lance. RecordBatchReader is extremely useful. However, there are times we would like an asynchronous version. There is datafusion's RecordBatchStream, and we have our own equivalent in lancedb (also called RecordBatchStream, for better or worse). We have our own because we don't want to make datafusion part of the public API, which keeps the API simpler. As a result, moving data between the various endpoints involves a lot of conversion from arrow's error to datafusion's error to lancedb's error.
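To make the conversion overhead concrete, here is a minimal sketch of the kind of error chain described above. The three enums are stand-ins invented for illustration (they are not the real arrow, datafusion, or lancedb error types, which have many more variants), and the function names are hypothetical.

```rust
// Stand-ins for arrow's, datafusion's, and lancedb's error types.
// These names and variants are illustrative assumptions only.
#[derive(Debug)]
pub enum ArrowLikeError {
    Compute(String),
}

#[derive(Debug)]
pub enum DataFusionLikeError {
    Arrow(ArrowLikeError),
}

#[derive(Debug)]
pub enum LanceDbLikeError {
    DataFusion(DataFusionLikeError),
}

impl From<ArrowLikeError> for DataFusionLikeError {
    fn from(e: ArrowLikeError) -> Self {
        DataFusionLikeError::Arrow(e)
    }
}

impl From<DataFusionLikeError> for LanceDbLikeError {
    fn from(e: DataFusionLikeError) -> Self {
        LanceDbLikeError::DataFusion(e)
    }
}

// Lowest layer: fails with the arrow-style error.
fn read_batch(fail: bool) -> Result<u32, ArrowLikeError> {
    if fail {
        Err(ArrowLikeError::Compute("overflow".to_string()))
    } else {
        Ok(1)
    }
}

// Stream layer: `?` converts the arrow-style error into the datafusion-style one.
fn stream_step(fail: bool) -> Result<u32, DataFusionLikeError> {
    Ok(read_batch(fail)?)
}

// Public API layer: a second conversion wraps the error once more.
pub fn public_api_step(fail: bool) -> Result<u32, LanceDbLikeError> {
    Ok(stream_step(fail)?)
}
```

With a stream trait that uses arrow's `Result` directly, the middle hop in this chain would presumably not be needed.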

I'm mainly opening this issue in the interest of discussion, to see if this is something we'd be willing to add. If so, I can put together a proposal PR.

Describe the solution you'd like

// Pretty much identical to datafusion's `RecordBatchStream` except using arrow's `Result`
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    fn schema(&self) -> Arc<Schema>;
}

Describe alternatives you've considered

As far as I can tell, the biggest drawback would be introducing futures as a dependency. This could be feature-gated.

Alternatively, we could vendor the Stream trait:

// Vendored variant: declares `poll_next` directly instead of extending `futures::Stream`
pub trait RecordBatchStream {
    fn schema(&self) -> Arc<Schema>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>>;
}
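As a rough illustration of what the vendored variant could look like in use, the sketch below implements it with std alone, assuming the `Stream` supertrait is dropped so that no futures dependency remains. `Schema`, `RecordBatch`, and `ArrowError` are simplified stand-ins for the arrow types, and `InMemoryStream` and `drain_ready` are hypothetical helpers that are not part of any existing crate.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-ins for arrow's `Schema`, `RecordBatch`, and error type so the
// sketch compiles with std alone; a real version would use the arrow types.
#[derive(Debug, PartialEq)]
pub struct Schema {
    pub fields: Vec<String>,
}

#[derive(Debug, PartialEq)]
pub struct RecordBatch {
    pub num_rows: usize,
}

#[derive(Debug)]
pub struct ArrowError(pub String);

pub type Result<T> = std::result::Result<T, ArrowError>;

// The vendored trait: `poll_next` is declared here, not inherited from `futures`.
pub trait RecordBatchStream {
    fn schema(&self) -> Arc<Schema>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>>;
}

// Hypothetical in-memory implementation that yields pre-built batches.
pub struct InMemoryStream {
    schema: Arc<Schema>,
    batches: VecDeque<RecordBatch>,
}

impl InMemoryStream {
    pub fn new(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Self {
        Self { schema, batches: batches.into() }
    }
}

impl RecordBatchStream for InMemoryStream {
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        // Always ready: pop the next batch, or signal the end with `None`.
        Poll::Ready(self.batches.pop_front().map(Ok))
    }
}

// Waker that does nothing, enough to poll without an executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a stream to completion on the current thread and collects its batches.
pub fn drain_ready<S: RecordBatchStream + Unpin>(mut stream: S) -> Result<Vec<RecordBatch>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(batch)) => out.push(batch?),
            Poll::Ready(None) => return Ok(out),
            // A real executor would park until the waker fires; this sketch just retries.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

The trade-off is that callers lose the combinators from `futures::StreamExt` unless an adapter is provided, so anything beyond manual polling would need extra glue.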

I'm not sure how I feel about that, but I don't think futures is going to be absorbed into std anytime soon. We could even still have a futures feature that provides an impl of futures::Stream for RecordBatchStream.
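One possible shape for that bridge is a newtype adapter, sketched below. To keep the example std-only, a local `Stream` trait with the same shape as `futures::Stream` stands in for the real one; in an actual crate the impl would target `futures::Stream` and sit behind a cargo feature. `AsStream`, `Once`, and `poll_once` are hypothetical names, the arrow types are reduced to stand-ins, and `schema()` is omitted for brevity. A wrapper is used because a blanket impl of a foreign trait over every `T: RecordBatchStream` would run into Rust's orphan rules.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-ins for the arrow types, reduced to the minimum the sketch needs.
#[derive(Debug, PartialEq)]
pub struct RecordBatch(pub usize);

#[derive(Debug)]
pub struct ArrowError;

pub type Result<T> = std::result::Result<T, ArrowError>;

// Vendored trait (`schema()` omitted to keep the sketch short).
pub trait RecordBatchStream {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>>;
}

// Local stand-in with the same shape as `futures::Stream`.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical adapter: wraps any vendored stream and exposes it as a `Stream`.
pub struct AsStream<S>(pub S);

impl<S: RecordBatchStream + Unpin> Stream for AsStream<S> {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `S: Unpin` lets us re-pin the inner stream without unsafe projection.
        let inner = &mut self.get_mut().0;
        RecordBatchStream::poll_next(Pin::new(inner), cx)
    }
}

// Hypothetical stream that yields a single batch and then ends.
pub struct Once(pub Option<RecordBatch>);

impl RecordBatchStream for Once {
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        Poll::Ready(self.0.take().map(Ok))
    }
}

// Waker that does nothing, enough to poll without an executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a value once through the `Stream` interface.
pub fn poll_once<T: Stream + Unpin>(stream: &mut T) -> Poll<Option<T::Item>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Stream::poll_next(Pin::new(stream), &mut cx)
}
```

Requiring `S: Unpin` keeps the adapter free of unsafe code; supporting `!Unpin` streams would need pin projection, which this sketch leaves out.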

Additional context

@westonpace westonpace added the enhancement Any new improvement worthy of a entry in the changelog label Mar 3, 2025
@wjones127
Member

I think the need for this is pretty well demonstrated elsewhere in the ecosystem, given its use in both Lance and DataFusion, and I'm sure in other places.

There is also an async C stream interface that was recently created: https://arrow.apache.org/docs/format/CDeviceDataInterface.html#async-device-stream-interface

I'm sure being able to accept these over FFI would be welcome.
