Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from RTR to GTFS-rt #810

Merged
merged 31 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d9c11dc
Migrate to GTFS-rt predictions and locations feeds
PaulJKim Mar 25, 2024
5419995
PR feedback
PaulJKim Mar 27, 2024
f03d8a7
simplify abstraction
PaulJKim Mar 27, 2024
948020e
Update .envrc.template
PaulJKim Mar 28, 2024
cae438f
Merge branch 'main' into pk/migrate-from-rtr-to-gtfs-rt
PaulJKim Jun 11, 2024
8251ca9
Filter out past departures and adjust last trip filtering
PaulJKim Jun 12, 2024
90f4da1
Merge branch 'pk/migrate-from-rtr-to-gtfs-rt' of https://github.com/m…
PaulJKim Jun 12, 2024
099efbe
Remove some references to RTR
PaulJKim Jun 12, 2024
a22e752
Move route_id check to a shared helper
PaulJKim Jun 13, 2024
8852743
Log prediction details for terminal predictions
PaulJKim Aug 8, 2024
5903d3f
add inspects
PaulJKim Aug 8, 2024
1726597
add inspect
PaulJKim Aug 8, 2024
41f3b10
fix typo
PaulJKim Aug 8, 2024
28a000b
only log when seconds til boarding is under terminal brd seconds
PaulJKim Aug 12, 2024
d76733e
Add some logging to stops after terminals
PaulJKim Aug 14, 2024
f7e550f
Try adding a buffer to account for potential latency between RTR and …
PaulJKim Aug 19, 2024
25f59bd
Log more details
PaulJKim Aug 20, 2024
c3e0837
Log stopped_at_predicted_stop
PaulJKim Aug 20, 2024
0f11d6b
Merge branch 'main' into pk/migrate-from-rtr-to-gtfs-rt
PaulJKim Aug 20, 2024
a67c8c3
increase buffer to account for negative departures
PaulJKim Aug 21, 2024
7fe2370
Account for skipped predictions when calculating seconds_until_passth…
PaulJKim Aug 21, 2024
135c50a
Add filter for determining if prediction has passed
PaulJKim Aug 22, 2024
b005b04
Remove extra logging
PaulJKim Aug 26, 2024
429bb18
read passthrough_time field directly
PaulJKim Nov 8, 2024
18ee9ff
oops forgot the actual variable
PaulJKim Nov 8, 2024
a4db738
don't fiter out passthroughs
PaulJKim Nov 13, 2024
9ca909b
use the right field
PaulJKim Nov 13, 2024
ef91775
Merge branch 'main' into pk/migrate-from-rtr-to-gtfs-rt
PaulJKim Nov 14, 2024
442d955
remove stops_away
PaulJKim Nov 14, 2024
79d976d
Remove required concentrate urls from envrc template
PaulJKim Nov 18, 2024
1974248
Merge branch 'main' into pk/migrate-from-rtr-to-gtfs-rt
PaulJKim Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .envrc.template
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export API_V3_URL=https://api-dev-green.mbtace.com
#export CHELSEA_BRIDGE_URL=
#export CHELSEA_BRIDGE_AUTH=

# URLs of the enhanced trip-update and vehicle-position feeds. Default to the real feed URLs if
# not set here.
#export TRIP_UPDATE_URL=
#export VEHICLE_POSITIONS_URL=
# URLs of the enhanced trip-update and vehicle-position feeds.
#export TRIP_UPDATE_URL="https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/TripUpdates_enhanced.json"
#export VEHICLE_POSITIONS_URL="https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/VehiclePositions_enhanced.json"
4 changes: 2 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ if config_env() != :test do
trip_update_url:
System.get_env(
"TRIP_UPDATE_URL",
"https://s3.amazonaws.com/mbta-gtfs-s3/rtr/TripUpdates_enhanced.json"
"https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/TripUpdates_enhanced.json"
),
vehicle_positions_url:
System.get_env(
"VEHICLE_POSITIONS_URL",
"https://s3.amazonaws.com/mbta-gtfs-s3/rtr/VehiclePositions_enhanced.json"
"https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/VehiclePositions_enhanced.json"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a drive-by note, these have "official" public-facing URLs which are behind a CDN. We ended up using these in Screens LTOTD tracking rather than these internal S3 bucket URLs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me, thanks for calling that out. Does Screens LTOTD also use dev/dev-blue versions of the CDN URLs as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the production feeds have CDN URLs as far as I know — if we're fetching a non-prod feed those will probably have to still use S3 bucket URLs.

),
s3_bucket: System.get_env("SIGNS_S3_BUCKET"),
s3_path: System.get_env("SIGNS_S3_PATH"),
Expand Down
3 changes: 1 addition & 2 deletions lib/content/message/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ defmodule Content.Message.Predictions do
end

min = round(sec / 60)
stopped_at? = prediction.stops_away == 0
reverse_prediction? = Signs.Utilities.Predictions.reverse_prediction?(prediction, terminal?)

{minutes, approximate?} =
cond do
stopped_at? and (!terminal? or sec <= 30) -> {:boarding, false}
prediction.stopped_at_predicted_stop? and (!terminal? or sec <= 30) -> {:boarding, false}
!terminal? and sec <= 30 -> {:arriving, false}
!terminal? and sec <= 60 -> {:approaching, false}
min > 60 -> {60, true}
Expand Down
21 changes: 5 additions & 16 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,11 @@ defmodule Fake.HTTPoison do
%{
"entity" => [
%{
"alert" => nil,
"id" => "1490783458_32568935",
"is_deleted" => false,
"trip_update" => %{
"delay" => nil,
"stop_time_update" => [
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_120,
"uncertainty" => nil
},
Expand All @@ -84,7 +80,6 @@ defmodule Fake.HTTPoison do
},
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_180,
"uncertainty" => nil
},
Expand Down Expand Up @@ -243,23 +238,17 @@ defmodule Fake.HTTPoison do
%{
"entity" => [
%{
"alert" => nil,
"id" => "1490783458_32568935",
"is_deleted" => false,
"trip_update" => %{
"delay" => nil,
"stop_time_update" => [
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_180,
"uncertainty" => nil
"uncertainty" => 60
},
"departure" => nil,
"schedule_relationship" => "SCHEDULED",
"stop_id" => "stop_to_update",
"stop_sequence" => 1,
"stops_away" => 0
"stop_sequence" => 1
}
],
"timestamp" => nil,
Expand All @@ -269,15 +258,15 @@ defmodule Fake.HTTPoison do
"schedule_relationship" => "SCHEDULED",
"start_date" => "20170329",
"start_time" => nil,
"trip_id" => "32568935"
"trip_id" => "32568935",
"revenue" => true
},
"vehicle" => %{
"id" => "G-10040",
"label" => "3260",
"license_plate" => nil
}
},
"vehicle" => nil
}
}
],
"header" => %{
Expand Down
7 changes: 6 additions & 1 deletion lib/predictions/last_trip.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
defmodule Predictions.LastTrip do
alias Predictions.Predictions

defp get_running_trips(predictions_feed) do
predictions_feed["entity"]
|> Stream.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
|> Stream.filter(
&(Predictions.relevant_rail_route?(&1["trip"]["route_id"]) and
&1["trip"]["schedule_relationship"] != "CANCELED")
)
end

def get_last_trips(predictions_feed) do
Expand Down
6 changes: 2 additions & 4 deletions lib/predictions/prediction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ defmodule Predictions.Prediction do
route_id: nil,
trip_id: nil,
destination_stop_id: nil,
stopped?: false,
stops_away: 0,
stopped_at_predicted_stop?: false,
boarding_status: nil,
revenue_trip?: true,
vehicle_id: nil
Expand All @@ -30,8 +29,7 @@ defmodule Predictions.Prediction do
route_id: String.t(),
trip_id: trip_id() | nil,
destination_stop_id: String.t(),
stopped?: boolean(),
stops_away: integer(),
stopped_at_predicted_stop?: boolean(),
boarding_status: String.t() | nil,
revenue_trip?: boolean(),
vehicle_id: String.t() | nil
Expand Down
66 changes: 46 additions & 20 deletions lib/predictions/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,28 @@ defmodule Predictions.Predictions do
def get_all(feed_message, current_time) do
predictions =
feed_message["entity"]
|> Enum.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
|> Enum.flat_map(&transform_stop_time_updates/1)
|> Enum.filter(fn {update, _, _, _, _, _, _} ->
((update["arrival"] || update["departure"]) &&
not is_nil(update["stops_away"])) || update["passthrough_time"]
|> Stream.map(& &1["trip_update"])
|> Stream.filter(
&(relevant_rail_route?(&1["trip"]["route_id"]) and
&1["trip"]["schedule_relationship"] != "CANCELED")
)
|> Stream.flat_map(&transform_stop_time_updates/1)
|> Stream.filter(fn {update, _, _, _, _, _, _} ->
(update["arrival"] && update["arrival"]["uncertainty"]) ||
(update["departure"] && update["departure"]["uncertainty"]) ||
update["passthrough_time"]
end)
|> Enum.map(&prediction_from_update(&1, current_time))
|> Stream.map(&prediction_from_update(&1, current_time))
|> Enum.reject(
&(is_nil(&1.seconds_until_arrival) and is_nil(&1.seconds_until_departure) and
is_nil(&1.seconds_until_passthrough))
&((is_nil(&1.seconds_until_arrival) and is_nil(&1.seconds_until_departure) and
is_nil(&1.seconds_until_passthrough)) or
has_departed?(&1))
)

vehicles_running_revenue_trips =
predictions
|> Enum.filter(& &1.revenue_trip?)
|> Enum.map(& &1.vehicle_id)
|> Stream.filter(& &1.revenue_trip?)
|> Stream.map(& &1.vehicle_id)
|> MapSet.new()

{Enum.group_by(predictions, fn prediction ->
Expand All @@ -43,15 +48,12 @@ defmodule Predictions.Predictions do
end)
|> Map.get("stop_id")

revenue_trip? =
Enum.any?(trip_update["stop_time_update"], &(&1["schedule_relationship"] != "SKIPPED"))

vehicle_id = get_in(trip_update, ["vehicle", "id"])

Enum.map(
trip_update["stop_time_update"],
&{&1, last_stop_id, trip_update["trip"]["route_id"], trip_update["trip"]["direction_id"],
trip_update["trip"]["trip_id"], revenue_trip?, vehicle_id}
trip_update["trip"]["trip_id"], trip_update["trip"]["revenue"], vehicle_id}
)
end

Expand All @@ -67,6 +69,9 @@ defmodule Predictions.Predictions do
) do
current_time_seconds = DateTime.to_unix(current_time)

schedule_relationship =
translate_schedule_relationship(stop_time_update["schedule_relationship"])

seconds_until_arrival =
if stop_time_update["arrival"] &&
sufficient_certainty?(stop_time_update["arrival"], route_id),
Expand All @@ -84,21 +89,23 @@ defmodule Predictions.Predictions do
do: stop_time_update["passthrough_time"] - current_time_seconds,
else: nil

vehicle_location = Engine.Locations.for_vehicle(vehicle_id)

%Prediction{
stop_id: stop_time_update["stop_id"],
direction_id: direction_id,
seconds_until_arrival: max(0, seconds_until_arrival),
arrival_certainty: stop_time_update["arrival"]["uncertainty"],
seconds_until_departure: max(0, seconds_until_departure),
seconds_until_departure: seconds_until_departure,
departure_certainty: stop_time_update["departure"]["uncertainty"],
seconds_until_passthrough: max(0, seconds_until_passthrough),
schedule_relationship:
translate_schedule_relationship(stop_time_update["schedule_relationship"]),
schedule_relationship: schedule_relationship,
route_id: route_id,
trip_id: trip_id,
destination_stop_id: last_stop_id,
stopped?: stop_time_update["stopped?"],
stops_away: stop_time_update["stops_away"],
stopped_at_predicted_stop?:
not is_nil(vehicle_location) and vehicle_location.status == :stopped_at and
stop_time_update["stop_id"] == vehicle_location.stop_id,
boarding_status: stop_time_update["boarding_status"],
revenue_trip?: revenue_trip?,
vehicle_id: vehicle_id
Expand All @@ -113,6 +120,19 @@ defmodule Predictions.Predictions do
Jason.decode!(body)
end

def relevant_rail_route?(route_id) do
route_id in [
"Red",
"Blue",
"Orange",
"Green-B",
"Green-C",
"Green-D",
"Green-E",
"Mattapan"
]
end

@spec translate_schedule_relationship(String.t()) :: :skipped | :scheduled
defp translate_schedule_relationship("SKIPPED") do
:skipped
Expand All @@ -135,4 +155,10 @@ defmodule Predictions.Predictions do
true
end
end

@spec has_departed?(Predictions.Prediction.t()) :: boolean()
defp has_departed?(prediction) do
prediction.seconds_until_departure && prediction.seconds_until_departure < 0 &&
not prediction.stopped_at_predicted_stop?
end
Comment on lines +160 to +163
Copy link
Collaborator Author

@PaulJKim PaulJKim Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flagging for review: This is new logic since the first attempt at migrating for more accurately filtering out departed trains.

end
7 changes: 3 additions & 4 deletions lib/signs/utilities/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ defmodule Signs.Utilities.Predictions do
{if terminal? do
0
else
case prediction.stops_away do
0 -> 0
_ -> 1
end
if prediction.stopped_at_predicted_stop?,
do: 0,
else: 1
end, prediction.seconds_until_departure, prediction.seconds_until_arrival}
end)
|> filter_large_red_line_gaps()
Expand Down
Loading
Loading