Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

req_perform_stream(round = c("byte", "line")) #437

Merged
merged 18 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

* `req_template()` now works when you have a bare `:` in a template that
uses "uri" style (#389).

* `req_perform_stream()` gains a `round = c("byte", "line")` argument to control
how the stream is rounded (#437).

# httr2 1.0.0

Expand Down
50 changes: 46 additions & 4 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
#' @param callback A single argument callback function. It will be called
#' repeatedly with a raw vector whenever there is at least `buffer_kb`
#' worth of data to process. It must return `TRUE` to continue streaming.
#' @param timeout_sec Number of seconds to processs stream for.
#' @param timeout_sec Number of seconds to process stream for.
#' @param buffer_kb Buffer size, in kilobytes.
#' @param round How should the raw vector sent to `callback` be rounded?
#' Choose `"byte"`, `"line"`, or supply your own function that takes a
#' raw vector of `bytes` and returns the locations of possible cut points
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning a vector of cut points makes the implementation a bit simpler; see the new implementation of round_lines() below. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for improving the doc. With this reframe, returning the vector of cut points does make more sense, although, the current implementation only cares about the last one.

#' (or `integer()` if there are none).
#' @returns An HTTP [response].
#' @export
#' @examples
Expand All @@ -21,11 +25,16 @@
#' resp <- request(example_url()) |>
#' req_url_path("/stream-bytes/100000") |>
#' req_perform_stream(show_bytes, buffer_kb = 32)
req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64) {
req_perform_stream <- function(req,
callback,
timeout_sec = Inf,
buffer_kb = 64,
round = c("byte", "line")) {
check_request(req)

handle <- req_handle(req)
callback <- as_function(callback)
cut_points <- as_round_function(round)

stopifnot(is.numeric(timeout_sec), timeout_sec > 0)
stop_time <- Sys.time() + timeout_sec
Expand All @@ -35,13 +44,27 @@ req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64)
withr::defer(close(stream))

continue <- TRUE
incomplete <- TRUE
buf <- raw()
while(continue && isIncomplete(stream) && Sys.time() < stop_time) {
buf <- readBin(stream, raw(), buffer_kb * 1024)
buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update looks great!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how it rolls back to the original while(continue && isIncomplete(stream) && Sys.time() < stop_time) loop


if (length(buf) > 0) {
continue <- isTRUE(callback(buf))
cut <- cut_points(buf)
n <- length(cut)
if (n) {
continue <- isTRUE(callback(head(buf, n = cut[n])))
buf <- tail(buf, n = -cut[n])
}
}
}

# if there are leftover bytes and none of the callback()
# returned FALSE.
if (continue && length(buf)) {
callback(buf)
}

data <- curl::handle_data(handle)
new_response(
method = req_method_get(req),
Expand All @@ -52,6 +75,25 @@ req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64)
)
}

as_round_function <- function(round = c("byte", "line"),
error_call = caller_env()) {
if (is.function(round)) {
check_function2(round, args = "bytes")
round
} else if (is.character(round)) {
round <- arg_match(round, error_call = error_call)
switch(round,
byte = function(bytes) length(bytes),
line = function(bytes) which(bytes == charToRaw("\n"))
)
} else {
cli::cli_abort(
'{.arg round} must be "byte", "line" or a function.',
call = error_call
)
}
}

#' @export
#' @rdname req_perform_stream
#' @usage NULL
Expand Down
15 changes: 13 additions & 2 deletions man/req_perform_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/resps_successes.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests/testthat/_snaps/req-perform-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,22 @@
`req_stream()` was deprecated in httr2 1.0.0.
i Please use `req_perform_stream()` instead.

# as_round_function checks its inputs

Code
as_round_function(1)
Condition
Error:
! `round` must be "byte", "line" or a function.
Code
as_round_function("bytes")
Condition
Error:
! `round` must be one of "byte" or "line", not "bytes".
i Did you mean "byte"?
Code
as_round_function(function(x) 1)
Condition
Error in `as_round_function()`:
! `round` must have the argument `bytes`; it currently has `x`.

56 changes: 56 additions & 0 deletions tests/testthat/test-req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,59 @@ test_that("req_stream() is deprecated", {
resp <- req_stream(req, identity, buffer_kb = 32)
)
})

test_that("can buffer to lines", {
lines <- character()
accumulate_lines <- function(x) {
lines <<- c(lines, strsplit(rawToChar(x), "\n")[[1]])
TRUE
}

# Each line is 225 bytes, should should be split into ~2 pieces
resp <- request_test("/stream/10") %>%
req_perform_stream(accumulate_lines, buffer_kb = 0.1, round = "line")
expect_equal(length(lines), 10)

valid_json <- map_lgl(lines, jsonlite::validate)
expect_equal(valid_json, rep(TRUE, 10))
})

test_that("can supply custom rounding", {
out <- list()
accumulate <- function(x) {
out <<- c(out, list(x))
TRUE
}

resp <- request_test("/stream-bytes/1024") %>%
req_perform_stream(
accumulate,
buffer_kb = 0.1,
round = function(bytes) if (length(bytes) > 100) 100 else integer()
)
expect_equal(lengths(out), c(rep(100, 10), 24))
})

test_that("eventually terminates even if never rounded", {
out <- raw()
accumulate <- function(x) {
out <<- c(out, x)
TRUE
}

resp <- request_test("/stream-bytes/1024") %>%
req_perform_stream(
accumulate,
buffer_kb = 0.1,
round = function(bytes) integer()
)
expect_equal(length(out), 1024)
})

test_that("as_round_function checks its inputs", {
expect_snapshot(error = TRUE, {
as_round_function(1)
as_round_function("bytes")
as_round_function(function(x) 1)
})
})
Loading