Skip to content

Commit

Permalink
fix: proper supervision cleanup (#35)
Browse files Browse the repository at this point in the history
* fix: proper supervision cleanup WIP

* fix: everything now supervised under :kadabra app

Was the original setup a while back. Supervisor was quitting
with :shutdown during cleanup, which would shutdown the process
that originally start_link'd it.

* fix: timeout on stream WindowUpdate

Replies immediately so connection can continue on with its life.
Call timeout was previously crashing connection on occasion.

* refactor: minor refactoring

* feat: CONTINUATION for sending large headers WIP

Chunks correctly, but remotes are still terminating with a
COMPRESSION_ERROR.

Possible ideas why:
1. Hpack is encoding the fragment wrong
2. Frames are being sent out of order
3. Continuation frames are being serialized wrong
4. Remotes aren't equipped to handle such large headers
5. Sending a 20mb header is just a shitty thing to do

* fix: don't crash Connection if Socket closed

Fixed by not shutting down Kadabra.Socket if the socket closes. Bins
sent to a closed socket are simply ignored.

Sending CONTINUATION frames is mostly there. Fairly certain the issue is
with hpack, so I'll need to figure something out.

* chore: bump version and update changelog

* test: increase test coverage
  • Loading branch information
hpopp authored May 25, 2018
1 parent 322ccb3 commit cdb9e36
Show file tree
Hide file tree
Showing 22 changed files with 411 additions and 197 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## v0.4.2
- Fixed `{:closed, pid}` task race condition during connection cleanup
- Everything is supervised under `Kadabra.Application` again, instead of
handling supervision yourself

## v0.4.1
- Send exactly number of allowed bytes on initial connection WINDOW_UPDATE
- Default settings use maximum values for MAX_FRAME_SIZE and INITIAL_WINDOW_SIZE
Expand Down
30 changes: 29 additions & 1 deletion lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,40 @@ defmodule Kadabra.Application do
use Application
import Supervisor.Spec

alias Kadabra.Connection

@app :kadabra

def start(_type, _args) do
children = [
supervisor(Registry, [:unique, Registry.Kadabra]),
supervisor(Task.Supervisor, [[name: Kadabra.Tasks]])
]

Supervisor.start_link(children, strategy: :one_for_one, name: :kadabra)
Supervisor.start_link(children, strategy: :one_for_one, name: @app)
end

def start_connection(uri, pid, opts) do
Supervisor.start_child(
@app,
supervisor(Kadabra.Supervisor, [uri, pid, opts], spec_opts())
)
end

defp spec_opts do
ref = :erlang.make_ref()
[id: ref, restart: :transient]
end

def ping(pid) do
pid
|> Connection.via_tuple()
|> Connection.ping()
end

def close(pid) do
pid
|> Connection.via_tuple()
|> Connection.close()
end
end
36 changes: 14 additions & 22 deletions lib/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Kadabra.Connection do
Error,
Frame,
Socket,
Tasks
StreamSupervisor
}

alias Kadabra.Frame.{Goaway, Ping}
Expand Down Expand Up @@ -51,6 +51,7 @@ defmodule Kadabra.Connection do
state = initial_state(config)
Kernel.send(self(), :start)
Process.flag(:trap_exit, true)

{:consumer, state, subscribe_to: [queue]}
end

Expand Down Expand Up @@ -101,13 +102,12 @@ defmodule Kadabra.Connection do
config: config
} = state

StreamSupervisor.stop(state.config.ref)

bin = flow.stream_set.stream_id |> Goaway.new() |> Encodable.to_bin()
Socket.send(config.socket, bin)

Kernel.send(config.client, {:closed, config.supervisor})
Tasks.run(fn -> Kadabra.Supervisor.stop(config.supervisor) end)

{:reply, :ok, [], state}
{:stop, :shutdown, :ok, state}
end

# sendf
Expand Down Expand Up @@ -148,19 +148,16 @@ defmodule Kadabra.Connection do
{:noreply, [], state}
end

def handle_info({:closed, _pid}, %{config: config} = state) do
Kernel.send(config.client, {:closed, config.supervisor})
Tasks.run(fn -> Kadabra.Supervisor.stop(config.supervisor) end)

{:noreply, [], state}
def handle_info({:closed, _pid}, state) do
{:stop, :shutdown, state}
end

def handle_info({:EXIT, _pid, {:shutdown, {:finished, stream_id}}}, state) do
def handle_info({:DOWN, _, _, _pid, {:shutdown, {:finished, sid}}}, state) do
GenStage.ask(state.queue, 1)

flow =
state.flow_control
|> FlowControl.finish_stream(stream_id)
|> FlowControl.finish_stream(sid)
|> FlowControl.process(state.config)

{:noreply, [], %{state | flow_control: flow}}
Expand All @@ -178,28 +175,23 @@ defmodule Kadabra.Connection do

{:connection_error, error, reason, state} ->
handle_connection_error(state, error, reason)
{:noreply, [], state}
{:stop, {:shutdown, :connection_error}, state}
end
end

defp handle_connection_error(%{config: config} = state, error, reason) do
code = Error.code(error)
code = <<Error.code(error)::32>>

bin =
state.flow_control.stream_set.stream_id
|> Goaway.new(code, reason)
|> Encodable.to_bin()

Socket.send(config.socket, bin)
Tasks.run(fn -> Kadabra.Supervisor.stop(config.supervisor) end)
end

def send_goaway(%{config: config, flow_control: flow}, error) do
bin =
flow.stream_set.stream_id
|> Frame.Goaway.new(Error.code(error))
|> Encodable.to_bin()

Socket.send(config.socket, bin)
def terminate(_reason, %{config: config}) do
Kernel.send(config.client, {:closed, config.supervisor})
:ok
end
end
18 changes: 9 additions & 9 deletions lib/connection/flow_control.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Kadabra.Connection.FlowControl do
max_frame_size: @default_max_frame_size,
window: @default_window_size

alias Kadabra.{Config, Stream, StreamSet}
alias Kadabra.{Config, StreamSet, StreamSupervisor}

@type t :: %__MODULE__{
queue: :queue.queue(),
Expand Down Expand Up @@ -86,8 +86,8 @@ defmodule Kadabra.Connection.FlowControl do
%{flow_control | queue: queue}
end

def add_active(%{stream_set: set} = flow_control, stream_id, pid) do
new_set = StreamSet.add_active(set, stream_id, pid)
def add_active(%{stream_set: set} = flow_control, stream_id) do
new_set = StreamSet.add_active(set, stream_id)
%{flow_control | stream_set: new_set}
end

Expand All @@ -102,14 +102,14 @@ defmodule Kadabra.Connection.FlowControl do
max_frame_size: max_frame
} = flow

stream = Stream.new(config, stream_id, window, max_frame)

case Stream.start_link(stream) do
case StreamSupervisor.start_stream(config, stream_id, window, max_frame) do
{:ok, pid} ->
Process.monitor(pid)

size = byte_size(request.body || <<>>)
:gen_statem.call(pid, {:send_headers, request})

updated_set = add_stream(stream_set, stream_id, pid)
updated_set = add_stream(stream_set, stream_id)

flow
|> Map.put(:queue, queue)
Expand All @@ -127,9 +127,9 @@ defmodule Kadabra.Connection.FlowControl do
end
end

defp add_stream(stream_set, stream_id, pid) do
defp add_stream(stream_set, stream_id) do
stream_set
|> StreamSet.add_active(stream_id, pid)
|> StreamSet.add_active(stream_id)
|> StreamSet.increment_active_stream_count()
|> StreamSet.increment_stream_id()
end
Expand Down
50 changes: 27 additions & 23 deletions lib/connection/processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Kadabra.Connection.Processor do
Frame,
Hpack,
Socket,
Stream
StreamSupervisor
}

alias Kadabra.Connection.FlowControl
Expand Down Expand Up @@ -58,14 +58,14 @@ defmodule Kadabra.Connection.Processor do
send_window_update(config.socket, 0, size)
send_window_update(config.socket, stream_id, bin_size)

process_on_stream(state, stream_id, frame)
StreamSupervisor.send_frame(config.ref, stream_id, frame)

{:ok, %{state | remote_window: state.remote_window + size}}
end

def process(%Headers{stream_id: stream_id} = frame, state) do
state
|> process_on_stream(stream_id, frame)
state.config.ref
|> StreamSupervisor.send_frame(stream_id, frame)
|> case do
:ok ->
{:ok, state}
Expand All @@ -81,7 +81,7 @@ defmodule Kadabra.Connection.Processor do
end

def process(%RstStream{stream_id: stream_id} = frame, state) do
process_on_stream(state, stream_id, frame)
StreamSupervisor.send_frame(state.config.ref, stream_id, frame)

{:ok, state}
end
Expand Down Expand Up @@ -121,7 +121,7 @@ defmodule Kadabra.Connection.Processor do
notify_settings_change(old_settings, settings, flow)

config.ref
|> Hpack.via_tuple(:encoder)
|> Hpack.via_tuple(:decoder)
|> Hpack.update_max_table_size(settings.max_header_list_size)

bin = Frame.Settings.ack() |> Encodable.to_bin()
Expand All @@ -136,7 +136,16 @@ defmodule Kadabra.Connection.Processor do
end

def process(%Frame.Settings{ack: true}, %{config: c} = state) do
c.ref
|> Hpack.via_tuple(:encoder)
|> Hpack.update_max_table_size(state.local_settings.max_header_list_size)

c.ref
|> Hpack.via_tuple(:decoder)
|> Hpack.update_max_table_size(state.local_settings.max_header_list_size)

send_huge_window_update(c.socket, state.remote_window)

{:ok, %{state | remote_window: FlowControl.window_max()}}
end

Expand All @@ -148,15 +157,11 @@ defmodule Kadabra.Connection.Processor do
max_frame_size: max_frame
} = flow_control

stream = Stream.new(config, stream_id, window, max_frame)

case Stream.start_link(stream) do
{:ok, pid} ->
Stream.call_recv(pid, frame)

flow = FlowControl.add_active(flow_control, stream_id, pid)

{:ok, %{state | flow_control: flow}}
case StreamSupervisor.start_stream(config, stream_id, window, max_frame) do
{:ok, _pid} ->
StreamSupervisor.send_frame(config.ref, stream_id, frame)
state = add_active(state, stream_id)
{:ok, state}

error ->
raise "#{inspect(error)}"
Expand Down Expand Up @@ -200,12 +205,12 @@ defmodule Kadabra.Connection.Processor do
end

def process(%WindowUpdate{stream_id: stream_id} = frame, state) do
process_on_stream(state, stream_id, frame)
StreamSupervisor.send_frame(state.config.ref, stream_id, frame)
{:ok, state}
end

def process(%Continuation{stream_id: stream_id} = frame, state) do
process_on_stream(state, stream_id, frame)
StreamSupervisor.send_frame(state.config.ref, stream_id, frame)
{:ok, state}
end

Expand All @@ -220,6 +225,11 @@ defmodule Kadabra.Connection.Processor do
{:ok, state}
end

def add_active(state, stream_id) do
flow = FlowControl.add_active(state.flow_control, stream_id)
%{state | flow_control: flow}
end

def log_goaway(%Goaway{last_stream_id: id, error_code: c, debug_data: b}) do
error = Error.parse(c)
Logger.error("Got GOAWAY, #{error}, Last Stream: #{id}, Rest: #{b}")
Expand Down Expand Up @@ -256,10 +266,4 @@ defmodule Kadabra.Connection.Processor do
send(pid, {:settings_change, window_diff, max_frame_size})
end
end

def process_on_stream(state, stream_id, frame) do
state.flow_control.stream_set.active_streams
|> Map.get(stream_id)
|> Stream.call_recv(frame)
end
end
12 changes: 4 additions & 8 deletions lib/encodable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ defprotocol Kadabra.Encodable do
<<0, 5, 0, 0, 64, 0, 0, 4, 0, 0, 255, 255, 0, 1, 0, 0, 16, 0, 0, 2,
0, 0, 0, 0>>
iex> %Kadabra.Frame.Continuation{end_headers: true,
...> stream_id: 1, header_block_fragment: <<255, 255, 255>>} |> to_bin()
<<0, 0, 3, 9, 4, 0, 0, 0, 1, 255, 255, 255>>
iex> Kadabra.Encodable.to_bin(:any_non_frame_term)
:error
"""
Expand All @@ -24,13 +28,5 @@ defprotocol Kadabra.Encodable do
end

defimpl Kadabra.Encodable, for: Any do
@doc ~S"""
Encodes to binary.
## Examples
iex> Kadabra.Encodable.to_bin(1234)
:error
"""
def to_bin(_), do: :error
end
9 changes: 9 additions & 0 deletions lib/frame/continuation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,12 @@ defmodule Kadabra.Frame.Continuation do
}
end
end

defimpl Kadabra.Encodable, for: Kadabra.Frame.Continuation do
alias Kadabra.Frame

def to_bin(frame) do
f = if frame.end_headers, do: 0x4, else: 0x0
Frame.binary_frame(0x9, f, frame.stream_id, frame.header_block_fragment)
end
end
Loading

0 comments on commit cdb9e36

Please sign in to comment.