-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
req_perform_stream(round = c("byte", "line"))
#437
Changes from all commits
0baf3a7
84b438c
6bd083b
b91a538
01ef008
75bb9fe
a0c1ed1
81ba51c
94c5eb0
a6883ad
1a46387
c3721fd
d7250aa
10a72ae
164ef90
ac842f2
c2ca392
1ded8f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,12 @@ | |
#' @param callback A single argument callback function. It will be called | ||
#' repeatedly with a raw vector whenever there is at least `buffer_kb` | ||
#' worth of data to process. It must return `TRUE` to continue streaming. | ||
#' @param timeout_sec Number of seconds to processs stream for. | ||
#' @param timeout_sec Number of seconds to process stream for. | ||
#' @param buffer_kb Buffer size, in kilobytes. | ||
#' @param round How should the raw vector sent to `callback` be rounded? | ||
#' Choose `"byte"`, `"line"`, or supply your own function that takes a | ||
#' raw vector of `bytes` and returns the locations of possible cut points | ||
#' (or `integer()` if there are none). | ||
#' @returns An HTTP [response]. | ||
#' @export | ||
#' @examples | ||
|
@@ -21,11 +25,16 @@ | |
#' resp <- request(example_url()) |> | ||
#' req_url_path("/stream-bytes/100000") |> | ||
#' req_perform_stream(show_bytes, buffer_kb = 32) | ||
req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64) { | ||
req_perform_stream <- function(req, | ||
callback, | ||
timeout_sec = Inf, | ||
buffer_kb = 64, | ||
round = c("byte", "line")) { | ||
check_request(req) | ||
|
||
handle <- req_handle(req) | ||
callback <- as_function(callback) | ||
cut_points <- as_round_function(round) | ||
|
||
stopifnot(is.numeric(timeout_sec), timeout_sec > 0) | ||
stop_time <- Sys.time() + timeout_sec | ||
|
@@ -35,13 +44,27 @@ req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64) | |
withr::defer(close(stream)) | ||
|
||
continue <- TRUE | ||
incomplete <- TRUE | ||
buf <- raw() | ||
while(continue && isIncomplete(stream) && Sys.time() < stop_time) { | ||
buf <- readBin(stream, raw(), buffer_kb * 1024) | ||
buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This update looks great! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I like how it rolls back to the original |
||
|
||
if (length(buf) > 0) { | ||
continue <- isTRUE(callback(buf)) | ||
cut <- cut_points(buf) | ||
n <- length(cut) | ||
if (n) { | ||
continue <- isTRUE(callback(head(buf, n = cut[n]))) | ||
buf <- tail(buf, n = -cut[n]) | ||
} | ||
} | ||
} | ||
|
||
# If there are leftover bytes and none of the callback() calls | ||
# returned FALSE, pass the remainder to callback(). | ||
if (continue && length(buf)) { | ||
callback(buf) | ||
} | ||
|
||
data <- curl::handle_data(handle) | ||
new_response( | ||
method = req_method_get(req), | ||
|
@@ -52,6 +75,25 @@ req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64) | |
) | ||
} | ||
|
||
# Turn the `round` argument into a function that takes a raw vector of | ||
# `bytes` and returns the positions at which it may be cut. | ||
# - a function: checked with check_function2(args = "bytes") and returned | ||
#   unchanged; | ||
# - "byte": a function returning length(bytes) as the only cut point; | ||
# - "line": a function returning the position of every "\n" byte | ||
#   (integer(0) when there is none); | ||
# - anything else: an error raised with cli::cli_abort(), using | ||
#   `error_call` as its call. | ||
as_round_function <- function(round = c("byte", "line"), | ||
error_call = caller_env()) { | ||
if (is.function(round)) { | ||
# NOTE(review): presumably verifies that `round` accepts a `bytes` | ||
# argument; confirm against the definition of check_function2(). | ||
check_function2(round, args = "bytes") | ||
round | ||
} else if (is.character(round)) { | ||
# Validate the string with arg_match(), passing error_call through. | ||
round <- arg_match(round, error_call = error_call) | ||
switch(round, | ||
byte = function(bytes) length(bytes), | ||
line = function(bytes) which(bytes == charToRaw("\n")) | ||
) | ||
} else { | ||
cli::cli_abort( | ||
'{.arg round} must be "byte", "line" or a function.', | ||
call = error_call | ||
) | ||
} | ||
} | ||
|
||
#' @export | ||
#' @rdname req_perform_stream | ||
#' @usage NULL | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think returning a vector of cut points makes the implementation a bit simpler; see the new implementation of
round_lines()
below. What do you think? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for improving the doc. With this reframing, returning the vector of cut points does make more sense, although the current implementation only cares about the last one.