diff --git a/realtime.go b/realtime.go index a8d504d..bec109c 100644 --- a/realtime.go +++ b/realtime.go @@ -46,17 +46,16 @@ type RealtimeUpdate struct { Type parse.StopTimeUpdateScheduleRelationship } -func NewRealtime(static *Static, reader storage.FeedReader) *Realtime { - return &Realtime{ - static: static, - reader: reader, +func NewRealtime(ctx context.Context, static *Static, reader storage.FeedReader, feeds [][]byte) (*Realtime, error) { + rt := &Realtime{ + static: static, + reader: reader, + updatesByTrip: map[string][]*RealtimeUpdate{}, } -} -func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error { - realtime, err := parse.ParseRealtime(ctx, feedData) + realtime, err := parse.ParseRealtime(ctx, feeds) if err != nil { - return fmt.Errorf("parsing realtime feeds: %w", err) + return nil, fmt.Errorf("parsing feeds: %w", err) } rt.skippedTrips = realtime.SkippedTrips @@ -77,13 +76,13 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error { TripIDs: tripIDs, }) if err != nil { - return fmt.Errorf("loading stop time events: %w", err) + return nil, fmt.Errorf("loading stop time events: %w", err) } // And the static feed's timezone timezone, err := time.LoadLocation(rt.static.Metadata.Timezone) if err != nil { - return fmt.Errorf("loading static timezone: %w", err) + return nil, fmt.Errorf("loading static timezone: %w", err) } // Infer missing stop_id/stop_sequence from static data @@ -93,7 +92,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error { // StopTimeUpdates. rt.buildRealtimeUpdates(timezone, realtime.Updates, events) - return nil + return rt, nil } func (rt *Realtime) Departures( diff --git a/realtime_integration_test.go b/realtime_integration_test.go index cc3821c..23d57ea 100644 --- a/realtime_integration_test.go +++ b/realtime_integration_test.go @@ -25,8 +25,8 @@ func loadNYCFerryRealtime(t *testing.T, suffix string) *gtfs.Realtime { buf, err := ioutil.ReadFile(fmt.Sprintf("testdata/nycferry_realtime_%s", suffix)) require.NoError(t, err) - rt := gtfs.NewRealtime(static, reader) - require.NoError(t, rt.LoadData(context.Background(), [][]byte{buf})) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, [][]byte{buf}) + require.NoError(t, err) return rt } diff --git a/realtime_test.go b/realtime_test.go index c489c51..f76eff8 100644 --- a/realtime_test.go +++ b/realtime_test.go @@ -108,7 +108,7 @@ type TripUpdate struct { Canceled bool } -func buildFeed(t *testing.T, tripUpdates []TripUpdate) []byte { +func buildFeed(t *testing.T, tripUpdates []TripUpdate) [][]byte { entity := make([]*p.FeedEntity, 0, len(tripUpdates)) for _, tripUpdate := range tripUpdates { @@ -190,7 +190,7 @@ func buildFeed(t *testing.T, tripUpdates []TripUpdate) []byte { panic(err) } - return data + return [][]byte{data} } // A simple Static fixture. Trips t1 and t2 cover the same three @@ -247,8 +247,6 @@ func TestRealtimeNoChanges(t *testing.T) { // This realtime feed has updates for stops on trip t1, but // none of them modify the departure time from what's already // scheduled. - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) feed := buildFeed(t, []TripUpdate{ { TripID: "t1", @@ -266,10 +264,9 @@ func TestRealtimeNoChanges(t *testing.T) { }, }, }) - - // Load realtime data - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // Check s1 departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil) @@ -362,8 +359,6 @@ func TestRealtimeDelayWithPropagation(t *testing.T) { // Delays on s2! For trip t1 there are no updates for s3, so // s1 delay should propagate. For trip t2, train is expected // to catch up fully before s3. - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) feed := buildFeed(t, []TripUpdate{ { TripID: "t1", @@ -390,9 +385,9 @@ func TestRealtimeDelayWithPropagation(t *testing.T) { }, }, }) - - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // Check s1 departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil) @@ -464,8 +459,6 @@ func TestRealtimeNoData(t *testing.T) { // t1-s2, a real delay appears, which should be propagated to // t1-s3 since no later update contradicts it. // For trip 2 we have a delay at s1 - storage, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(storage, reader) feed := buildFeed(t, []TripUpdate{ { // For trip t1 we've NO_DATA at s1, which @@ -511,9 +504,9 @@ func TestRealtimeNoData(t *testing.T) { }, }, }) - - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // Check s1 departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 20*time.Minute, -1, "", -1, nil) @@ -582,9 +575,6 @@ func TestRealtimeNoData(t *testing.T) { // Verifies that StopTimeUpdate with SKIPPED stops work. func TestRealtimeSkippedStop(t *testing.T) { - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) - feed := buildFeed(t, []TripUpdate{ { // Trip t1 skips stops s1 and s3. A delay is @@ -621,9 +611,9 @@ func TestRealtimeSkippedStop(t *testing.T) { }, }, }) - - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // Check s1. Expect t1 to skip past. t2 is delayed 30s. departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 20*time.Minute, -1, "", -1, nil) @@ -669,8 +659,6 @@ func TestRealtimeSkippedStop(t *testing.T) { func TestRealtimeCanceledTrip(t *testing.T) { // Trip t1 is canceled, t2 runs with a delay from s2. - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) feed := buildFeed(t, []TripUpdate{ { TripID: "t1", @@ -687,9 +675,9 @@ func TestRealtimeCanceledTrip(t *testing.T) { }, }, }) - - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil) assert.Equal(t, nil, err) @@ -740,7 +728,6 @@ func TestRealtimeCanceledTrip(t *testing.T) { }, departures) // All trips are canceled! - rt = gtfs.NewRealtime(static, reader) feed = buildFeed(t, []TripUpdate{ { TripID: "t1", @@ -755,8 +742,8 @@ func TestRealtimeCanceledTrip(t *testing.T) { Canceled: true, }, }) - err = rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + rt, err = gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) departures, err = rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 20*time.Minute, -1, "", -1, nil) assert.Equal(t, nil, err) @@ -779,8 +766,6 @@ func TestRealtimeCanceledTrip(t *testing.T) { func TestRealtimeTimeWindowing(t *testing.T) { // Delay some stuff, cancel some stuff. - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) feed := buildFeed(t, []TripUpdate{ { TripID: "t1", @@ -807,8 +792,9 @@ func TestRealtimeTimeWindowing(t *testing.T) { }, }, }) - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // This should produce the following schedule for t1 and t2: // s1 - t1 23:00:00 @@ -925,8 +911,6 @@ func TestRealtimeTripWithLoop(t *testing.T) { }, }) - rt := gtfs.NewRealtime(static, reader) - // Let's drop in a delay at the 5th stop, skip stop 8 and stop // propagating the delay at stop 11. feed := buildFeed(t, []TripUpdate{ @@ -953,9 +937,8 @@ func TestRealtimeTripWithLoop(t *testing.T) { }, }, }) - - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 10*time.Minute, -1, "", -1, nil) assert.Equal(t, nil, err) @@ -1099,8 +1082,6 @@ func TestRealtimeDepartureFiltering(t *testing.T) { }, }) - rt := gtfs.NewRealtime(static, reader) - // 1 second delay on the BusSouth route feed := buildFeed(t, []TripUpdate{ { @@ -1126,9 +1107,8 @@ func TestRealtimeDepartureFiltering(t *testing.T) { }, }, }) - - err := rt.LoadData(context.Background(), [][]byte{feed}) - assert.Equal(t, nil, err) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // From center we have 2 departures on separate routes departures, err := rt.Departures( @@ -1267,6 +1247,7 @@ func TestRealtimeDepartureFiltering(t *testing.T) { assert.Equal(t, []gtfs.Departure{}, departures) } + func TestRealtimeLoadError(t *testing.T) { static, reader := SimpleStaticFixture(t) @@ -1280,7 +1261,8 @@ func TestRealtimeLoadError(t *testing.T) { }, }) require.NoError(t, err) - assert.NoError(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data})) + _, err = gtfs.NewRealtime(context.Background(), static, reader, [][]byte{data}) + assert.NoError(t, err) // This one is not (bad version) data, err = proto.Marshal(&p.FeedMessage{ @@ -1291,19 +1273,16 @@ func TestRealtimeLoadError(t *testing.T) { }, }) require.NoError(t, err) - assert.Error(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data})) + _, err = gtfs.NewRealtime(context.Background(), static, reader, [][]byte{data}) + assert.Error(t, err) } // Static departures can get pushed into a realtime request window by // an update with delay. func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) { - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) - // Along trip t1, we normally have stops s1,...,s4 at 23:00, // ..., 23:03. This update adds a big delay to s3, and has s1 // departing way early. - feed := buildFeed(t, []TripUpdate{ { TripID: "t1", @@ -1321,7 +1300,9 @@ func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) { }, }, }) - require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed})) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // stop s1 is early, and can be found around 22:00 departures, err := rt.Departures( @@ -1372,10 +1353,6 @@ func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) { // "timepoint" stop_times, but at the time of this writing we're // assuming all stop_times are exact times. func TestRealtimeArrivalRecovery(t *testing.T) { - - static, reader := SimpleStaticFixture(t) - rt := gtfs.NewRealtime(static, reader) - // Delay each departure by 30s at first stop feed := buildFeed(t, []TripUpdate{ { @@ -1424,8 +1401,9 @@ func TestRealtimeArrivalRecovery(t *testing.T) { }, }, }) - - require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed})) + static, reader := SimpleStaticFixture(t) + rt, err := gtfs.NewRealtime(context.Background(), static, reader, feed) + require.NoError(t, err) // Check the delays on the first stop departures, err := rt.Departures("s1", time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), 30*time.Minute, -1, "", -1, nil)