Skip to content

Commit

Permalink
realtime: slight changes to interface
Browse files Browse the repository at this point in the history
  • Loading branch information
matslina committed Dec 17, 2023
1 parent 0fc48a4 commit c560aba
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 72 deletions.
21 changes: 10 additions & 11 deletions realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,16 @@ type RealtimeUpdate struct {
Type parse.StopTimeUpdateScheduleRelationship
}

func NewRealtime(static *Static, reader storage.FeedReader) *Realtime {
return &Realtime{
static: static,
reader: reader,
func NewRealtime(ctx context.Context, static *Static, reader storage.FeedReader, feeds [][]byte) (*Realtime, error) {
rt := &Realtime{
static: static,
reader: reader,
updatesByTrip: map[string][]*RealtimeUpdate{},
}
}

func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {
realtime, err := parse.ParseRealtime(ctx, feedData)
realtime, err := parse.ParseRealtime(ctx, feeds)
if err != nil {
return fmt.Errorf("parsing realtime feeds: %w", err)
return nil, fmt.Errorf("parsing feeds: %w", err)
}

rt.skippedTrips = realtime.SkippedTrips
Expand All @@ -77,13 +76,13 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {
TripIDs: tripIDs,
})
if err != nil {
return fmt.Errorf("loading stop time events: %w", err)
return nil, fmt.Errorf("loading stop time events: %w", err)
}

// And the static feed's timezone
timezone, err := time.LoadLocation(rt.static.Metadata.Timezone)
if err != nil {
return fmt.Errorf("loading static timezone: %w", err)
return nil, fmt.Errorf("loading static timezone: %w", err)
}

// Infer missing stop_id/stop_sequence from static data
Expand All @@ -93,7 +92,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {
// StopTimeUpdates.
rt.buildRealtimeUpdates(timezone, realtime.Updates, events)

return nil
return rt, nil
}

func (rt *Realtime) Departures(
Expand Down
4 changes: 2 additions & 2 deletions realtime_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func loadNYCFerryRealtime(t *testing.T, suffix string) *gtfs.Realtime {
buf, err := ioutil.ReadFile(fmt.Sprintf("testdata/nycferry_realtime_%s", suffix))
require.NoError(t, err)

rt := gtfs.NewRealtime(static, reader)
require.NoError(t, rt.LoadData(context.Background(), [][]byte{buf}))
rt, err := gtfs.NewRealtime(context.Background(), static, reader, [][]byte{buf})
require.NoError(t, err)

return rt
}
Expand Down
96 changes: 37 additions & 59 deletions realtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type TripUpdate struct {
Canceled bool
}

func buildFeed(t *testing.T, tripUpdates []TripUpdate) []byte {
func buildFeed(t *testing.T, tripUpdates []TripUpdate) [][]byte {
entity := make([]*p.FeedEntity, 0, len(tripUpdates))

for _, tripUpdate := range tripUpdates {
Expand Down Expand Up @@ -190,7 +190,7 @@ func buildFeed(t *testing.T, tripUpdates []TripUpdate) []byte {
panic(err)
}

return data
return [][]byte{data}
}

// A simple Static fixture. Trips t1 and t2 cover the same three
Expand Down Expand Up @@ -247,8 +247,6 @@ func TestRealtimeNoChanges(t *testing.T) {
// This realtime feed has updates for stops on trip t1, but
// none of them modify the departure time from what's already
// scheduled.
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)
feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
Expand All @@ -266,10 +264,9 @@ func TestRealtimeNoChanges(t *testing.T) {
},
},
})

// Load realtime data
err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// Check s1
departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil)
Expand Down Expand Up @@ -362,8 +359,6 @@ func TestRealtimeDelayWithPropagation(t *testing.T) {
// Delays on s2! For trip t1 there are no updates for s3, so
// s1 delay should propagate. For trip t2, train is expected
// to catch up fully before s3.
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)
feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
Expand All @@ -390,9 +385,9 @@ func TestRealtimeDelayWithPropagation(t *testing.T) {
},
},
})

err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// Check s1
departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil)
Expand Down Expand Up @@ -464,8 +459,6 @@ func TestRealtimeNoData(t *testing.T) {
// t1-s2, a real delay appears, which should be propagated to
// t1-s3 since no later update contradicts it.
// For trip 2 we have a delay at s1
storage, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(storage, reader)
feed := buildFeed(t, []TripUpdate{
{
// For trip t1 we've NO_DATA at s1, which
Expand Down Expand Up @@ -511,9 +504,9 @@ func TestRealtimeNoData(t *testing.T) {
},
},
})

err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// Check s1
departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 20*time.Minute, -1, "", -1, nil)
Expand Down Expand Up @@ -582,9 +575,6 @@ func TestRealtimeNoData(t *testing.T) {

// Verifies that StopTimeUpdate with SKIPPED stops work.
func TestRealtimeSkippedStop(t *testing.T) {
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)

feed := buildFeed(t, []TripUpdate{
{
// Trip t1 skips stops s1 and s3. A delay is
Expand Down Expand Up @@ -621,9 +611,9 @@ func TestRealtimeSkippedStop(t *testing.T) {
},
},
})

err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// Check s1. Expect t1 to skip past. t2 is delayed 30s.
departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 20*time.Minute, -1, "", -1, nil)
Expand Down Expand Up @@ -669,8 +659,6 @@ func TestRealtimeSkippedStop(t *testing.T) {
func TestRealtimeCanceledTrip(t *testing.T) {

// Trip t1 is canceled, t2 runs with a delay from s2.
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)
feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
Expand All @@ -687,9 +675,9 @@ func TestRealtimeCanceledTrip(t *testing.T) {
},
},
})

err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil)
assert.Equal(t, nil, err)
Expand Down Expand Up @@ -740,7 +728,6 @@ func TestRealtimeCanceledTrip(t *testing.T) {
}, departures)

// All trips are canceled!
rt = gtfs.NewRealtime(static, reader)
feed = buildFeed(t, []TripUpdate{
{
TripID: "t1",
Expand All @@ -755,8 +742,8 @@ func TestRealtimeCanceledTrip(t *testing.T) {
Canceled: true,
},
})
err = rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
rt, err = gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

departures, err = rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 20*time.Minute, -1, "", -1, nil)
assert.Equal(t, nil, err)
Expand All @@ -779,8 +766,6 @@ func TestRealtimeCanceledTrip(t *testing.T) {
func TestRealtimeTimeWindowing(t *testing.T) {

// Delay some stuff, cancel some stuff.
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)
feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
Expand All @@ -807,8 +792,9 @@ func TestRealtimeTimeWindowing(t *testing.T) {
},
},
})
err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// This should produce the following schedule for t1 and t2:
// s1 - t1 23:00:00
Expand Down Expand Up @@ -925,8 +911,6 @@ func TestRealtimeTripWithLoop(t *testing.T) {
},
})

rt := gtfs.NewRealtime(static, reader)

// Let's drop in a delay at the 5th stop, skip stop 8 and stop
// propagating the delay at stop 11.
feed := buildFeed(t, []TripUpdate{
Expand All @@ -953,9 +937,8 @@ func TestRealtimeTripWithLoop(t *testing.T) {
},
},
})

err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil)
assert.Equal(t, nil, err)
Expand Down Expand Up @@ -1099,8 +1082,6 @@ func TestRealtimeDepartureFiltering(t *testing.T) {
},
})

rt := gtfs.NewRealtime(static, reader)

// 1 second delay on the BusSouth route
feed := buildFeed(t, []TripUpdate{
{
Expand All @@ -1126,9 +1107,8 @@ func TestRealtimeDepartureFiltering(t *testing.T) {
},
},
})

err := rt.LoadData(context.Background(), [][]byte{feed})
assert.Equal(t, nil, err)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// From center we have 2 departures on separate routes
departures, err := rt.Departures(
Expand Down Expand Up @@ -1267,6 +1247,7 @@ func TestRealtimeDepartureFiltering(t *testing.T) {
assert.Equal(t, []gtfs.Departure{}, departures)

}

func TestRealtimeLoadError(t *testing.T) {
static, reader := SimpleStaticFixture(t)

Expand All @@ -1280,7 +1261,8 @@ func TestRealtimeLoadError(t *testing.T) {
},
})
require.NoError(t, err)
assert.NoError(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data}))
_, err = gtfs.NewRealtime(context.Background(), static, reader, [][]byte{data})
assert.NoError(t, err)

// This one is not (bad version)
data, err = proto.Marshal(&p.FeedMessage{
Expand All @@ -1291,19 +1273,16 @@ func TestRealtimeLoadError(t *testing.T) {
},
})
require.NoError(t, err)
assert.Error(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data}))
_, err = gtfs.NewRealtime(context.Background(), static, reader, [][]byte{data})
assert.Error(t, err)
}

// Static departures can get pushed into a realtime request window by
// an update with delay.
func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) {
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)

// Along trip t1, we normally have stops s1,...,s4 at 23:00,
// ..., 23:03. This update adds a big delay to s3, and has s1
// departing way early.

feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
Expand All @@ -1321,7 +1300,9 @@ func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) {
},
},
})
require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed}))
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// stop s1 is early, and can be found around 22:00
departures, err := rt.Departures(
Expand Down Expand Up @@ -1372,10 +1353,6 @@ func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) {
// "timepoint" stop_times, but at the time of this writing we're
// assuming all stop_times are exact times.
func TestRealtimeArrivalRecovery(t *testing.T) {

static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)

// Delay each departure by 30s at first stop
feed := buildFeed(t, []TripUpdate{
{
Expand Down Expand Up @@ -1424,8 +1401,9 @@ func TestRealtimeArrivalRecovery(t *testing.T) {
},
},
})

require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed}))
static, reader := SimpleStaticFixture(t)
rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed)
require.NoError(t, err)

// Check the delays on the first stop
departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 30*time.Minute, -1, "", -1, nil)
Expand Down

0 comments on commit c560aba

Please sign in to comment.