Skip to content

Commit

Permalink
feat: Use GenStage to write Config.State changes asynchronously (#1346)
Browse files Browse the repository at this point in the history
* refactor: make Config.State a GenStage producer

* feat: emit updated state as an event

* refactor: move writing out of config to consumer
  • Loading branch information
sloanelybutsurely authored Jun 21, 2024
1 parent f3c647b commit 037c7d0
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 106 deletions.
1 change: 1 addition & 0 deletions lib/signs_ui/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule SignsUi.Application do
{Phoenix.PubSub, name: SignsUi.PubSub},
SignsUiWeb.Endpoint,
SignsUi.Config.State,
SignsUi.Config.Writer,
{SignsUi.Signs.State, [name: SignsUi.Signs.State]},
SignsUi.Config.Expiration,
SignsUi.RefreshTokenStore,
Expand Down
107 changes: 30 additions & 77 deletions lib/signs_ui/config/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule SignsUi.Config.State do
@moduledoc """
Keeps an internal state of all the signs
"""
use GenServer
use GenStage
require Logger

alias SignsUi.Config
Expand All @@ -22,62 +22,62 @@ defmodule SignsUi.Config.State do

def start_link(opts \\ []) do
name = opts[:name] || __MODULE__
GenServer.start_link(__MODULE__, [], name: name)
GenStage.start_link(__MODULE__, [], name: name)
end

@doc """
Gets all the current state.
"""
@spec get_all(GenServer.server()) :: t()
@spec get_all(GenStage.stage()) :: t()
def get_all(pid \\ __MODULE__) do
GenServer.call(pid, :get_all)
GenStage.call(pid, :get_all)
end

@doc """
Updates the state with new sign configurations by merging them in.
"""
@spec update_sign_configs(GenServer.server(), %{Config.Sign.id() => Config.Sign.t()}) ::
@spec update_sign_configs(GenStage.stage(), %{Config.Sign.id() => Config.Sign.t()}) ::
{:ok, t()}
def update_sign_configs(pid \\ __MODULE__, changes) do
GenServer.call(pid, {:update_sign_configs, changes})
GenStage.call(pid, {:update_sign_configs, changes})
end

@doc """
Sets configured headways to the provided value.
"""
@spec update_configured_headways(GenServer.server(), %{
@spec update_configured_headways(GenStage.stage(), %{
String.t() => Config.ConfiguredHeadway.t()
}) ::
{:ok, t()}
def update_configured_headways(pid \\ __MODULE__, changes) do
GenServer.call(pid, {:update_configured_headways, changes})
GenStage.call(pid, {:update_configured_headways, changes})
end

@doc """
Sets Chelsea Bridge announcements to the provided value.
"""
@spec update_chelsea_bridge_announcements(GenServer.server(), %{
@spec update_chelsea_bridge_announcements(GenStage.stage(), %{
String.t() => String.t()
}) ::
{:ok, t()}
def update_chelsea_bridge_announcements(pid \\ __MODULE__, changes) do
GenServer.call(pid, {:update_chelsea_bridge_announcements, changes})
GenStage.call(pid, {:update_chelsea_bridge_announcements, changes})
end

@doc """
Applies the given SignGroups changes (inserts, updates, and deletes).
"""
@spec update_sign_groups(GenServer.server(), SignGroups.t()) :: {:ok, t()}
@spec update_sign_groups(GenStage.stage(), SignGroups.t()) :: {:ok, t()}
def update_sign_groups(pid \\ __MODULE__, changes) do
GenServer.call(pid, {:update_sign_groups, changes})
GenStage.call(pid, {:update_sign_groups, changes})
end

@spec update_scu(GenServer.server(), String.t(), boolean()) :: :ok
@spec update_scu(GenStage.stage(), String.t(), boolean()) :: :ok
def update_scu(pid \\ __MODULE__, id, migrated) do
GenServer.call(pid, {:update_scu, id, migrated})
GenStage.call(pid, {:update_scu, id, migrated})
end

@spec init(any()) :: {:ok, t()}
@impl true
def init(_) do
schedule_clean(self(), 60_000)
config_store = Application.get_env(:signs_ui, :config_store)
Expand All @@ -101,52 +101,56 @@ defmodule SignsUi.Config.State do
scus_migrated: Map.new(scu_ids, &{&1, Map.get(scu_lookup, &1, false)})
}

{:ok, state}
{:producer, state}
end

@impl true
def handle_call(:get_all, _from, signs) do
{:reply, signs, signs}
{:reply, signs, [], signs}
end

def handle_call({:update_sign_configs, changes}, _from, old_state) do
new_state = save_sign_config_changes(changes, old_state)
{:reply, {:ok, new_state}, new_state}
{:reply, {:ok, new_state}, [new_state], new_state}
end

def handle_call({:update_configured_headways, changes}, _from, old_state) do
new_state = save_configured_headways_changes(changes, old_state)
{:reply, {:ok, new_state}, new_state}
{:reply, {:ok, new_state}, [new_state], new_state}
end

def handle_call({:update_chelsea_bridge_announcements, changes}, _from, old_state) do
new_state = save_chelsea_bridge_announcements(changes, old_state)
{:reply, {:ok, new_state}, new_state}
{:reply, {:ok, new_state}, [new_state], new_state}
end

def handle_call({:update_sign_groups, changes}, _from, old_state) do
sign_config_changes = SignsUi.Config.SignGroupToSignConfigs.apply(changes, old_state)
new_sign_group_state = save_sign_group_changes(changes, old_state)
new_state = save_sign_config_changes(sign_config_changes, new_sign_group_state)
{:reply, {:ok, new_state}, new_state}
{:reply, {:ok, new_state}, [new_state], new_state}
end

def handle_call({:update_scu, id, migrated}, _from, state) do
state = update_in(state, [:scus_migrated], &Map.replace(&1, id, migrated))
save_state(state)
{:reply, :ok, state}
{:reply, :ok, [state], state}
end

@impl true
def handle_info(:clean, %{signs: sign_configs} = state) do
schedule_clean(self(), 60_000)
new_state = %{state | signs: Utilities.clean_configs(sign_configs)}
save_state(new_state)
{:noreply, new_state}
{:noreply, [new_state], new_state}
end

@impl true
def handle_demand(_, state) do
{:noreply, [], state}
end

@spec save_sign_config_changes(%{Config.Sign.id() => Config.Sign.t()}, t()) :: t()
defp save_sign_config_changes(changes, %{signs: old_signs} = old_state) do
signs = Map.merge(old_signs, changes)
save_state(%{old_state | signs: signs})

broadcast_data =
signs
Expand All @@ -166,8 +170,6 @@ defmodule SignsUi.Config.State do
) do
new_state = %{old_state | configured_headways: new_configured_headways}

save_state(new_state)

SignsUiWeb.Endpoint.broadcast!(
"headways:all",
"new_configured_headways_state",
Expand All @@ -180,7 +182,6 @@ defmodule SignsUi.Config.State do
@spec save_chelsea_bridge_announcements(String.t(), t()) :: t()
defp save_chelsea_bridge_announcements(value, old_state) do
new_state = Map.put(old_state, :chelsea_bridge_announcements, value)
save_state(new_state)

SignsUiWeb.Endpoint.broadcast!(
"chelseaBridgeAnnouncements:all",
Expand All @@ -197,8 +198,6 @@ defmodule SignsUi.Config.State do

new_state = %{old_state | sign_groups: new_groups}

save_state(new_state)

SignsUiWeb.Endpoint.broadcast!(
"signGroups:all",
"new_sign_groups_state",
Expand All @@ -208,52 +207,6 @@ defmodule SignsUi.Config.State do
new_state
end

defp save_state(%{
signs: signs,
configured_headways: configured_headways,
chelsea_bridge_announcements: chelsea_bridge_announcements,
sign_groups: sign_groups,
sign_stops: sign_stops,
scus_migrated: scus_migrated
}) do
config_store = Application.get_env(:signs_ui, :config_store)

Jason.encode!(
%{
"signs" => Config.Signs.format_signs_for_json(signs),
"configured_headways" =>
Config.ConfiguredHeadways.format_configured_headways_for_json(configured_headways),
"chelsea_bridge_announcements" => chelsea_bridge_announcements,
"sign_groups" => sign_groups,
"scus_migrated" => scus_migrated
},
pretty: true
)
|> config_store.write()

for {%{stop_id: stop_id, route_id: route_id, direction_id: direction_id}, ids} <- sign_stops do
%{
stop_id: stop_id,
route_id: route_id,
direction_id: direction_id,
predictions:
if Enum.any?(ids, fn id ->
case signs[id] do
nil -> false
sign -> sign.config.mode == :headway
end
end) do
"flagged"
else
"normal"
end
}
end
|> then(&%{"stops" => &1})
|> Jason.encode!(pretty: true)
|> config_store.write_stops()
end

# sobelow_skip ["Traversal"]
defp parse_signs_json do
signs_json =
Expand Down
80 changes: 80 additions & 0 deletions lib/signs_ui/config/writer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule SignsUi.Config.Writer do
@moduledoc """
Writes state updates out to storage
"""
use GenStage

alias SignsUi.Config

def start_link(opts) do
{name, opts} = Keyword.pop(opts, :name, __MODULE__)
GenStage.start_link(__MODULE__, opts, name: name)
end

@impl true
def init(opts) do
producer =
opts
|> Keyword.get(:producer, SignsUi.Config.State)
|> List.wrap()

state = :ok

{:consumer, state, subscribe_to: producer}
end

@impl true
def handle_events(events, _from, state) do
most_recent_state = List.last(events)

unless is_nil(most_recent_state), do: save_state(most_recent_state)

{:noreply, [], state}
end

defp save_state(%{
signs: signs,
configured_headways: configured_headways,
chelsea_bridge_announcements: chelsea_bridge_announcements,
sign_groups: sign_groups,
sign_stops: sign_stops,
scus_migrated: scus_migrated
}) do
config_store = Application.get_env(:signs_ui, :config_store)

Jason.encode!(
%{
"signs" => Config.Signs.format_signs_for_json(signs),
"configured_headways" =>
Config.ConfiguredHeadways.format_configured_headways_for_json(configured_headways),
"chelsea_bridge_announcements" => chelsea_bridge_announcements,
"sign_groups" => sign_groups,
"scus_migrated" => scus_migrated
},
pretty: true
)
|> config_store.write()

for {%{stop_id: stop_id, route_id: route_id, direction_id: direction_id}, ids} <- sign_stops do
%{
stop_id: stop_id,
route_id: route_id,
direction_id: direction_id,
predictions:
if Enum.any?(ids, fn id ->
case signs[id] do
nil -> false
sign -> sign.config.mode == :headway
end
end) do
"flagged"
else
"normal"
end
}
end
|> then(&%{"stops" => &1})
|> Jason.encode!(pretty: true)
|> config_store.write_stops()
end
end
2 changes: 1 addition & 1 deletion test/signs_ui/config/expiration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ defmodule SignsUi.Config.ExpirationTest do
Logger.configure(level: :info)
on_exit(fn -> Logger.configure(level: old_level) end)

{:ok, _state_pid} = SignsUi.Config.State.start_link(name: :sign_state_test)
{:ok, _state_pid} = start_supervised({SignsUi.Config.State, name: :sign_state_test})

state = %{
time_fetcher: fn ->
Expand Down
Loading

0 comments on commit 037c7d0

Please sign in to comment.