Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GDALVector::getArrowStream(): expose an Arrow C stream on a layer via nanoarrow_array_stream object #591

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

ctoney
Copy link
Collaborator

@ctoney ctoney commented Feb 6, 2025

Exposes an ArrowArrayStream through a nanoarrow_array_stream external pointer in the nanoarrow package. This PR improves on the prototype implementation described in #545:

  • adds a writable field $arrowStreamOptions on GDALVector objects that can be used to set options
  • adds documentation, examples and tests
  • fixes the release callback to avoid a memory leak
  • conditions the code, tests and examples on GDAL >= 3.6

As noted in the code comments, this implementation was adapted from GDALStreamWrapper by Dewey Dunnington in:
https://github.com/r-spatial/sf/blob/main/src/gdal_read_stream.cpp

That file does not contain a license/copyright notice or other attribution. Although we don't copy GDALStreamWrapper, the approach used there was adapted to work with class GDALVector so this PR relied heavily on that code. This is draft pending confirmation of proper attribution.

Copy link

@paleolimbot paleolimbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool!

One of the things I would suggest testing as part of this is the case where your layer or dataset goes out of scope and is garbage collected to make sure the stream doesn't segfault (this is what the "wrapper stream" is trying to accomplish, although I think that how you've done it here, the wrapper stream isn't actually doing anything). There is also nanoarrow::array_stream_set_finalizer() to do this at the R level, although because you're pushing all of the implementation into C++ you probably need a C++y way to keep the underlying layer/dataset alive until the stream is released.

It's worth checking out Python's pyogrio, because they use this more actively and have a more mature implementation! https://github.com/geopandas/pyogrio/blob/main/pyogrio/_io.pyx#L1414

Comment on lines 1594 to 1599
bool GDALVector::getArrowStream(Rcpp::RObject stream_xptr) {
/*
Exposes an Arrow C stream to be consumed by {nanoarrow}
Implementation adapted from GDALStreamWrapper by Dewey Dunnington in:
https://github.com/r-spatial/sf/blob/main/src/gdal_read_stream.cpp
*/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you may want to take zero arguments here and allocate the pointer yourself (allocating a stream pointer is a rather advanced thing to ask a user to do). The implementation is at https://github.com/apache/arrow-nanoarrow/blob/main/r/inst/include/nanoarrow/r.h#L307-L319 (which you can get via LinkingTo: nanoarrow or by vendoring that file or a portion of it 🙂 ).

R/gdalvector.R Outdated
#' * INCLUDE_FID=YES/NO. Defaults to YES.
#' * TIMEZONE=unknown/UTC/(+|:)HH:MM or any other value supported by
#' Arrow (GDAL >= 3.8).
#' * GEOMETRY_METADATA_ENCODING=OGC/GEOARROW (GDAL >= 3.8). The default is OGC.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you will want to turn this on by default (when on GDAL >= 3.8), because this ensure that the CRS from GDAL is propagated to Arrow (and beyond) without any user intervention (i.e., dig a pit of success with respect to CRS propagation!). This is what pyogrio does 🙂

R/gdalvector.R Outdated
@@ -822,6 +859,34 @@
#' str(feat_set)
#'
#' lyr$close()
#'
#' # Arrow array stream to be consumed by {nanoarrow}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to show a library(geoarrow) example, too (particularly with METADATA_ENCODING=GEOARROW). This should give you something like:

library(geoarrow)

lyr <- new(GDALVector, dsn)
lyr$getArrowStream() |> sf::st_as_sf()

...for free (with CRS propagation!)

@ctoney
Copy link
Collaborator Author

ctoney commented Feb 12, 2025

Thanks for your review and suggestions. That is very helpful. The xptr is now allocated internally and returned. arrowStreamOptions is now set to "GEOMETRY_METADATA_ENCODING=GEOARROW" if GDAL >= 3.8.

One of the things I would suggest testing as part of this is the case where your layer or dataset goes out of scope and is garbage collected to make sure the stream doesn't segfault

Yes, that needed more work. It looks okay to me now:

library(gdalraster)
#> GDAL 3.8.4, released 2024/02/08, GEOS 3.12.1, PROJ 9.3.1

f <- system.file("extdata/ynp_fires_1984_2022.gpkg", package = "gdalraster")
# work with a copy to be safe, probably unnecessary for read-only
dsn <- file.path(tempdir(), basename(f))
file.copy(f, dsn)
#> [1] TRUE
lyr <- new(GDALVector, dsn, "mtbs_perims")


## dataset/layer closed without explicit release - OK
stream <- lyr$getArrowStream()
stream
#> <nanoarrow_array_stream struct<fid: int64, event_id: string, incid_name: string, incid_type: string, map_id: int64, burn_bnd_ac: int64, burn_bnd_lat: string, burn_bnd_lon: string, ig_date: date32, ig_year: int32, geom: geoarrow.wkb{binary}>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()
lyr$close()
stream
#> <nanoarrow_array_stream[invalid pointer]>
stream$get_next()
#> Error in stream$get_next(): nanoarrow_array_stream() is an external pointer to NULL


## stream garbage collected without explicit release - OK
lyr$open(read_only = TRUE)
stream <- lyr$getArrowStream()
stream
#> <nanoarrow_array_stream struct<fid: int64, event_id: string, incid_name: string, incid_type: string, map_id: int64, burn_bnd_ac: int64, burn_bnd_lat: string, burn_bnd_lon: string, ig_date: date32, ig_year: int32, geom: geoarrow.wkb{binary}>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()
rm(stream)
gc()
#>           used (Mb) gc trigger (Mb) max used (Mb)
#> Ncells  825019 44.1    1460401   78  1460401 78.0
#> Vcells 1468993 11.3    8388608   64  2588634 19.8
#
# would be unable to activate a new stream unless the previous one had been
# released implicitly
stream <- lyr$getArrowStream()
stream
#> <nanoarrow_array_stream struct<fid: int64, event_id: string, incid_name: string, incid_type: string, map_id: int64, burn_bnd_ac: int64, burn_bnd_lat: string, burn_bnd_lon: string, ig_date: date32, ig_year: int32, geom: geoarrow.wkb{binary}>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()
stream$release()
lyr$close()


## dataset/layer garbage collected without close or explicit release - OK
lyr$open(read_only = TRUE)
stream <- lyr$getArrowStream()
stream
#> <nanoarrow_array_stream struct<fid: int64, event_id: string, incid_name: string, incid_type: string, map_id: int64, burn_bnd_ac: int64, burn_bnd_lat: string, burn_bnd_lon: string, ig_date: date32, ig_year: int32, geom: geoarrow.wkb{binary}>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()
rm(lyr)
gc()
#>           used (Mb) gc trigger (Mb) max used (Mb)
#> Ncells  825115 44.1    1460401   78  1460401 78.0
#> Vcells 1469116 11.3    8388608   64  2588634 19.8
stream
#> <nanoarrow_array_stream[invalid pointer]>
stream$get_next()
#> Error in stream$get_next(): nanoarrow_array_stream() is an external pointer to NULL

Created on 2025-02-11 with reprex v2.1.1

It might be nice to show a library(geoarrow) example, too (particularly with METADATA_ENCODING=GEOARROW).

I haven't gotten this to work yet but it led to a couple of other issues.

library(geoarrow)

lyr <- new(GDALVector, dsn)
lyr$getArrowStream() |> sf::st_as_sf()
#>  Error in UseMethod("st_as_sf") : 
#>   no applicable method for 'st_as_sf' applied to an object of class "nanoarrow_array_stream"

If lyr$getArrowStream() is piped then it's output is not assigned, and if the statement fails as above, now the stream is active but there is no way to call stream$release(). Since a stream is now active on the layer it is not possible to activate another one without closing and re-opening the dataset:

stream <- lyr$getArrowStream()
#> Error: OGR_L_GetArrowStream() failed: An arrow Arrow Stream is in progress on that layer. Only one at a time is allowed in this implementation.
#> In addition: Warning message:
#> In lyr$getArrowStream() :
#>   GDAL Error 1: An arrow Arrow Stream is in progress on that layer. Only one at a time is allowed in this implementation.

lyr$close()
lyr$open(read_only = TRUE)
stream <- lyr$getArrowStream()

I exposed GDALVector::releaseArrowStream() so that release can be done from the layer side too. This works as expected with a bound object:

lyr$releaseArrowStream()
stream
#> <nanoarrow_array_stream[invalid pointer]>

but currently does not work reliably for the intended use case when it's unassigned:

lyr$getArrowStream()  # unassigned
#> <nanoarrow_array_stream struct<fid: int64, event_id: string, incid_name: string, incid_type: string, map_id: int64, burn_bnd_ac: int64, burn_bnd_lat: string, burn_bnd_lon: string, ig_date: date32, ig_year: int32, geom: geoarrow.wkb{binary}>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()

lyr$releaseArrowStream()
lyr$getArrowStream()
#> Error: OGR_L_GetArrowStream() failed: An arrow Arrow Stream is in progress on that layer. Only one at a time is allowed in this implementation.
#> In addition: Warning message:
#> In lyr$getArrowStream() :
#>   GDAL Error 1: An arrow Arrow Stream is in progress on that layer. Only one at a time is allowed in this implementation.

Sometimes it works and seems to depend on how many unassigned calls to lyr$getArrowStream() have been made, or maybe whether it was previously assigned to stream. I'll work on that some more. Either way I can't get it to segfault and it passes clang-asan checks, but I'm also troubleshooting a small memory leak related to the xptr malloc.

  [ FAIL 0 | WARN 0 | SKIP 8 | PASS 1232 ]
  > 
  > proc.time()
     user  system elapsed 
  486.797   2.342 488.081 
  ==3483== 
  ==3483== HEAP SUMMARY:
  ==3483==     in use at exit: 188,610,380 bytes in 52,546 blocks
  ==3483==   total heap usage: 5,558,374 allocs, 5,505,828 frees, 2,582,189,992 bytes allocated
  ==3483== 
  ==3483== 80 bytes in 2 blocks are definitely lost in loss record 6,461 of 12,248
  ==3483==    at 0x484280F: malloc (vg_replace_malloc.c:442)
  ==3483==    by 0x3038BDC9: nanoarrow_array_stream_owning_xptr (r.h:309)
  ==3483==    by 0x3038BDC9: GDALVector::getArrowStream() (gdalvector.cpp:1637)
  ==3483==    by 0x303A5628: Rcpp::class_<GDALVector>::invoke_notvoid(SEXPREC*, SEXPREC*, SEXPREC**, int) (class.h:246)
  ==3483==    by 0x2F88BA0F: CppMethod__invoke_notvoid(SEXPREC*) (module.cpp:220)
  ==3483==    by 0x4956BC8: do_External (dotcode.c:576)
  ==3483==    by 0x49A5C17: Rf_eval (eval.c:1260)
  ==3483==    by 0x49A92EA: do_begin (eval.c:3000)
  ==3483==    by 0x49A59E5: Rf_eval (eval.c:1232)
  ==3483==    by 0x49A776E: R_execClosure (eval.c:2393)
  ==3483==    by 0x49A8506: applyClosure_core (eval.c:2306)
  ==3483==    by 0x49A5745: Rf_applyClosure (eval.c:2328)
  ==3483==    by 0x49A5745: Rf_eval (eval.c:1280)
  ==3483==    by 0x49AA8DF: do_set (eval.c:3571)
  ==3483== 
  ==3483== LEAK SUMMARY:
  ==3483==    definitely lost: 80 bytes in 2 blocks
  ==3483==    indirectly lost: 0 bytes in 0 blocks
  ==3483==      possibly lost: 0 bytes in 0 blocks
  ==3483==    still reachable: 188,610,300 bytes in 52,544 blocks
  ==3483==                       of which reachable via heuristic:
  ==3483==                         newarray           : 4,264 bytes in 1 blocks
  ==3483==         suppressed: 0 bytes in 0 blocks
  ==3483== Reachable blocks (those to which a pointer was found) are not shown.
  ==3483== To see them, rerun with: --leak-check=full --show-leak-kinds=all
  ==3483== 
  ==3483== For lists of detected and suppressed errors, rerun with: -s
`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants