Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

realtime: handle stops pushed into time window by delay #3

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 71 additions & 43 deletions realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@ type Realtime struct {
static *Static
reader storage.FeedReader

updatesByTrip map[string][]RealtimeUpdate
updatesByTrip map[string][]*RealtimeUpdate
skippedTrips map[string]bool

// TODO: These are used to expand the time window when
// querying static departures, to make sure delayed (and
// early) stops are retrieved (and then updated). Not pretty,
// and will result in larger time windows than
// necessary. Doing the same per stop is tricky, as stop
// delays propagate along the trip. Come up with a better
// approach.
minDelay time.Duration
maxDelay time.Duration
}

// Similar to parse.StopTimeUpdate, but trimmed down to what's
// necessary to serve realtime predictions. Should be suitable for
// caching and sharing with other instances.
type RealtimeUpdate struct {
StopSequence uint32
ArrivalDelay time.Duration
DepartureDelay time.Duration
StopSequence uint32
Type parse.StopTimeUpdateScheduleRelationship
}

Expand All @@ -42,6 +52,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {
}

rt.skippedTrips = realtime.SkippedTrips
rt.updatesByTrip = map[string][]*RealtimeUpdate{}

// Retrieve Static stop time events for all trips in the realtime feed
trips := map[string]bool{}
Expand Down Expand Up @@ -72,7 +83,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {

// Construct RealtimeUpdate objects from the parsed
// StopTimeUpdates.
rt.updatesByTrip = buildRealtimeUpdates(timezone, realtime.Updates, events)
rt.buildRealtimeUpdates(timezone, realtime.Updates, events)

return nil
}
Expand All @@ -86,11 +97,12 @@ func (rt *Realtime) Departures(
directionID int8,
routeTypes []storage.RouteType) ([]Departure, error) {

// Get the static schedule for the requested time window.
// Get the scheduled departures. Extend the window so that
// delayed (or early) departures are included.
scheduled, err := rt.static.Departures(
stopID,
windowStart,
windowLength,
windowStart.Add(-rt.maxDelay),
windowLength-rt.minDelay+rt.maxDelay,
numDepartures,
routeID,
directionID,
Expand Down Expand Up @@ -167,30 +179,29 @@ func (rt *Realtime) Departures(
departures = append(departures, dep)
case parse.StopTimeUpdateScheduled:
// SCHEDULED => update to static schedule
if updates[idx].DepartureDelay > 0 {
dep.Time = dep.Time.Add(updates[idx].DepartureDelay)
}

// If the delay pushed the departure outside
// of the requested time window, it must be
// ignored
if dep.Time.Before(windowStart) || dep.Time.After(windowStart.Add(windowLength)) {
continue
}

dep.Time = dep.Time.Add(updates[idx].DepartureDelay)
departures = append(departures, dep)
}
}

sort.Slice(departures, func(i, j int) bool {
return departures[i].Time.Before(departures[j].Time)
// Filter out departures outside of the requested time
// window. Sort by time. Done.
result := []Departure{}
for _, dep := range departures {
if dep.Time.Before(windowStart) || dep.Time.After(windowStart.Add(windowLength)) {
continue
}
result = append(result, dep)
}

sort.Slice(result, func(i, j int) bool {
return result[i].Time.Before(result[j].Time)
})

return departures, nil
return result, nil

// Missing:
//
// - Trips pushed into the time window by a delay
// - Added trips
// - Added stops (is that a thing?)
//
Expand Down Expand Up @@ -260,17 +271,13 @@ func resolveStopReferences(updates []*parse.StopTimeUpdate, events []*storage.St
}
}

// The full GTFS-rt StopTimeUpdates are great and all, but we only
// need some of the information they hold.
//
// This function takes StopTimeUpdates from a realtime feed, along
// with all associated static StopTimeEvents, and returns
// RealtimeUpdates, grouped by trip, and sorted by stop_sequence.
func buildRealtimeUpdates(
// Construct RealtimeUpdates from StopTimeUpdates and
// StopTimeEvents. Groups them by trip and stop.
func (rt *Realtime) buildRealtimeUpdates(
timezone *time.Location,
stups []*parse.StopTimeUpdate,
events []*storage.StopTimeEvent,
) map[string][]RealtimeUpdate {
) {

// Group static events by trip, and sort by stop_sequence
eventsByTrip := map[string][]*storage.StopTimeEvent{}
Expand Down Expand Up @@ -309,9 +316,12 @@ func buildRealtimeUpdates(
eventTime := upNoon.Add(-12 * time.Hour).Add(eventOffset)

return upTime.Sub(eventTime)
}

realtimeUpdates := map[string][]RealtimeUpdate{}
// NTS: should redo this to just compute diff in both
// directions, maybe guess date to cover DST switches,
// and then take the smaller one. If diff is 23h, then
// it's more likely to b 1h early than 23h late.
}

// Combine static schedule and realtime updates
for tripID, tripUpdates := range updatesByTrip {
Expand Down Expand Up @@ -339,34 +349,32 @@ func buildRealtimeUpdates(
// the stop should be skipped. No need to
// attach delays information.
if u.Type == parse.StopTimeUpdateNoData || u.Type == parse.StopTimeUpdateSkipped {
realtimeUpdates[tripID] = append(realtimeUpdates[tripID], RealtimeUpdate{
rtUp := &RealtimeUpdate{
StopSequence: u.StopSequence,
Type: u.Type,
})
}
rt.updatesByTrip[tripID] = append(rt.updatesByTrip[tripID], rtUp)
continue
}

// Type is SCHEDULED. Compute delays.
rtUp := RealtimeUpdate{
rtUp := &RealtimeUpdate{
StopSequence: u.StopSequence,
Type: u.Type,
}

if u.ArrivalIsSet {
// If exact time is provided, it takes
// precedence over delay.
// Feeds can use the timestamp to communicate delays
rtUp.ArrivalDelay = u.ArrivalDelay
if !u.ArrivalTime.IsZero() {
if !u.ArrivalTime.IsZero() && u.ArrivalDelay == 0 {
rtUp.ArrivalDelay = updateDelay(
events[ei].StopTime.ArrivalTime(),
u.ArrivalTime,
)
}
}
if u.DepartureIsSet {
// Same here: if exact time is
// provided, it takes precedene over
// delay.
// Same thing here
rtUp.DepartureDelay = u.DepartureDelay
if !u.DepartureTime.IsZero() {
rtUp.DepartureDelay = updateDelay(
Expand All @@ -379,10 +387,30 @@ func buildRealtimeUpdates(
// arrival delay applies to departure
rtUp.DepartureDelay = u.ArrivalDelay
}
if !u.ArrivalIsSet {
// Lacking Arrival data, assume
// departure delay applies to arrival
rtUp.ArrivalDelay = u.DepartureDelay
}

// Track the min and max delays observed. This
// is used to expand time window when
// searching static schedule.
if rtUp.ArrivalDelay < rt.minDelay {
rt.minDelay = rtUp.ArrivalDelay
}
if rtUp.ArrivalDelay > rt.maxDelay {
rt.maxDelay = rtUp.ArrivalDelay
}
if rtUp.DepartureDelay < rt.minDelay {
rt.minDelay = rtUp.DepartureDelay
}
if rtUp.DepartureDelay > rt.maxDelay {
rt.maxDelay = rtUp.DepartureDelay
}

rt.updatesByTrip[tripID] = append(rt.updatesByTrip[tripID], rtUp)

realtimeUpdates[tripID] = append(realtimeUpdates[tripID], rtUp)
}
}

return realtimeUpdates
}
73 changes: 72 additions & 1 deletion realtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,6 @@ func TestRealtimeTimeWindowing(t *testing.T) {
Time: time.Date(2020, 1, 15, 23, 13, 0, 0, time.UTC),
},
}, departures)

}

// Make sure we can deal with trips that have loops. In these cases,
Expand Down Expand Up @@ -1294,3 +1293,75 @@ func TestRealtimeLoadError(t *testing.T) {
require.NoError(t, err)
assert.Error(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data}))
}

// Static departures can get pushed into a realtime request window by
// an update with delay.
func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) {
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)

// Along trip t1, we normally have stops s1,...,s4 at 23:00,
// ..., 23:03. This update adds a big delay to s3, and has s1
// departing way early.

feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
StopUpdates: []StopUpdate{
{
StopID: "s1",
DepartureSet: true,
DepartureDelay: -3600,
},
{
StopID: "s3",
DepartureSet: true,
DepartureTime: time.Date(2020, 1, 15, 23, 59, 30, 1, time.UTC),
},
},
},
})
require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed}))

// stop s1 is early, and can be found around 22:00
departures, err := rt.Departures(
"s1",
time.Date(2020, 1, 15, 21, 55, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(departures))
assert.Equal(t, "t1", departures[0].TripID)
assert.Equal(t, "s1", departures[0].StopID)
assert.Equal(t, time.Date(2020, 1, 15, 22, 0, 0, 0, time.UTC), departures[0].Time)

// there's no departure from s1 around the original time
departures, err = rt.Departures(
"s1",
time.Date(2020, 1, 15, 22, 55, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(departures))

// stop s3 is delayed, so it's not returned around 23:03
departures, err = rt.Departures(
"s3",
time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(departures))

// but it is returned around midnight
departures, err = rt.Departures(
"s3",
time.Date(2020, 1, 15, 23, 55, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(departures))
assert.Equal(t, "t1", departures[0].TripID)
assert.Equal(t, "s3", departures[0].StopID)
assert.Equal(t, time.Date(2020, 1, 15, 23, 59, 30, 0, time.UTC), departures[0].Time)
}
Loading