Skip to content

Commit

Permalink
Migrate to update_type (#863)
Browse files Browse the repository at this point in the history
* Migrate to update_type

* clean up typespecs

* PR feedback

* Add back excluded prediction types

* remove accidentally added typespec
  • Loading branch information
PaulJKim authored Jan 3, 2025
1 parent 34bb060 commit cda9804
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 414 deletions.
2 changes: 0 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ if config_env() != :test do
scu_ip_map: System.get_env("SCU_IP_MAP", "null") |> Jason.decode!(),
chelsea_bridge_url: System.get_env("CHELSEA_BRIDGE_URL"),
chelsea_bridge_auth: System.get_env("CHELSEA_BRIDGE_AUTH"),
filter_uncertain_predictions?:
System.get_env("FILTER_UNCERTAIN_PREDICTIONS", "false") == "true",
number_of_http_updaters:
System.get_env("NUMBER_OF_HTTP_UPDATERS", "4") |> String.to_integer(),
message_log_zip_url: System.get_env("MESSAGE_LOG_ZIP_URL"),
Expand Down
3 changes: 1 addition & 2 deletions lib/content/message/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ defmodule Content.Message.Predictions do
end

min = round(sec / 60)
reverse_prediction? = Signs.Utilities.Predictions.reverse_prediction?(prediction, terminal?)

{minutes, approximate?} =
cond do
prediction.stopped_at_predicted_stop? and (!terminal? or sec <= 60) -> {:boarding, false}
!terminal? and sec <= 30 -> {:arriving, false}
!terminal? and sec <= 60 -> {:approaching, false}
min > 60 -> {60, true}
reverse_prediction? and min > 20 -> {div(min, 10) * 10, true}
prediction.type == :reverse and min > 20 -> {div(min, 10) * 10, true}
true -> {max(min, 1), false}
end

Expand Down
12 changes: 4 additions & 8 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ defmodule Fake.HTTPoison do
"stop_time_update" => [
%{
"arrival" => %{
"time" => 1_491_570_120,
"uncertainty" => nil
"time" => 1_491_570_120
},
"departure" => nil,
"schedule_relationship" => "SCHEDULED",
Expand All @@ -80,8 +79,7 @@ defmodule Fake.HTTPoison do
},
%{
"arrival" => %{
"time" => 1_491_570_180,
"uncertainty" => nil
"time" => 1_491_570_180
},
"departure" => nil,
"schedule_relationship" => "SCHEDULED",
Expand Down Expand Up @@ -243,8 +241,7 @@ defmodule Fake.HTTPoison do
"stop_time_update" => [
%{
"arrival" => %{
"time" => 1_491_570_180,
"uncertainty" => 60
"time" => 1_491_570_180
},
"departure" => nil,
"stop_id" => "stop_to_update",
Expand Down Expand Up @@ -299,8 +296,7 @@ defmodule Fake.HTTPoison do
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_080,
"uncertainty" => nil
"time" => 1_491_570_080
},
"departure" => nil,
"schedule_relationship" => "SCHEDULED",
Expand Down
11 changes: 5 additions & 6 deletions lib/predictions/prediction.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
defmodule Predictions.Prediction do
defstruct stop_id: nil,
seconds_until_arrival: nil,
arrival_certainty: nil,
seconds_until_departure: nil,
departure_certainty: nil,
seconds_until_passthrough: nil,
direction_id: nil,
schedule_relationship: nil,
Expand All @@ -13,16 +11,16 @@ defmodule Predictions.Prediction do
stopped_at_predicted_stop?: false,
boarding_status: nil,
revenue_trip?: true,
vehicle_id: nil
vehicle_id: nil,
type: nil

@type trip_id :: String.t()
@type prediction_type :: :mid_trip | :terminal | :reverse | nil

@type t :: %__MODULE__{
stop_id: String.t(),
seconds_until_arrival: non_neg_integer() | nil,
arrival_certainty: non_neg_integer() | nil,
seconds_until_departure: non_neg_integer() | nil,
departure_certainty: non_neg_integer() | nil,
seconds_until_passthrough: non_neg_integer() | nil,
direction_id: 0 | 1,
schedule_relationship: :scheduled | :skipped | nil,
Expand All @@ -32,6 +30,7 @@ defmodule Predictions.Prediction do
stopped_at_predicted_stop?: boolean(),
boarding_status: String.t() | nil,
revenue_trip?: boolean(),
vehicle_id: String.t() | nil
vehicle_id: String.t() | nil,
type: prediction_type()
}
end
162 changes: 90 additions & 72 deletions lib/predictions/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,18 @@ defmodule Predictions.Predictions do
alias Predictions.Prediction
require Logger

@excluded_prediction_types []

@spec get_all(map(), DateTime.t()) ::
{%{
optional({String.t(), integer()}) => [Predictions.Prediction.t()]
optional({String.t(), integer()}) => [Prediction.t()]
}, MapSet.t(String.t())}
def get_all(feed_message, current_time) do
predictions =
feed_message["entity"]
|> Stream.map(& &1["trip_update"])
|> Stream.filter(
&(relevant_rail_route?(&1["trip"]["route_id"]) and
&1["trip"]["schedule_relationship"] != "CANCELED")
)
|> Stream.flat_map(&transform_stop_time_updates/1)
|> Stream.filter(fn {update, _, _, _, _, _, _} ->
(update["arrival"] && update["arrival"]["uncertainty"]) ||
(update["departure"] && update["departure"]["uncertainty"]) ||
update["passthrough_time"]
end)
|> Stream.map(&prediction_from_update(&1, current_time))
|> Enum.reject(
&((is_nil(&1.seconds_until_arrival) and is_nil(&1.seconds_until_departure) and
is_nil(&1.seconds_until_passthrough)) or
has_departed?(&1))
)
|> Stream.filter(&valid_trip_update?/1)
|> Stream.flat_map(&trip_update_to_predictions(&1, current_time))

vehicles_running_revenue_trips =
predictions
Expand All @@ -38,77 +26,90 @@ defmodule Predictions.Predictions do
end), vehicles_running_revenue_trips}
end

@spec transform_stop_time_updates(map()) :: [
{map(), String.t(), String.t(), integer(), String.t(), boolean(), String.t() | nil}
]
defp transform_stop_time_updates(trip_update) do
last_stop_id =
Enum.max_by(trip_update["stop_time_update"], fn update ->
if update["arrival"], do: update["arrival"]["time"], else: 0
end)
|> Map.get("stop_id")

vehicle_id = get_in(trip_update, ["vehicle", "id"])

Enum.map(
trip_update["stop_time_update"],
&{&1, last_stop_id, trip_update["trip"]["route_id"], trip_update["trip"]["direction_id"],
trip_update["trip"]["trip_id"], trip_update["trip"]["revenue"], vehicle_id}
)
defp valid_trip_update?(trip_update) do
relevant_rail_route?(trip_update["trip"]["route_id"]) and
trip_update["trip"]["schedule_relationship"] != "CANCELED"
end

@spec trip_update_to_predictions(map(), DateTime.t()) :: [Prediction.t()]
defp trip_update_to_predictions(trip_update, current_time) do
vehicle_id = trip_update["vehicle"]["id"]

for stop_time_update <- trip_update["stop_time_update"],
is_valid_prediction?(stop_time_update),
prediction =
build_prediction(
stop_time_update,
get_destination_stop_id(trip_update),
vehicle_id,
Engine.Locations.for_vehicle(vehicle_id),
trip_update["trip"]["route_id"],
trip_update["trip"]["direction_id"],
trip_update["trip"]["trip_id"],
trip_update["trip"]["revenue"],
get_prediction_type(trip_update["update_type"]),
DateTime.to_unix(current_time)
),
not has_departed?(prediction),
not is_excluded_prediction_type?(prediction),
do: prediction
end

@spec prediction_from_update(
{map(), String.t(), String.t(), integer(), Predictions.Prediction.trip_id(), boolean(),
String.t() | nil},
DateTime.t()
@spec build_prediction(
map(),
String.t(),
String.t(),
Locations.Location.t(),
String.t(),
integer(),
Predictions.Prediction.trip_id(),
boolean(),
atom(),
integer()
) :: Prediction.t()
defp prediction_from_update(
{stop_time_update, last_stop_id, route_id, direction_id, trip_id, revenue_trip?,
vehicle_id},
current_time
defp build_prediction(
stop_time_update,
destination_stop_id,
vehicle_id,
vehicle_location,
route_id,
direction_id,
trip_id,
revenue_trip?,
prediction_type,
current_time_seconds
) do
current_time_seconds = DateTime.to_unix(current_time)

schedule_relationship =
translate_schedule_relationship(stop_time_update["schedule_relationship"])

seconds_until_arrival =
if stop_time_update["arrival"] &&
sufficient_certainty?(stop_time_update["arrival"], route_id),
do: stop_time_update["arrival"]["time"] - current_time_seconds,
else: nil
stop_time_update["arrival"] && stop_time_update["arrival"]["time"] - current_time_seconds

seconds_until_departure =
if stop_time_update["departure"] &&
sufficient_certainty?(stop_time_update["departure"], route_id),
do: stop_time_update["departure"]["time"] - current_time_seconds,
else: nil
stop_time_update["departure"] &&
stop_time_update["departure"]["time"] - current_time_seconds

seconds_until_passthrough =
if stop_time_update["passthrough_time"],
do: stop_time_update["passthrough_time"] - current_time_seconds,
else: nil

vehicle_location = Engine.Locations.for_vehicle(vehicle_id)
stop_time_update["passthrough_time"] &&
stop_time_update["passthrough_time"] - current_time_seconds

%Prediction{
stop_id: stop_time_update["stop_id"],
direction_id: direction_id,
seconds_until_arrival: max(0, seconds_until_arrival),
arrival_certainty: stop_time_update["arrival"]["uncertainty"],
seconds_until_departure: seconds_until_departure,
departure_certainty: stop_time_update["departure"]["uncertainty"],
seconds_until_passthrough: max(0, seconds_until_passthrough),
schedule_relationship: schedule_relationship,
route_id: route_id,
trip_id: trip_id,
destination_stop_id: last_stop_id,
destination_stop_id: destination_stop_id,
stopped_at_predicted_stop?:
not is_nil(vehicle_location) and vehicle_location.status == :stopped_at and
stop_time_update["stop_id"] == vehicle_location.stop_id,
boarding_status: stop_time_update["boarding_status"],
revenue_trip?: revenue_trip?,
vehicle_id: vehicle_id
vehicle_id: vehicle_id,
type: prediction_type
}
end

Expand Down Expand Up @@ -142,23 +143,40 @@ defmodule Predictions.Predictions do
:scheduled
end

@spec sufficient_certainty?(map(), String.t()) :: boolean()
defp sufficient_certainty?(_stop_time_event, route_id)
when route_id in ["Mattapan", "Green-B", "Green-C", "Green-D", "Green-E"] do
true
defp get_destination_stop_id(trip_update) do
Enum.max_by(trip_update["stop_time_update"], fn update ->
if update["arrival"], do: update["arrival"]["time"], else: 0
end)
|> Map.get("stop_id")
end

defp sufficient_certainty?(stop_time_event, _route_id) do
if Application.get_env(:realtime_signs, :filter_uncertain_predictions?) do
is_nil(stop_time_event["uncertainty"]) or stop_time_event["uncertainty"] <= 300
else
true
@spec get_prediction_type(String.t()) :: Prediction.prediction_type()
defp get_prediction_type(update_type) do
case update_type do
"mid_trip" -> :mid_trip
"at_terminal" -> :terminal
"reverse_trip" -> :reverse
_ -> nil
end
end

@spec has_departed?(Predictions.Prediction.t()) :: boolean()
defp is_valid_prediction?(stop_time_update) do
not (is_nil(stop_time_update["arrival"]) and is_nil(stop_time_update["departure"]) and
is_nil(stop_time_update["passthrough_time"]))
end

@spec is_excluded_prediction_type?(Prediction.t()) :: boolean()
defp is_excluded_prediction_type?(prediction)
when prediction.route_id in ["Mattapan", "Green-B", "Green-C", "Green-D", "Green-E"],
do: false

defp is_excluded_prediction_type?(prediction) do
prediction.type in @excluded_prediction_types
end

@spec has_departed?(Prediction.t()) :: boolean()
defp has_departed?(prediction) do
prediction.seconds_until_departure && prediction.seconds_until_departure < 0 &&
not is_nil(prediction.seconds_until_departure) and prediction.seconds_until_departure < 0 and
not prediction.stopped_at_predicted_stop?
end
end
6 changes: 3 additions & 3 deletions lib/signs/utilities/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ defmodule Signs.Utilities.Messages do
end, prediction.seconds_until_departure, prediction.seconds_until_arrival}
end)
|> then(fn predictions -> if(sign_config == :headway, do: [], else: predictions) end)
|> filter_early_am_predictions(config, current_time, scheduled)
|> filter_early_am_predictions(current_time, scheduled)
|> filter_large_red_line_gaps()
|> get_unique_destination_predictions(Signs.Utilities.SourceConfig.single_route(config))
end

defp filter_early_am_predictions(predictions, config, current_time, scheduled) do
defp filter_early_am_predictions(predictions, current_time, scheduled) do
cond do
!in_early_am?(current_time, scheduled) ->
predictions
Expand All @@ -216,7 +216,7 @@ defmodule Signs.Utilities.Messages do
# except for Prudential or Symphony EB
Enum.reject(
predictions,
&(Signs.Utilities.Predictions.reverse_prediction?(&1, config.terminal?) and
&(&1.type == :reverse and
&1.stop_id not in ["70240", "70242"])
)
end
Expand Down
Loading

0 comments on commit cda9804

Please sign in to comment.