Skip to content

Commit

Permalink
Merge pull request #83 from exponentially/event-producer
Browse files Browse the repository at this point in the history
Add event_producer
  • Loading branch information
burmajam authored Oct 18, 2024
2 parents 8ed68c6 + 79b56dc commit 4002569
Show file tree
Hide file tree
Showing 30 changed files with 1,706 additions and 288 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
elixir: ['1.11.4', '1.12.3']
erlang: ['22.3', '23.3', '24.1']
elixir: ['1.15', '1.16']
erlang: ['24', '25', '26']

services:
es:
Expand Down Expand Up @@ -62,8 +62,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
elixir: ['1.11.4', '1.12.3']
erlang: ['22.3', '23.3', '24.1']
elixir: ['1.15', '1.16']
erlang: ['24', '25', '26']

services:
es:
Expand Down Expand Up @@ -108,8 +108,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
elixir: ['1.12.3']
erlang: ['24.1']
elixir: ['1.15', '1.16']
erlang: ['24', '25', '26']

services:
es:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog for extreme v1.1.0

- Rename `RequestManager._unregister_subscription/2` (removed leading `_`)
- Add event_producer functionality for module that uses `Extreme`.

# Changelog for extreme v1.0.7

Expand Down
24 changes: 23 additions & 1 deletion lib/extreme.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ defmodule Extreme do
)
end

@spec start_event_producer(stream :: String.t(), subscriber :: pid(), opts :: Keyword.t()) ::
Supervisor.on_start_child()
def start_event_producer(stream, subscriber, opts \\ []) do
Extreme.EventProducer.Supervisor.start_event_producer(
__MODULE__,
[{:stream, stream}, {:subscriber, subscriber} | opts]
)
end

@spec subscribe_producer(producer :: pid()) :: :ok
def subscribe_producer(producer),
do: Extreme.EventProducer.subscribe(producer)

@spec unsubscribe_producer(producer :: pid()) :: :ok
def unsubscribe_producer(producer),
do: Extreme.EventProducer.unsubscribe(producer)

@spec producer_subscription_status(producer :: pid()) ::
:disconnected | :catching_up | :live | :paused
def producer_subscription_status(producer),
do: Extreme.EventProducer.subscription_status(producer)

def unsubscribe(subscription) when is_pid(subscription),
do: Extreme.Subscription.unsubscribe(subscription)

Expand Down Expand Up @@ -109,7 +131,7 @@ defmodule Extreme do
@doc """
TODO
"""
@callback unsubscribe(subscription :: pid()) :: :unsubscribed
@callback unsubscribe(subscription :: pid()) :: :ok

@doc """
TODO
Expand Down
8 changes: 4 additions & 4 deletions lib/extreme/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Extreme.Connection do
|> GenServer.cast({:execute, message})
end

@impl true
@impl GenServer
def init({base_name, configuration}) do
GenServer.cast(self(), {:connect, configuration, 1})

Expand All @@ -30,7 +30,7 @@ defmodule Extreme.Connection do
{:ok, state}
end

@impl true
@impl GenServer
def handle_cast({:connect, configuration, attempt}, state) do
configuration
|> _connect(attempt)
Expand All @@ -57,7 +57,7 @@ defmodule Extreme.Connection do
end
end

@impl true
@impl GenServer
def handle_info({:tcp, socket, pkg}, %State{socket: socket} = state) do
{:ok, state} = Impl.receive_package(pkg, state)
{:noreply, state}
Expand All @@ -66,7 +66,7 @@ defmodule Extreme.Connection do
def handle_info({:tcp_closed, _port}, state),
do: {:stop, :tcp_closed, state}

@impl true
@impl GenServer
def terminate(reason, state) do
Logger.warning("[Extreme] Connection terminated: #{inspect(reason)}")
RequestManager.kill_all_subscriptions(state.base_name)
Expand Down
10 changes: 5 additions & 5 deletions lib/extreme/connection_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ defmodule Extreme.ConnectionImpl do

def receive_package(pkg, %State{socket: socket, received_data: received_data} = state) do
:inet.setopts(socket, active: :once)
state = _process_package(state, received_data <> pkg)
state = _process_package(received_data <> pkg, state)
{:ok, state}
end

defp _process_package(
state,
<<message_length::32-unsigned-little-integer, content::binary-size(message_length),
rest::binary>>
rest::binary>>,
%State{} = state
) do
# Handle binary data containing zero, one or many messages
# All messages start with a 32 bit unsigned little endian integer of the content length + a binary body of that size
:ok = RequestManager.process_server_message(state.base_name, content)
_process_package(state, rest)
_process_package(rest, state)
end

# No full message left, keep state in GenServer to reprocess once more data arrives
defp _process_package(state, package_with_incomplete_message),
defp _process_package(package_with_incomplete_message, %State{} = state),
do: %{state | received_data: package_with_incomplete_message}
end
95 changes: 95 additions & 0 deletions lib/extreme/event_producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Extreme.EventProducer do
use GenServer
require Logger
alias Extreme.EventProducer.EventBuffer
alias Extreme.SharedSubscription, as: Shared

defmodule State do
defstruct ~w(base_name subscriber ack_timeout buffer_pid)a
end

def start_link(base_name, opts) do
GenServer.start_link(
__MODULE__,
{base_name, opts}
)
end

def subscribe(pid),
do: GenServer.cast(pid, :subscribe)

def unsubscribe(pid),
do: GenServer.cast(pid, :unsubscribe)

def on_sync_event(pid, event, timeout),
do: GenServer.call(pid, {:on_sync_event, event}, timeout)

def on_async_event(pid, event),
do: GenServer.cast(pid, {:on_async_event, event})

def send_to_subscriber(pid, msg),
do: GenServer.cast(pid, {:send_to_subscriber, msg})

def subscription_status(pid),
do: GenServer.call(pid, :subscription_status)

@impl GenServer
def init({base_name, opts}) do
{subscriber, opts} = Keyword.pop!(opts, :subscriber)
ack_timeout = Keyword.get(opts, :ack_timeout, 5_000)
{:ok, buffer_pid} = EventBuffer.start_link(base_name, self(), opts)

state = %State{
base_name: base_name,
subscriber: subscriber,
buffer_pid: buffer_pid,
ack_timeout: ack_timeout
}

{:ok, state}
end

@impl GenServer
def handle_call({:on_sync_event, event}, _from, %State{} = state) do
Logger.debug("Sending sync event")
response = Shared.on_event(state.subscriber, event, state.ack_timeout)
{:reply, response, state}
end

def handle_call(:subscription_status, _from, %State{} = state) do
response = EventBuffer.subscription_status(state.buffer_pid)
{:reply, response, state}
end

@impl GenServer
def handle_cast({:on_async_event, event}, %State{} = state) do
Logger.debug("Sending async event")

spawn_link(fn ->
response = Shared.on_event(state.subscriber, event, state.ack_timeout)
# Process.sleep(5)
:ok = EventBuffer.ack(state.buffer_pid, response)
end)

{:noreply, state}
end

def handle_cast({:send_to_subscriber, msg}, %State{} = state) do
Logger.debug("Proxing message #{inspect(msg)} to subscriber")
send(state.subscriber, msg)

{:noreply, state}
end

def handle_cast(:subscribe, %State{} = state) do
Logger.debug("Starting subscription")
:ok = EventBuffer.subscribe(state.buffer_pid)

{:noreply, state}
end

def handle_cast(:unsubscribe, %State{} = state) do
:ok = EventBuffer.unsubscribe(state.buffer_pid)
{:noreply, state}
end
end
Loading

0 comments on commit 4002569

Please sign in to comment.