diff --git a/lib/signs_ui/application.ex b/lib/signs_ui/application.ex index 3ea2a3d4..ed9361d6 100644 --- a/lib/signs_ui/application.ex +++ b/lib/signs_ui/application.ex @@ -13,6 +13,7 @@ defmodule SignsUi.Application do {Phoenix.PubSub, name: SignsUi.PubSub}, SignsUiWeb.Endpoint, SignsUi.Config.State, + SignsUi.Config.Writer, {SignsUi.Signs.State, [name: SignsUi.Signs.State]}, SignsUi.Config.Expiration, SignsUi.RefreshTokenStore, diff --git a/lib/signs_ui/config/state.ex b/lib/signs_ui/config/state.ex index 4a57eb48..0465e008 100644 --- a/lib/signs_ui/config/state.ex +++ b/lib/signs_ui/config/state.ex @@ -2,7 +2,7 @@ defmodule SignsUi.Config.State do @moduledoc """ Keeps an internal state of all the signs """ - use GenServer + use GenStage require Logger alias SignsUi.Config @@ -22,62 +22,62 @@ defmodule SignsUi.Config.State do def start_link(opts \\ []) do name = opts[:name] || __MODULE__ - GenServer.start_link(__MODULE__, [], name: name) + GenStage.start_link(__MODULE__, [], name: name) end @doc """ Gets all the current state. """ - @spec get_all(GenServer.server()) :: t() + @spec get_all(GenStage.stage()) :: t() def get_all(pid \\ __MODULE__) do - GenServer.call(pid, :get_all) + GenStage.call(pid, :get_all) end @doc """ Updates the state with new sign configurations by merging them in. """ - @spec update_sign_configs(GenServer.server(), %{Config.Sign.id() => Config.Sign.t()}) :: + @spec update_sign_configs(GenStage.stage(), %{Config.Sign.id() => Config.Sign.t()}) :: {:ok, t()} def update_sign_configs(pid \\ __MODULE__, changes) do - GenServer.call(pid, {:update_sign_configs, changes}) + GenStage.call(pid, {:update_sign_configs, changes}) end @doc """ Sets configured headways to the provided value. """ - @spec update_configured_headways(GenServer.server(), %{ + @spec update_configured_headways(GenStage.stage(), %{ String.t() => Config.ConfiguredHeadway.t() }) :: {:ok, t()} def update_configured_headways(pid \\ __MODULE__, changes) do - GenServer.call(pid, {:update_configured_headways, changes}) + GenStage.call(pid, {:update_configured_headways, changes}) end @doc """ Sets Chelsea Bridge announcements to the provided value. """ - @spec update_chelsea_bridge_announcements(GenServer.server(), %{ + @spec update_chelsea_bridge_announcements(GenStage.stage(), %{ String.t() => String.t() }) :: {:ok, t()} def update_chelsea_bridge_announcements(pid \\ __MODULE__, changes) do - GenServer.call(pid, {:update_chelsea_bridge_announcements, changes}) + GenStage.call(pid, {:update_chelsea_bridge_announcements, changes}) end @doc """ Applies the given SignGroups changes (inserts, updates, and deletes). """ - @spec update_sign_groups(GenServer.server(), SignGroups.t()) :: {:ok, t()} + @spec update_sign_groups(GenStage.stage(), SignGroups.t()) :: {:ok, t()} def update_sign_groups(pid \\ __MODULE__, changes) do - GenServer.call(pid, {:update_sign_groups, changes}) + GenStage.call(pid, {:update_sign_groups, changes}) end - @spec update_scu(GenServer.server(), String.t(), boolean()) :: :ok + @spec update_scu(GenStage.stage(), String.t(), boolean()) :: :ok def update_scu(pid \\ __MODULE__, id, migrated) do - GenServer.call(pid, {:update_scu, id, migrated}) + GenStage.call(pid, {:update_scu, id, migrated}) end - @spec init(any()) :: {:ok, t()} + @impl true def init(_) do schedule_clean(self(), 60_000) config_store = Application.get_env(:signs_ui, :config_store) @@ -101,52 +101,56 @@ defmodule SignsUi.Config.State do scus_migrated: Map.new(scu_ids, &{&1, Map.get(scu_lookup, &1, false)}) } - {:ok, state} + {:producer, state} end + @impl true def handle_call(:get_all, _from, signs) do - {:reply, signs, signs} + {:reply, signs, [], signs} end def handle_call({:update_sign_configs, changes}, _from, old_state) do new_state = save_sign_config_changes(changes, old_state) - {:reply, {:ok, new_state}, new_state} + {:reply, {:ok, new_state}, [new_state], new_state} end def handle_call({:update_configured_headways, changes}, _from, old_state) do new_state = save_configured_headways_changes(changes, old_state) - {:reply, {:ok, new_state}, new_state} + {:reply, {:ok, new_state}, [new_state], new_state} end def handle_call({:update_chelsea_bridge_announcements, changes}, _from, old_state) do new_state = save_chelsea_bridge_announcements(changes, old_state) - {:reply, {:ok, new_state}, new_state} + {:reply, {:ok, new_state}, [new_state], new_state} end def handle_call({:update_sign_groups, changes}, _from, old_state) do sign_config_changes = SignsUi.Config.SignGroupToSignConfigs.apply(changes, old_state) new_sign_group_state = save_sign_group_changes(changes, old_state) new_state = save_sign_config_changes(sign_config_changes, new_sign_group_state) - {:reply, {:ok, new_state}, new_state} + {:reply, {:ok, new_state}, [new_state], new_state} end def handle_call({:update_scu, id, migrated}, _from, state) do state = update_in(state, [:scus_migrated], &Map.replace(&1, id, migrated)) - save_state(state) - {:reply, :ok, state} + {:reply, :ok, [state], state} end + @impl true def handle_info(:clean, %{signs: sign_configs} = state) do schedule_clean(self(), 60_000) new_state = %{state | signs: Utilities.clean_configs(sign_configs)} - save_state(new_state) - {:noreply, new_state} + {:noreply, [new_state], new_state} + end + + @impl true + def handle_demand(_, state) do + {:noreply, [], state} end @spec save_sign_config_changes(%{Config.Sign.id() => Config.Sign.t()}, t()) :: t() defp save_sign_config_changes(changes, %{signs: old_signs} = old_state) do signs = Map.merge(old_signs, changes) - save_state(%{old_state | signs: signs}) broadcast_data = signs @@ -166,8 +170,6 @@ defmodule SignsUi.Config.State do ) do new_state = %{old_state | configured_headways: new_configured_headways} - save_state(new_state) - SignsUiWeb.Endpoint.broadcast!( "headways:all", "new_configured_headways_state", @@ -180,7 +182,6 @@ defmodule SignsUi.Config.State do @spec save_chelsea_bridge_announcements(String.t(), t()) :: t() defp save_chelsea_bridge_announcements(value, old_state) do new_state = Map.put(old_state, :chelsea_bridge_announcements, value) - save_state(new_state) SignsUiWeb.Endpoint.broadcast!( "chelseaBridgeAnnouncements:all", @@ -197,8 +198,6 @@ defmodule SignsUi.Config.State do new_state = %{old_state | sign_groups: new_groups} - save_state(new_state) - SignsUiWeb.Endpoint.broadcast!( "signGroups:all", "new_sign_groups_state", @@ -208,52 +207,6 @@ defmodule SignsUi.Config.State do new_state end - defp save_state(%{ - signs: signs, - configured_headways: configured_headways, - chelsea_bridge_announcements: chelsea_bridge_announcements, - sign_groups: sign_groups, - sign_stops: sign_stops, - scus_migrated: scus_migrated - }) do - config_store = Application.get_env(:signs_ui, :config_store) - - Jason.encode!( - %{ - "signs" => Config.Signs.format_signs_for_json(signs), - "configured_headways" => - Config.ConfiguredHeadways.format_configured_headways_for_json(configured_headways), - "chelsea_bridge_announcements" => chelsea_bridge_announcements, - "sign_groups" => sign_groups, - "scus_migrated" => scus_migrated - }, - pretty: true - ) - |> config_store.write() - - for {%{stop_id: stop_id, route_id: route_id, direction_id: direction_id}, ids} <- sign_stops do - %{ - stop_id: stop_id, - route_id: route_id, - direction_id: direction_id, - predictions: - if Enum.any?(ids, fn id -> - case signs[id] do - nil -> false - sign -> sign.config.mode == :headway - end - end) do - "flagged" - else - "normal" - end - } - end - |> then(&%{"stops" => &1}) - |> Jason.encode!(pretty: true) - |> config_store.write_stops() - end - # sobelow_skip ["Traversal"] defp parse_signs_json do signs_json = diff --git a/lib/signs_ui/config/writer.ex b/lib/signs_ui/config/writer.ex new file mode 100644 index 00000000..24ffb492 --- /dev/null +++ b/lib/signs_ui/config/writer.ex @@ -0,0 +1,80 @@ +defmodule SignsUi.Config.Writer do + @moduledoc """ + Writes state updates out to storage + """ + use GenStage + + alias SignsUi.Config + + def start_link(opts) do + {name, opts} = Keyword.pop(opts, :name, __MODULE__) + GenStage.start_link(__MODULE__, opts, name: name) + end + + @impl true + def init(opts) do + producer = + opts + |> Keyword.get(:producer, SignsUi.Config.State) + |> List.wrap() + + state = :ok + + {:consumer, state, subscribe_to: producer} + end + + @impl true + def handle_events(events, _from, state) do + most_recent_state = List.last(events) + + unless is_nil(most_recent_state), do: save_state(most_recent_state) + + {:noreply, [], state} + end + + defp save_state(%{ + signs: signs, + configured_headways: configured_headways, + chelsea_bridge_announcements: chelsea_bridge_announcements, + sign_groups: sign_groups, + sign_stops: sign_stops, + scus_migrated: scus_migrated + }) do + config_store = Application.get_env(:signs_ui, :config_store) + + Jason.encode!( + %{ + "signs" => Config.Signs.format_signs_for_json(signs), + "configured_headways" => + Config.ConfiguredHeadways.format_configured_headways_for_json(configured_headways), + "chelsea_bridge_announcements" => chelsea_bridge_announcements, + "sign_groups" => sign_groups, + "scus_migrated" => scus_migrated + }, + pretty: true + ) + |> config_store.write() + + for {%{stop_id: stop_id, route_id: route_id, direction_id: direction_id}, ids} <- sign_stops do + %{ + stop_id: stop_id, + route_id: route_id, + direction_id: direction_id, + predictions: + if Enum.any?(ids, fn id -> + case signs[id] do + nil -> false + sign -> sign.config.mode == :headway + end + end) do + "flagged" + else + "normal" + end + } + end + |> then(&%{"stops" => &1}) + |> Jason.encode!(pretty: true) + |> config_store.write_stops() + end +end diff --git a/test/signs_ui/config/expiration_test.exs b/test/signs_ui/config/expiration_test.exs index 2a4cb4e9..a27a2037 100644 --- a/test/signs_ui/config/expiration_test.exs +++ b/test/signs_ui/config/expiration_test.exs @@ -178,7 +178,7 @@ defmodule SignsUi.Config.ExpirationTest do Logger.configure(level: :info) on_exit(fn -> Logger.configure(level: old_level) end) - {:ok, _state_pid} = SignsUi.Config.State.start_link(name: :sign_state_test) + {:ok, _state_pid} = start_supervised({SignsUi.Config.State, name: :sign_state_test}) state = %{ time_fetcher: fn -> diff --git a/test/signs_ui/config/state_test.exs b/test/signs_ui/config/state_test.exs index 296a97cf..8c186cb0 100644 --- a/test/signs_ui/config/state_test.exs +++ b/test/signs_ui/config/state_test.exs @@ -6,19 +6,27 @@ defmodule SignsUi.Config.StateTest do alias SignsUi.Config.ConfiguredHeadway alias SignsUi.Config.ConfiguredHeadways + setup do + pid = start_link_supervised!({Config.State, name: __MODULE__}) + + %{pid: pid} + end + describe "get_all/1" do - test "Returns all signs" do - {:ok, signs_server} = start_supervised({Config.State, [name: :sign_test]}) - signs = %{"sign1" => Sign.new("sign1", true), "sign2" => Sign.new("sign2", true)} - :sys.replace_state(signs_server, fn _state -> signs end) - assert get_all(signs_server) == signs + test "Returns entire state object", %{pid: signs_server} do + assert %{ + chelsea_bridge_announcements: _, + configured_headways: _, + scus_migrated: _, + sign_groups: _, + sign_stops: _, + signs: _ + } = get_all(signs_server) end end describe "update_sign_configs" do - test "updates some values and leaves others alone" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "updates some values and leaves others alone", %{pid: pid} do @endpoint.subscribe("signs:all") assert %{ @@ -63,9 +71,7 @@ defmodule SignsUi.Config.StateTest do end describe "update_configured_headways" do - test "updates values properly" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "updates values properly", %{pid: pid} do @endpoint.subscribe("headways:all") assert get_all(pid).configured_headways == %{ @@ -95,9 +101,7 @@ defmodule SignsUi.Config.StateTest do assert_broadcast("new_configured_headways_state", ^expected_broadcast) end - test "adds new values properly" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "adds new values properly", %{pid: pid} do @endpoint.subscribe("headways:all") {:ok, new_state} = @@ -120,9 +124,7 @@ defmodule SignsUi.Config.StateTest do assert_broadcast("new_configured_headways_state", ^expected_broadcast) end - test "removes values properly" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "removes values properly", %{pid: pid} do @endpoint.subscribe("headways:all") {:ok, new_state} = update_configured_headways(pid, %{}) @@ -139,9 +141,7 @@ defmodule SignsUi.Config.StateTest do end describe "update_chelsea_bridge_announcements" do - test "updates values properly" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "updates values properly", %{pid: pid} do @endpoint.subscribe("chelseaBridgeAnnouncements:all") assert get_all(pid).chelsea_bridge_announcements == "auto" @@ -165,9 +165,7 @@ defmodule SignsUi.Config.StateTest do end describe "update_sign_groups/2" do - test "broadcasts updated sign groups after expiration" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "broadcasts updated sign groups after expiration", %{pid: pid} do @endpoint.subscribe("signGroups:all") initial_state = %{ @@ -205,9 +203,7 @@ defmodule SignsUi.Config.StateTest do assert_broadcast("new_sign_groups_state", ^display_state) end - test "handles sequential updates" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) - + test "handles sequential updates", %{pid: pid} do @endpoint.subscribe("signGroups:all") initial_state = %{ @@ -264,8 +260,7 @@ defmodule SignsUi.Config.StateTest do assert_broadcast("new_sign_groups_state", ^display_state) end - test "updates sign configs, too" do - {:ok, pid} = GenServer.start_link(SignsUi.Config.State, [], []) + test "updates sign configs, too", %{pid: pid} do @endpoint.subscribe("signGroups:all") @endpoint.subscribe("signs:all")