Skip to content

Commit

Permalink
Revert "Poll alerts (#1291)"
Browse files Browse the repository at this point in the history
This reverts commit 7ebc258.
  • Loading branch information
PaulJKim authored Nov 29, 2023
1 parent 7ebc258 commit de94f5e
Show file tree
Hide file tree
Showing 12 changed files with 411 additions and 154 deletions.
6 changes: 5 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ config :signs_ui, SignsUiWeb.Endpoint,
config :signs_ui,
config_store: SignsUi.Config.S3,
refresh_token_store: SignsUi.RefreshTokenStore,
http_client: HTTPoison
alert_producer: ServerSentEventStage,
alert_consumer_opts: [
name: SignsUi.Alerts.State,
subscribe_to: [AlertProducer]
]

# HTTP config
config :signs_ui, :redirect_http?, false
Expand Down
4 changes: 0 additions & 4 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,3 @@ screenplay_base_url =

config :signs_ui, SignsUiWeb.Endpoint,
screenplay_base_url: screenplay_base_url

config :signs_ui,
api_v3_url: System.get_env("API_V3_ORIGIN"),
api_v3_key: System.get_env("API_V3_KEY")
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ config :signs_ui, SignsUiWeb.Endpoint,
config :signs_ui,
config_store: SignsUi.Mock.ConfigStore,
messages_api_keys: "test_user_1:test_key_1,test_user_2:test_key_2",
http_client: SignsUi.Mock.FakeHttpClient
alert_producer: SignsUi.Mock.FakeAlertProducer,
alert_consumer_opts: [name: SignsUi.Alerts.State, subscribe_to: []]

config :signs_ui, SignsUiWeb.AuthManager,
issuer: "signs_ui",
Expand Down
58 changes: 58 additions & 0 deletions lib/signs_ui/alerts/events.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule SignsUi.Alerts.Events do
@moduledoc """
Parses incoming alert data from the API into multi-route alerts for internal
storage.
"""
require Logger
alias SignsUi.Alerts.Alert

@spec parse(String.t() | list() | map()) :: [Alert.multi_route()] | Alert.multi_route()
def parse(payload) when is_binary(payload) do
payload |> Jason.decode!() |> parse()
end

def parse(payload) when is_list(payload) do
Enum.map(payload, &parse/1)
end

def parse(payload) when is_map(payload) do
id = payload["id"]

{created_at, service_effect, routes} = parse_attributes(payload["attributes"])

%{
id: id,
created_at: created_at,
service_effect: service_effect,
affected_routes: routes
}
end

@spec parse_attributes(nil | map()) ::
{DateTime.t() | nil, String.t() | nil, MapSet.t(Alert.route_id()) | nil}
defp parse_attributes(nil), do: {nil, nil, nil}

defp parse_attributes(attributes) do
case DateTime.from_iso8601(attributes["created_at"]) do
{:ok, created_at, _} ->
{created_at, attributes["service_effect"], parse_routes(attributes)}

{:error, reason} ->
Logger.error([
"Failed to parse created_at, reason=",
inspect(reason),
" attributes=",
inspect(attributes)
])

{nil, attributes["service_effect"], parse_routes(attributes)}
end
end

@spec parse_routes(map()) :: MapSet.t(Alert.route_id())
defp parse_routes(attributes) do
attributes
|> get_in([Access.key("informed_entity", []), Access.all(), "route"])
|> MapSet.new()
end
end
146 changes: 45 additions & 101 deletions lib/signs_ui/alerts/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,143 +4,87 @@ defmodule SignsUi.Alerts.State do
them with signs to auto-expire.
"""

use GenServer
use GenStage

require Logger
alias ServerSentEventStage.Event
alias SignsUi.Alerts.Alert
alias SignsUi.Alerts.Display
alias SignsUi.Alerts.Events

@type t :: %{Alert.id() => Alert.multi_route()}

@spec start_link(keyword) :: :ignore | {:error, any} | {:ok, pid}
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, [], opts)
{start_opts, init_opts} = Keyword.split(opts, [:name])
GenStage.start_link(__MODULE__, init_opts, start_opts)
end

@spec active_alert_ids(SignsUi.Alerts.State) :: MapSet.t(Alert.id())
@spec active_alert_ids(GenStage.stage()) :: MapSet.t(Alert.id())
def active_alert_ids(pid \\ __MODULE__) do
GenServer.call(pid, :active_alert_ids)
GenStage.call(pid, :active_alert_ids)
end

@spec all(SignsUi.Alerts.State) :: Display.t()
@spec all(GenStage.stage()) :: Display.t()
def all(pid \\ __MODULE__) do
GenServer.call(pid, :all)
GenStage.call(pid, :all)
end

@impl GenServer
def init(_) do
send(self(), :update)
{:ok, %{}}
@impl GenStage
def init(opts) do
{:consumer, %{}, opts}
end

@impl GenServer
@impl GenStage
def handle_call(:active_alert_ids, _from, state) do
alert_ids = state |> Map.keys() |> MapSet.new()

{:reply, alert_ids, state}
{:reply, alert_ids, [], state}
end

@impl GenServer
@impl GenStage
def handle_call(:all, _from, state) do
{:reply, Display.format_state(state), state}
{:reply, Display.format_state(state), [], state}
end

@impl GenServer
def handle_info(:update, state) do
state =
case fetch_alerts() do
{:ok, response} ->
new_state = Enum.into(parse_response(response), %{}, &{&1.id, &1})

SignsUiWeb.Endpoint.broadcast!(
"alerts:all",
"new_alert_state",
Display.format_state(new_state)
)

Logger.info(["alert_state_updated ", inspect(new_state)])
new_state

{:error, error} ->
Logger.error([
"alerts_fetch_failed, reason=",
inspect(error)
])

state
end

schedule_update(self(), 5_000)
{:noreply, state}
@impl GenStage
@spec handle_events([Event.t()], GenStage.from(), t()) :: {:noreply, [], t()}
def handle_events(events, _from, state) do
# Works in two primary phases. First, we generate fresh t using the
# provided events:
new_state = update_state(events, state)
Logger.info(["alert_state_updated ", inspect(new_state)])

# Next, we convert our internal model to the specified format:
display_state = Display.format_state(new_state)
SignsUiWeb.Endpoint.broadcast!("alerts:all", "new_alert_state", display_state)
{:noreply, [], new_state}
end

defp fetch_alerts do
http_client = Application.get_env(:signs_ui, :http_client)
url = "#{Application.get_env(:signs_ui, :api_v3_url)}/alerts"
headers = [{"x-api-key", Application.get_env(:signs_ui, :api_v3_key)}]

case http_client.get(
url,
headers,
params: %{
"filter[datetime]" => "NOW",
"filter[route_type]" => "0,1"
}
) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
{:ok, body}

error ->
{:error, error}
end
@spec update_state(Event.t() | [Event.t()], t()) :: t()
defp update_state(events, state) when is_list(events) do
# This reduce combines the effects of a set of operations into a single new
# state.
Enum.reduce(events, state, &update_state(&1, &2))
end

def parse_response(response) do
response |> Jason.decode!() |> Map.get("data", []) |> Enum.map(&parse_alert/1)
defp update_state(%Event{event: "reset", data: data}, _) do
alerts = Events.parse(data)
Map.new(alerts, &{&1.id, &1})
end

def parse_alert(alert) do
id = alert["id"]

{created_at, service_effect, routes} = parse_attributes(alert["attributes"])

%{
id: id,
created_at: created_at,
service_effect: service_effect,
affected_routes: routes
}
end

@spec parse_attributes(nil | map()) ::
{DateTime.t() | nil, String.t() | nil, MapSet.t(Alert.route_id()) | nil}
defp parse_attributes(nil), do: {nil, nil, nil}

defp parse_attributes(attributes) do
case DateTime.from_iso8601(attributes["created_at"]) do
{:ok, created_at, _} ->
{created_at, attributes["service_effect"], parse_routes(attributes)}

{:error, reason} ->
Logger.error([
"Failed to parse created_at, reason=",
inspect(reason),
" attributes=",
inspect(attributes)
])

{nil, attributes["service_effect"], parse_routes(attributes)}
end
defp update_state(%Event{event: "update", data: data}, state) do
alert = Events.parse(data)
Map.put(state, alert.id, alert)
end

@spec parse_routes(map()) :: MapSet.t(Alert.route_id())
defp parse_routes(attributes) do
attributes
|> get_in([Access.key("informed_entity", []), Access.all(), "route"])
|> MapSet.new()
defp update_state(%Event{event: "add", data: data}, state) do
alert = Events.parse(data)
Map.put(state, alert.id, alert)
end

defp schedule_update(pid, ms) do
Process.send_after(pid, :update, ms)
defp update_state(%Event{event: "remove", data: data}, state) do
alert = Events.parse(data)
Map.delete(state, alert.id)
end
end
9 changes: 8 additions & 1 deletion lib/signs_ui/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ defmodule SignsUi.Application do
{SignsUi.Signs.State, [name: SignsUi.Signs.State]},
SignsUi.Config.Expiration,
SignsUi.RefreshTokenStore,
{SignsUi.Alerts.State, [name: SignsUi.Alerts.State]}
{Application.get_env(:signs_ui, :alert_producer),
name: AlertProducer,
url:
"#{System.get_env("API_V3_ORIGIN")}/alerts?filter[datetime]=NOW&filter[route_type]=0,1",
headers: [
{"x-api-key", System.get_env("API_V3_KEY")}
]},
{SignsUi.Alerts.State, Application.get_env(:signs_ui, :alert_consumer_opts)}
]

opts = [strategy: :one_for_one, name: SignsUi.Supervisor]
Expand Down
7 changes: 0 additions & 7 deletions lib/signs_ui/mock/fake_http_client.ex

This file was deleted.

3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ defmodule SignsUi.Mixfile do
{:ex_aws, "~> 2.0"},
{:lcov_ex, "~> 0.2", only: [:dev, :test], runtime: false},
{:gettext, "~> 0.11"},
{:gen_stage, "~> 1.0"},
{:server_sent_event_stage, "~> 1.1.0"},
{:castore, "~> 0.1"},
{:guardian_phoenix, "~> 2.0"},
{:guardian, "~> 2.0"},
{:httpoison, "~> 2.0"},
{:jason, "~> 1.4.0"},
{:nimble_parsec, "~> 1.0"},
{:phoenix_html, "~> 3.0"},
Expand Down
5 changes: 4 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
"ex_aws_s3": {:hex, :ex_aws_s3, "2.3.3", "61412e524616ea31d3f31675d8bc4c73f277e367dee0ae8245610446f9b778aa", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "0044f0b6f9ce925666021eafd630de64c2b3404d79c85245cc7c8a9a32d7f104"},
"expo": {:hex, :expo, "0.4.1", "1c61d18a5df197dfda38861673d392e642649a9cef7694d2f97a587b2cfb319b", [:mix], [], "hexpm", "2ff7ba7a798c8c543c12550fa0e2cbc81b95d4974c65855d8d15ba7b37a1ce47"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"gettext": {:hex, :gettext, "0.22.3", "c8273e78db4a0bb6fba7e9f0fd881112f349a3117f7f7c598fa18c66c888e524", [:mix], [{:expo, "~> 0.4.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "935f23447713954a6866f1bb28c3a878c4c011e802bcd68a726f5e558e4b64bd"},
"guardian": {:hex, :guardian, "2.3.1", "2b2d78dc399a7df182d739ddc0e566d88723299bfac20be36255e2d052fd215d", [:mix], [{:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}, {:plug, "~> 1.3.3 or ~> 1.4", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "bbe241f9ca1b09fad916ad42d6049d2600bbc688aba5b3c4a6c82592a54274c3"},
"guardian_phoenix": {:hex, :guardian_phoenix, "2.0.1", "89a817265af09a6ddf7cb1e77f17ffca90cea2db10ff888375ef34502b2731b1", [:mix], [{:guardian, "~> 2.0", [hex: :guardian, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.3", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "21f439246715192b231f228680465d1ed5fbdf01555a4a3b17165532f5f9a08c"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"jose": {:hex, :jose, "1.11.5", "3bc2d75ffa5e2c941ca93e5696b54978323191988eb8d225c2e663ddfefd515e", [:mix, :rebar3], [], "hexpm", "dcd3b215bafe02ea7c5b23dafd3eb8062a5cd8f2d904fd9caa323d37034ab384"},
Expand All @@ -25,6 +26,7 @@
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mint": {:hex, :mint, "1.4.2", "50330223429a6e1260b2ca5415f69b0ab086141bc76dc2fbf34d7c389a6675b2", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "ce75a5bbcc59b4d7d8d70f8b2fc284b1751ffb35c7b6a6302b5192f8ab4ddd80"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"phoenix": {:hex, :phoenix, "1.7.6", "61f0625af7c1d1923d582470446de29b008c0e07ae33d7a3859ede247ddaf59a", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "f6b4be7780402bb060cbc6e83f1b6d3f5673b674ba73cc4a7dd47db0322dfb88"},
Expand All @@ -38,6 +40,7 @@
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"sentry": {:hex, :sentry, "8.0.6", "c8de1bf0523bc120ec37d596c55260901029ecb0994e7075b0973328779ceef7", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.3", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "051a2d0472162f3137787c7c9d6e6e4ef239de9329c8c45b1f1bf1e9379e1883"},
"server_sent_event_stage": {:hex, :server_sent_event_stage, "1.1.0", "cd5f93e1110455be569533b3c68503fce9f33bbd6e8895f5d5bc139b47d79cab", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: true]}, {:ex_doc, "~> 0.21", [hex: :ex_doc, repo: "hexpm", optional: true]}, {:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:mint, "~> 1.4", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "5a5fdcd34cac962a24bcbffef843c5bb1127f0cf8f84795dcdf3c8949051ed6f"},
"sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
Expand Down
Loading

0 comments on commit de94f5e

Please sign in to comment.