req_perform_stream(round = c("byte", "line"))
#437
Thanks for working on this! It seems like a solid approach and I like your proposal for req_perform_stream() syntax. I think the only thing that's missing is some way to test this, which I assume will need something in webfakes that involves sending some characters, then sleeping, then sending a newline?
Thanks for the review. I'll follow up. I think
No strong preference, either option eliminates the
Yeah, good point. I think having it read all of the lines is fine, since most operations are going to be vectorised anyway. Maybe you could have something like
req_perform_stream_lines()
req_perform_stream(round = "lines")
Made another pass, Instead of adding the With this version, I don't really know how to round at characters, and this would perhaps mean some sort of utf8 parsing which does not seem very useful. I suppose instead of
library(httr2)
show_lines <- function(x) {
writeLines(cli::rule())
cat(rawToChar(x))
TRUE
}
resp <- request(example_url()) |>
req_url_path("/stream/10") |>
req_perform_stream(show_lines, buffer_kb = 0.5, round = "lines")
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":3}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":4}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":5}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":6}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":7}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":8}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":9}
Created on 2024-02-11 with reprex v2.1.0
This works for the motivating example with the updated (and simpler) PR in
If we did characters, I think it would have to mean utf8 characters, so it would be I like keeping it as an enum because we might add extra rounding features in the future, and you could maybe even imagine supplying a callback function if we really want to give the user a lot of extra control. |
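To illustrate the UTF-8 concern, here is a minimal base R sketch (not code from this PR) of how a fixed byte count can cut a multi-byte character in half, and how possible character boundaries could be located. The helper name `utf8_cut_points()` is hypothetical. It relies only on the fact that UTF-8 continuation bytes have the bit pattern `10xxxxxx`, and it deliberately never reports the end of the buffer as a cut point, since the last character may still be incomplete.

```r
# "é" takes 2 bytes and "€" takes 3 bytes in UTF-8
bytes <- charToRaw(enc2utf8("caf\u00e9 \u20ac5"))
length(bytes)
#> [1] 10

# Cutting after 7 bytes keeps only the first byte of "€"
validUTF8(rawToChar(bytes[1:7]))
#> [1] FALSE

# Hypothetical helper: positions after which a cut does not split a character
utf8_cut_points <- function(bytes) {
  # continuation bytes look like 10xxxxxx
  is_continuation <- bitwAnd(as.integer(bytes), 0xC0L) == 0x80L
  # a cut is safe just before any byte that starts a new character
  cuts <- which(!is_continuation) - 1L
  cuts[cuts > 0L]
}

cuts <- utf8_cut_points(bytes)
safe <- max(cuts[cuts <= 7L])
validUTF8(rawToChar(bytes[seq_len(safe)]))
#> [1] TRUE
```

A function of this shape is the kind of thing a callback-style rounding option would have to do if character rounding were ever wanted.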
I updated it to use It also makes it easier to document IMO.
Some followup 💅. I could do
library(httr2)
show_lines <- function(x) {
cat(rawToChar(x))
TRUE
}
req <- request(example_url()) |> req_url_path("/stream/3")
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = "line")
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = "byte")
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = \(bytes) length(bytes) / 2)
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = 2L)
#> Error in `req_perform_stream()`:
#> ! `round` must be "byte", "line" or a function
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = "banana")
#> Error in `req_perform_stream()`:
#> ! `round` must be one of "byte" or "line", not "banana".
Created on 2024-02-12 with reprex v2.1.0
req_perform_stream(round = "lines")
req_perform_stream(round = c("byte", "line"))
And correct a bug I introduced
#' @param buffer_kb Buffer size, in kilobytes.
#' @param round How should the raw vector sent to `callback` be rounded?
#'   Choose `"byte"`, `"line"`, or supply your own function that takes a
#'   raw vector of `bytes` and returns the locations of possible cut points
I think returning a vector of cut points makes the implementation a bit simpler; see the new implementation of round_lines() below. What do you think?
Thanks for improving the doc. With this reframe, returning the vector of cut points does make more sense, although, the current implementation only cares about the last one.
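A minimal sketch of the idea discussed here, under stated assumptions: the names `newline_cut_points()` and `split_at_last_cut()` are hypothetical and this is not the PR's `round_lines()`. The rounding function returns every possible cut point, while the caller only uses the last one to decide which bytes go to the callback and which stay in the buffer.

```r
# Hypothetical rounding function: every newline is a possible cut point
newline_cut_points <- function(bytes) {
  which(bytes == charToRaw("\n"))
}

# Hypothetical helper: split a buffer at its last cut point
split_at_last_cut <- function(bytes, cut_points) {
  if (length(cut_points) == 0) {
    # no cut point yet: nothing is complete, keep everything buffered
    return(list(complete = raw(), leftover = bytes))
  }
  cut <- max(cut_points)
  list(
    complete = bytes[seq_len(cut)],   # sent to the callback
    leftover = bytes[-seq_len(cut)]   # carried over to the next read
  )
}

buf <- charToRaw('{"id":0}\n{"id":1}\n{"id"')
parts <- split_at_last_cut(buf, newline_cut_points(buf))
cat(rawToChar(parts$complete))
#> {"id":0}
#> {"id":1}
rawToChar(parts$leftover)
#> [1] "{\"id\""
```

Returning all cut points costs little here and leaves room for a caller that wants to split at more than one position.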
buffer_kb = 0.1,
round = function(bytes) integer()
)
expect_equal(length(out), 0)
Is this correct, or should it return 1024? i.e. should we always call callback on the final bytes in the buffer?
I think 0L makes sense here. Should the response be different in that case, though? Is this a 408?
Well, the HTTP request succeeded and returned all its data, but it never used a cut point we expected. I'm now leaning more towards 0 being wrong, because it's surprising for data to silently vanish if there's not a trailing newline (which must be fairly common since readLines() handles it).
😱 yeah, tweaked the incomplete logic a bit to accommodate these cases.
Thanks for the extra 💅✨
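The behaviour settled on in this thread (leftover bytes are still delivered when the stream ends without a trailing newline) can be sketched with a plain base R connection. This is a simplified illustration, not httr2's implementation: `stream_by_line()` and `chunk_size` are hypothetical names, a `rawConnection()` stands in for the HTTP stream, and the callback's return value is ignored rather than used to stop early.

```r
# Hypothetical helper: read `con` in fixed-size chunks, pass only complete
# lines to `callback`, and flush whatever is left once `con` is exhausted.
stream_by_line <- function(con, callback, chunk_size = 8L) {
  buf <- raw()
  repeat {
    chunk <- readBin(con, raw(), chunk_size)
    if (length(chunk) == 0) break  # nothing more to read
    buf <- c(buf, chunk)

    newlines <- which(buf == charToRaw("\n"))
    if (length(newlines) > 0) {
      cut <- max(newlines)
      callback(buf[seq_len(cut)])  # complete lines only
      buf <- buf[-seq_len(cut)]    # keep the partial line for later
    }
  }
  # Leftover bytes without a trailing newline are still delivered
  if (length(buf) > 0) callback(buf)
  invisible(NULL)
}

seen <- character()
con <- rawConnection(charToRaw("alpha\nbeta\ngamma"))
stream_by_line(con, function(bytes) seen <<- c(seen, rawToChar(bytes)))
close(con)
seen
#> [1] "alpha\n" "beta\n"  "gamma"
```

Without the final flush, `"gamma"` would be dropped silently, which is the surprising behaviour raised above.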
R/req-perform-stream.R
Outdated
# there are leftover bytes, but the stream is complete
# break the loop so that the callback() is given the
# whole buffer
if (!incomplete) {
Would it work to move this higher? i.e. make it the other side of the if (incomplete) statement above.
while(continue && isIncomplete(stream) && Sys.time() < stop_time) {
buf <- readBin(stream, raw(), buffer_kb * 1024)
buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024))
This update looks great!
I like how it rolls back to the original while(continue && isIncomplete(stream) && Sys.time() < stop_time) loop
Thanks for working on this!
The backstory for this proposal is mlverse/chattr#63
The failure in chattr happens because the callback in req_perform_stream() occasionally receives bytes that are truncated in the middle of a multi-byte character, e.g. only 2 bytes of a 3-byte utf-8 character.
This is because req_perform_stream() consumes the curl connection with a readBin() call and a fixed number of bytes.
This proposes to add a req_perform_stream_lines() sister function that uses readLines() instead. This would be useful when the stream is known to return text, e.g. json that can then be dealt with line by line.
I can polish the PR if this gets buy-in. I'll do a PR to mlverse/chattr to demonstrate a potential use of req_perform_stream_lines().
However, what I really believe is that req_perform_stream() should instead evolve so that "get the next chunk" is abstracted, e.g. something like: