New API Proposal #270
@sleipnir I think that's a really good idea. I've run into the same problem as you. One thing I thought about doing here is a more Elixir-like solution: instead of returning a stream, we could start sending messages to the process that started the stream, similar to what gun/Mint do for requests. The process would then receive already-parsed messages from the stream, and you could implement something like a GenServer to receive them. For example:

```elixir
defmodule MyStreamHandlerProcess do
  use GenServer

  # ... GenServer boilerplate (start_link/1, init/1, ...)

  def handle_info({:elixir_grpc, {:headers, headers}}, state) do
    # ...
    {:noreply, state}
  end

  def handle_info({:elixir_grpc, {:data, data}}, state) do
    # ...
    {:noreply, state}
  end

  def handle_info({:elixir_grpc, {:trailers, trailers}}, state) do
    # ...
    {:noreply, state}
  end

  def handle_info({:elixir_grpc, :done}, state) do
    # ...
    {:noreply, state}
  end
end
```

This GenServer could also be responsible not only for receiving the messages but also for sending data (in the case of a bidirectional stream).

One workaround I've used in my projects, to avoid having to wait for the server stream to end before processing the messages, is something like the following. It adds a bit of async processing to your code.

```elixir
defmodule MyStreamHandlerProcess do
  use GenServer

  # ... GenServer boilerplate (start_link/1, init/1, ...)

  # Call this from inside the GenServer (e.g. from a handle_cast/2 clause)
  # so that self() below is the GenServer's pid.
  def send_request(grpc_stream) do
    {:ok, ex_stream} = GRPC.Stub.recv(grpc_stream)
    pid = self()

    # Run asynchronously, because Stream.run/1 blocks until the stream is done.
    # Task.start_link/1 is used instead of Task.async/1: an un-awaited
    # Task.async/1 later delivers {ref, result} and :DOWN messages that the
    # handle_info/2 clauses below do not match, which would crash the GenServer.
    Task.start_link(fn ->
      ex_stream
      |> Stream.each(fn resp ->
        case resp do
          {:ok, data} -> send(pid, {:elixir_grpc, {:data, data}})
          # Covers
          # {:headers, headers}
          # {:trailers, trailers}
          # {:error, error}
          _other -> send(pid, {:elixir_grpc, resp})
        end
      end)
      # Blocks this task (not the GenServer) until the stream ends.
      |> Stream.run()

      # Tell the GenServer that the stream has ended.
      send(pid, {:elixir_grpc, :done})
    end)
  end

  def handle_info({:elixir_grpc, {:headers, headers}}, state) do
    # ...
    {:noreply, state}
  end

  def handle_info({:elixir_grpc, {:data, data}}, state) do
    # ...
    {:noreply, state}
  end

  def handle_info({:elixir_grpc, {:trailers, trailers}}, state) do
    # ...
    {:noreply, state}
  end

  def handle_info({:elixir_grpc, :done}, state) do
    # ...
    {:noreply, state}
  end
end
```

What do you think?
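To try the forwarding pattern without a gRPC server, here is a minimal sketch in which a plain list of tagged tuples stands in for the Enumerable returned by `GRPC.Stub.recv/1`. The module name `StreamForwarder` and its functions are hypothetical. Because a list does not depend on any process mailbox, this sketch sidesteps the gun-specific problem raised later in this thread.

```elixir
defmodule StreamForwarder do
  # Hypothetical helper: consumes `enum` in a separate process and forwards
  # each item to `owner` as {:elixir_grpc, msg}, then sends {:elixir_grpc, :done}.
  def forward(enum, owner) do
    Task.start_link(fn ->
      enum
      |> Stream.each(fn
        {:ok, data} -> send(owner, {:elixir_grpc, {:data, data}})
        other -> send(owner, {:elixir_grpc, other})
      end)
      |> Stream.run()

      send(owner, {:elixir_grpc, :done})
    end)
  end

  # Collects forwarded messages until :done arrives. A GenServer would
  # handle them one by one in handle_info/2 instead.
  def collect(acc \\ []) do
    receive do
      {:elixir_grpc, :done} -> Enum.reverse(acc)
      {:elixir_grpc, msg} -> collect([msg | acc])
    after
      1_000 -> {:timeout, Enum.reverse(acc)}
    end
  end
end
```

For example, `StreamForwarder.forward([{:headers, %{}}, {:ok, "reply 1"}, {:trailers, %{}}], self())` followed by `StreamForwarder.collect()` yields the three messages in order, with the `{:ok, _}` item rewritten as `{:data, "reply 1"}`.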
Hi @beligante, thanks for engaging here, and sorry for the late reply; I've been quite busy with another project lately. Overall I agree with your approach and have adopted similar strategies in my projects as well. However, one thing you need to be careful with is managing the GenServer: you need to create a GenServer process for each incoming connection, or use a pool of processes; otherwise the GenServer itself becomes a bottleneck. To be honest, I would still like a flow-based API, but I understand that this might be complicated. A reasonable alternative might be to use the GenStage API, along the lines of what Broadway already does. The advantage of building on GenStage is that the gRPC library would natively include backpressure, something Akka gRPC has already shown to be the state of the art for gRPC libraries. I strongly recommend taking a look at the Akka gRPC API and general Akka Streams concepts. They can be very useful as inspiration here, since other Elixir libraries like Flow have followed similar paths and have been very successful. What do you think?
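The backpressure mentioned above rests on demand-driven (pull-based) flow: a consumer asks for at most `n` items and the producer never pushes more than that. The sketch below is a hand-rolled illustration of that idea with plain processes. It is not the GenStage API, and `DemandDemo` and its message shapes are hypothetical.

```elixir
defmodule DemandDemo do
  # Producer: emits items only in response to an explicit {:ask, from, n}.
  def start_producer(items), do: spawn_link(fn -> producer_loop(items) end)

  defp producer_loop(items) do
    receive do
      {:ask, from, n} ->
        case Enum.split(items, n) do
          {[], _rest} ->
            # Nothing left: an empty batch signals the end, then stop.
            send(from, {:events, self(), []})

          {batch, rest} ->
            send(from, {:events, self(), batch})
            producer_loop(rest)
        end
    end
  end

  # Consumer: asks for `n` items at a time, so at most `n` unprocessed
  # items can be waiting for it, however fast the producer is.
  def consume(producer, n, acc \\ []) do
    send(producer, {:ask, self(), n})

    receive do
      {:events, ^producer, []} ->
        Enum.reverse(acc)

      {:events, ^producer, batch} ->
        # "Process" the batch (here: just accumulate it), then ask for more.
        consume(producer, n, Enum.reverse(batch, acc))
    end
  end
end
```

With `producer = DemandDemo.start_producer(Enum.to_list(1..10))`, calling `DemandDemo.consume(producer, 3)` pulls the items in batches of three. GenStage generalizes this with demand tracking, buffering, and multiple producers and consumers.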
@beligante I'm just attempting to integrate elixir-grpc now, and before I even found this issue I was trying a workaround similar to the one you sketched out here (doing Stream.run() in a Task on the stream of responses) to avoid blocking the GenServer. However, as far as I can tell, that doesn't work. The stream generation calls :gun.await, but that is just a receive that intercepts gun-specific messages sent to the calling process, so it achieves nothing when called from a task process: the task never processes anything, and the calling process is just sent the raw gun messages. I have a workaround, which is to intercept messages from gun and forward them to the receiver (whose pid can be stored in the state).
This works, but it's hacky. You said you'd implemented this in your own projects, so I suspect I'm overlooking a better way of doing things? Thanks in advance 🙂
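A minimal sketch of the forwarding workaround described above. It assumes the GenServer is the process gun delivers the stream's messages to (the calling process, per the comment above) and that the consumer, for example the Task running `Stream.run/1`, is registered as the receiver, so its `:gun.await` call can see the messages. `GunForwarder` and `set_receiver/2` are hypothetical names. The matched tags (`:gun_response`, `:gun_data`, `:gun_trailers`, `:gun_error`) are gun's asynchronous stream messages.

```elixir
defmodule GunForwarder do
  use GenServer

  # Stream-related message tags that gun sends asynchronously.
  @gun_stream_tags [:gun_response, :gun_data, :gun_trailers, :gun_error]

  def start_link(receiver), do: GenServer.start_link(__MODULE__, receiver)

  # Hypothetical API: point forwarding at a new consumer process.
  def set_receiver(server, pid), do: GenServer.call(server, {:set_receiver, pid})

  @impl true
  def init(receiver), do: {:ok, %{receiver: receiver}}

  @impl true
  def handle_call({:set_receiver, pid}, _from, state) do
    {:reply, :ok, %{state | receiver: pid}}
  end

  @impl true
  def handle_info(msg, %{receiver: receiver} = state)
      when is_tuple(msg) and tuple_size(msg) > 0 and elem(msg, 0) in @gun_stream_tags do
    # Re-send the untouched gun message to the process waiting in :gun.await.
    send(receiver, msg)
    {:noreply, state}
  end

  # Anything else is ignored rather than crashing the GenServer.
  def handle_info(_other, state), do: {:noreply, state}
end
```

The forwarding only helps if the message tuple stays exactly as gun sent it, because `:gun.await` matches on the connection pid and stream reference inside it.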
@sleipnir Sorry for the late reply too, same problem you're having, hahaha. I think we can skip the apologies next time. I took a second look at what you're saying, but I'm not sure I'm following; I think I misunderstood your request.
In an ideal world, you would like the library to add more abstraction so that you can work with streams more purely. That's what I understood; is that correct?
Hey @SimonWoolf, about your question: yeah! I had to use a similar approach because of that strange behavior of gun; I just omitted it for brevity. To be honest, I don't know why that happens, but yes, that workaround is unfortunately needed :/ I've opened a PR that adds a new adapter to use …, and with that adapter we don't have this problem, since I'm wrapping the messages inside a GenServer. (FYI, I've been testing that PR in my staging environments for a while and it's in pretty good shape.)
Hi @beligante
I'm referring to the server side; I think that's where the library's biggest gap is. Obviously the client side would benefit from such an approach as well.
By Stream I mean stream-processing semantics, not the gRPC specification's notion of streams. I think stream-processing semantics are a good fit for gRPC streams, especially unbounded (i.e., infinite) ones. This would bring the side benefit I mentioned: backpressure. If we just add semantics, as you said, a "more Elixir-like solution", we still won't be able to handle backpressure, at least not the kind I'm referring to (throttling, buffering, windowing, etc.), without pushing a lot of the work onto the library's users. That's why I cited two Elixir libraries that solved this problem, GenStage and Broadway. I usually use a GenServer and a DynamicSupervisor, plus a start-of-stream message in my protos, to help handle gRPC streams. But I think it's a bad hack, mainly because you have to monitor the processes' PIDs, which adds complexity to the code and can lead to failures that are difficult to track down in production. I really believe the library can go a step further and provide a batteries-included API like Akka gRPC.
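A minimal sketch of the GenServer plus DynamicSupervisor pattern described above: when a start-of-stream message arrives, a dedicated handler process is started for that stream under a DynamicSupervisor. `StreamHandler`, `StreamSupervisor`, and the `{:message, msg}` cast are hypothetical; in a real server the handler would receive the stream's messages and send replies.

```elixir
defmodule StreamHandler do
  # One process per incoming stream. restart: :temporary means the
  # supervisor will not restart it once it finishes or crashes.
  use GenServer, restart: :temporary

  def start_link(stream_id), do: GenServer.start_link(__MODULE__, stream_id)

  @impl true
  def init(stream_id), do: {:ok, %{stream_id: stream_id, received: []}}

  @impl true
  def handle_cast({:message, msg}, state) do
    {:noreply, %{state | received: [msg | state.received]}}
  end

  @impl true
  def handle_call(:received, _from, state) do
    {:reply, Enum.reverse(state.received), state}
  end
end

defmodule StreamSupervisor do
  def start_link, do: DynamicSupervisor.start_link(strategy: :one_for_one)

  # Called when a start-of-stream message arrives for `stream_id`.
  def start_stream(sup, stream_id),
    do: DynamicSupervisor.start_child(sup, {StreamHandler, stream_id})
end
```

This keeps each stream isolated in its own process, but, as noted above, routing messages to the right pid and tracking process lifetimes is still left to the application.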
I'm not sure: should we also discuss the error details mentioned in #109 here?
Maybe, yes.
Is your feature request related to a problem? Please describe.
One thing that has always bothered me about the elixir-grpc API is how much it differs from the way gRPC streams are handled in other languages, and that we need a separate module and functions to send responses on a gRPC stream. I would like to handle a gRPC stream as fluidly as we do in other languages, where the types generated for the input and output of a streaming RPC are the language's own stream type. In elixir-grpc, by contrast, the output of a streaming RPC is not the return value of the Elixir function; instead we use a helper (Server.send_reply) to emit each response, so the stream is not reflected in the function's typespec.
I know it's hard to visualize what I'm saying, so here are some examples from other languages to demonstrate:
Kotlin Raw:
Kotlin with Akka gRPC:
Java with Akka gRPC:
Java Raw:
Rust:
Python:
Go:
Notice that when an output stream is declared in the gRPC service, the output data type is also a stream.
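In Elixir terms, that idea could look roughly like the following minimal sketch. It assumes a hypothetical handler contract (not the current elixir-grpc API) in which a streaming RPC receives its requests as an `Enumerable` and returns an `Enumerable` of replies, so the stream shows up in the `@spec`. `HelloStreamServer.say_hello/2` is a made-up name, and plain maps stand in for generated protobuf structs.

```elixir
defmodule HelloStreamServer do
  # Hypothetical contract: requests in, replies out, both as Enumerables.
  @spec say_hello(Enumerable.t(), term()) :: Enumerable.t()
  def say_hello(requests, _context) do
    # Lazy: each reply is built only when the caller pulls it, so this
    # also works on an unbounded request stream.
    Stream.map(requests, fn %{name: name} -> %{message: "Hello, #{name}"} end)
  end
end
```

Because the handler returns a lazy stream instead of calling a send helper, the framework (not the handler) would decide when to pull and send each reply, which is also where backpressure could be applied.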
Describe the solution you'd like
Given the following RPC:
The equivalent implementation code in Elixir would look like this:
This would make stream development feel more natural to those coming from other languages, and it would also make elixir-grpc more compatible with Elixir's own Stream types.
But notice that the code above hides some subtleties:
First, Flow.map or Stream.map does not compute any results: it returns a lazy enumerable that only describes the operations to be performed on the stream, in effect a "DAG" (I'm using DAG loosely, to illustrate an execution-path type). Even something like Stream.run or Flow.run wouldn't return a stream either; it would just run the DAG (Stream.run returns :ok).
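A small sketch of that laziness with the standard library alone: `Stream.map/2` only records the operation, and the mapping function runs when something consumes the stream. An `Agent` counts how many times the function actually executes.

```elixir
# Count how many times the mapping function actually runs.
{:ok, counter} = Agent.start_link(fn -> 0 end)

pipeline =
  Stream.map([1, 2, 3], fn x ->
    Agent.update(counter, &(&1 + 1))
    x * 10
  end)

IO.inspect(Agent.get(counter, & &1), label: "calls after Stream.map")  # 0
IO.inspect(Stream.run(pipeline), label: "Stream.run returned")         # :ok
IO.inspect(Agent.get(counter, & &1), label: "calls after Stream.run")  # 3
```

`Stream.run/1` executes the pipeline for its side effects and discards the mapped values, which is why it cannot serve as the "return a stream" step either.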
Since gRPC streams are potentially unbounded (they may never end, and the input and output streams are "decoupled" from each other), calling Enum.to_list(stream) would not solve the problem either. In this case, an API that makes more sense for Elixir, and that also matches other languages and frameworks (see the Java with Akka example above), might be something like:
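To see why `Enum.to_list/1` is not an option on an unbounded stream, here is a minimal sketch (an illustration of the problem, not the API proposed above) in which `Stream.iterate/2` stands in for an infinite request stream. The consumer has to work incrementally, for example by taking a bounded prefix or by processing elements until a condition fails.

```elixir
# An unbounded source: each element is produced only when something pulls it.
requests = Stream.iterate(1, &(&1 + 1))
replies = Stream.map(requests, &(&1 * 2))

# Enum.to_list(replies) would never return, since the source has no end.

# Take a bounded prefix:
replies |> Enum.take(3) |> IO.inspect(label: "first replies")

# Or process replies one by one until a condition stops the stream:
replies
|> Stream.take_while(&(&1 <= 10))
|> Stream.each(fn reply -> IO.puts("handled reply #{reply}") end)
|> Stream.run()
```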
@polvalente wdyt?