Skip to content

Commit

Permalink
Rt tripupdate reports (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
irees authored Dec 23, 2023
1 parent e7727cc commit ea3acee
Show file tree
Hide file tree
Showing 12 changed files with 366 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-and-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Produce coverage report
run: go tool cover -html=c.out -o coverage.html
- name: Save coverage report as artifact
uses: actions/upload-artifact@master
uses: actions/upload-artifact@v3
with:
name: coverage
path: coverage.html
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
BEGIN;

CREATE TABLE tl_validation_reports (
id bigserial primary key NOT NULL,
created_at timestamp without time zone DEFAULT now() NOT NULL,
updated_at timestamp without time zone DEFAULT now() NOT NULL,
reported_at timestamp without time zone NOT NULL,
feed_version_id bigint REFERENCES feed_versions(id) NOT NULL,
file text
);
CREATE INDEX ON tl_validation_reports(feed_version_id);

CREATE TABLE tl_validation_trip_update_stats (
id bigserial primary key NOT NULL,
validation_report_id bigint REFERENCES tl_validation_reports(id) NOT NULL,
agency_id text NOT NULL,
route_id text NOT NULL,
trip_scheduled_ids jsonb,
trip_scheduled_count int NOT NULL,
trip_match_count int NOT NULL
);
CREATE INDEX ON tl_validation_trip_update_stats(validation_report_id);

CREATE TABLE tl_validation_vehicle_position_stats (
id bigserial primary key NOT NULL,
validation_report_id bigint REFERENCES tl_validation_reports(id) NOT NULL,
agency_id text NOT NULL,
route_id text NOT NULL,
trip_scheduled_ids jsonb,
trip_scheduled_count int NOT NULL,
trip_match_count int NOT NULL
);
CREATE INDEX ON tl_validation_vehicle_position_stats(validation_report_id);


COMMIT;
30 changes: 29 additions & 1 deletion internal/schema/sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,35 @@ CREATE TABLE gtfs_rider_categories (




CREATE TABLE tl_validation_reports (
"id" integer primary key autoincrement,
"feed_version_id" int,
"created_at" datetime DEFAULT CURRENT_TIMESTAMP NOT NULL,
"updated_at" datetime DEFAULT CURRENT_TIMESTAMP NOT NULL,
"reported_at" datetime NOT NULL,
"file" varchar(255)
);

CREATE TABLE tl_validation_trip_update_stats (
"id" integer primary key autoincrement,
"validation_report_id" int NOT NULL,
"agency_id" varchar(255) NOT NULL,
"route_id" varchar(255) NOT NULL,
"trip_scheduled_ids" varchar(255),
"trip_scheduled_count" int NOT NULL,
"trip_match_count" int NOT NULL
);


CREATE TABLE tl_validation_vehicle_position_stats (
"id" integer primary key autoincrement,
"validation_report_id" int NOT NULL,
"agency_id" varchar(255) NOT NULL,
"route_id" varchar(255) NOT NULL,
"trip_scheduled_ids" varchar(255),
"trip_scheduled_count" int NOT NULL,
"trip_match_count" int NOT NULL
);



Expand Down
3 changes: 3 additions & 0 deletions rt/rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func ReadURL(address string, opts ...request.RequestOption) (*pb.FeedMessage, er
if err != nil {
return nil, err
}
if fr.FetchError != nil {
return nil, err
}
msg := pb.FeedMessage{}
data := fr.Data
if err := FlexDecode(data, &msg); err != nil {
Expand Down
40 changes: 20 additions & 20 deletions rt/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,11 @@ func (fi *Validator) validatePosition(pos *pb.Position) (errs []error) {
}

type VehiclePositionStats struct {
RouteID string
AgencyID string
TripScheduledCount int
TripMatchCount int
RouteID string `json:"route_id"`
AgencyID string `json:"agency_id"`
TripScheduledIDs []string `json:"trip_scheduled_ids"`
TripScheduledCount int `json:"trip_scheduled_count"`
TripMatchCount int `json:"trip_match_count"`
}

func (fi *Validator) VehiclePositionStats(now time.Time, msg *pb.FeedMessage) ([]VehiclePositionStats, error) {
Expand All @@ -430,21 +431,15 @@ func (fi *Validator) VehiclePositionStats(now time.Time, msg *pb.FeedMessage) ([
continue
}
pos := vp.GetPosition()
posPt := xy.Point{Lon: float64(pos.GetLongitude()), Lat: float64(pos.GetLatitude())}
if td := vp.Trip; td != nil && pos != nil {
tripId := td.GetTripId()
trip, ok := fi.tripInfo[tripId]
shp := fi.geomCache.GetShape(trip.ShapeID)
tripHasPosition[tripId] = true
if ok && trip.ShapeID != "" && len(shp) > 0 {
// fmt.Println("Vehicle position:", posPt)
nearestPoint, _ := xy.LineClosestPoint(shp, posPt)
_ = nearestPoint
// fmt.Println("\ttrip:", tripId, "shape:", trip.ShapeID)
// fmt.Println("\tnearestPoint:", nearestPoint, "dist:", xy.DistanceHaversine(nearestPoint.Lon, nearestPoint.Lat, posPt.Lon, posPt.Lat))
}
}
}
// Return early if no VehiclePositions
if len(tripHasPosition) == 0 {
return nil, nil
}
type statAggKey struct {
RouteID string
AgencyID string
Expand All @@ -459,6 +454,7 @@ func (fi *Validator) VehiclePositionStats(now time.Time, msg *pb.FeedMessage) ([
stat := statAgg[k]
stat.AgencyID = k.AgencyID
stat.RouteID = k.RouteID
stat.TripScheduledIDs = append(stat.TripScheduledIDs, tripId)
stat.TripScheduledCount += 1
if tripHasPosition[tripId] {
stat.TripMatchCount += 1
Expand Down Expand Up @@ -522,11 +518,11 @@ func (fi *Validator) ActiveTrips(now time.Time) []string {
}

type TripUpdateStats struct {
AgencyID string `json:"agency_id"`
RouteID string `json:"route_id"`
TripScheduledCount int `json:"trip_scheduled_count"`
TripMatchCount int `json:"trip_match_count"`
Date time.Time `json:"date"`
AgencyID string `json:"agency_id"`
RouteID string `json:"route_id"`
TripScheduledIDs []string `json:"trip_scheduled_ids"`
TripScheduledCount int `json:"trip_scheduled_count"`
TripMatchCount int `json:"trip_match_count"`
}

func (fi *Validator) TripUpdateStats(now time.Time, msg *pb.FeedMessage) ([]TripUpdateStats, error) {
Expand All @@ -538,6 +534,10 @@ func (fi *Validator) TripUpdateStats(now time.Time, msg *pb.FeedMessage) ([]Trip
}
tripHasUpdate[tu.GetTrip().GetTripId()] = true
}
// Return early if no TripUpdates
if len(tripHasUpdate) == 0 {
return nil, nil
}
type statAggKey struct {
AgencyID string
RouteID string
Expand All @@ -552,7 +552,7 @@ func (fi *Validator) TripUpdateStats(now time.Time, msg *pb.FeedMessage) ([]Trip
stat := statAgg[k]
stat.AgencyID = k.AgencyID
stat.RouteID = k.RouteID
stat.Date = now
stat.TripScheduledIDs = append(stat.TripScheduledIDs, tripId)
stat.TripScheduledCount += 1
if tripHasUpdate[tripId] {
stat.TripMatchCount += 1
Expand Down
8 changes: 8 additions & 0 deletions tl/tt/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
// Strings helps read and write []String as JSON
type Strings []String

func NewStrings(v []string) Strings {
s := Strings{}
for _, a := range v {
s = append(s, NewString(a))
}
return s
}

func (r Strings) Value() (driver.Value, error) {
return json.Marshal(r)
}
Expand Down
3 changes: 3 additions & 0 deletions tldb/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ func (adapter *PostgresAdapter) Insert(ent interface{}) (int, error) {
if err != nil {
return 0, err
}
if v, ok := ent.(canSetID); ok {
v.SetID(int(eid.Int64))
}
return int(eid.Int64), err
}

Expand Down
3 changes: 3 additions & 0 deletions tldb/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func (adapter *SQLiteAdapter) Insert(ent interface{}) (int, error) {
return 0, err
}
eid, err := result.LastInsertId()
if v, ok := ent.(canSetID); ok {
v.SetID(int(eid))
}
return int(eid), err
}

Expand Down
113 changes: 113 additions & 0 deletions validator/result.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package validator

import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"

"github.com/interline-io/transitland-lib/copier"
"github.com/interline-io/transitland-lib/dmfr"
"github.com/interline-io/transitland-lib/dmfr/store"
"github.com/interline-io/transitland-lib/rt"
"github.com/interline-io/transitland-lib/tl"
"github.com/interline-io/transitland-lib/tl/tt"
"github.com/interline-io/transitland-lib/tldb"
)

// Result contains a validation report result,
Expand Down Expand Up @@ -33,3 +42,107 @@ type RealtimeResult struct {
VehiclePositionStats []rt.VehiclePositionStats `json:"vehicle_position_stats"`
Errors []error
}

func (r *Result) Key() string {
return fmt.Sprintf("report-%s-%d.json", r.SHA1, time.Now().In(time.UTC).Unix())
}

func SaveValidationReport(atx tldb.Adapter, result *Result, reportedAt time.Time, fvid int, saveStatic bool, saveRealtimeStats bool, reportStorage string) error {
// Save validation reports
validationReport := ValidationReport{}
validationReport.FeedVersionID = fvid
validationReport.ReportedAt = tt.NewTime(reportedAt)

// Save JSON
if reportStorage != "" {
validationReport.File = tt.NewString(result.Key())
store, err := store.GetStore(reportStorage)
if err != nil {
return err
}
jj, err := json.Marshal(result)
if err != nil {
return err
}
jb := bytes.NewReader(jj)
if err := store.Upload(context.Background(), validationReport.File.Val, tl.Secret{}, jb); err != nil {
return err
}
}

// Save record
if _, err := atx.Insert(&validationReport); err != nil {
return err
}

// Save additional stats
if saveRealtimeStats {
for _, r := range result.Realtime {
for _, s := range r.TripUpdateStats {
tripReport := ValidationReportTripUpdateStat{
ValidationReportID: validationReport.ID,
AgencyID: s.AgencyID,
RouteID: s.RouteID,
TripScheduledCount: s.TripScheduledCount,
TripMatchCount: s.TripMatchCount,
TripScheduledIDs: tt.NewStrings(s.TripScheduledIDs),
}
if _, err := atx.Insert(&tripReport); err != nil {
return err
}
}
for _, s := range r.VehiclePositionStats {
vpReport := ValidationReportTripUpdateStat{
ValidationReportID: validationReport.ID,
AgencyID: s.AgencyID,
RouteID: s.RouteID,
TripScheduledCount: s.TripScheduledCount,
TripMatchCount: s.TripMatchCount,
TripScheduledIDs: tt.NewStrings(s.TripScheduledIDs),
}
if _, err := atx.Insert(&vpReport); err != nil {
return err
}
}
}
}
return nil
}

type ValidationReport struct {
ReportedAt tt.Time
File tt.String
tl.BaseEntity
}

func (e *ValidationReport) TableName() string {
return "tl_validation_reports"
}

type ValidationReportTripUpdateStat struct {
ValidationReportID int
AgencyID string
RouteID string
TripScheduledCount int
TripMatchCount int
TripScheduledIDs tt.Strings `db:"trip_scheduled_ids"`
tl.DatabaseEntity
}

func (e *ValidationReportTripUpdateStat) TableName() string {
return "tl_validation_trip_update_stats"
}

type ValidationReportVehiclePositionStat struct {
ValidationReportID int
AgencyID string
RouteID string
TripScheduledCount int
TripMatchCount int
TripScheduledIDs tt.Strings `db:"trip_scheduled_ids"`
tl.DatabaseEntity
}

func (e *ValidationReportVehiclePositionStat) TableName() string {
return "tl_validation_vehicle_position_stats"
}
52 changes: 52 additions & 0 deletions validator/result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package validator

import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"

"github.com/interline-io/transitland-lib/internal/testdb"
"github.com/interline-io/transitland-lib/internal/testutil"
"github.com/interline-io/transitland-lib/tlcsv"
"github.com/interline-io/transitland-lib/tldb"
)

func TestSaveValidationReport(t *testing.T) {
reader, err := tlcsv.NewReader(testutil.RelPath("test/data/rt/ct.zip"))
if err != nil {
t.Fatal(err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buf, err := os.ReadFile(testutil.RelPath(filepath.Join("test/data/rt", r.URL.Path)))
if err != nil {
t.Error(err)
}
w.Write(buf)
}))

tz, _ := time.LoadLocation("America/Los_Angeles")
now := time.Date(2023, 11, 7, 17, 05, 0, 0, tz)
opts := Options{
IncludeRealtimeJson: true,
EvaluateAt: now,
ValidateRealtimeMessages: []string{
ts.URL + "/ct-trip-updates.pb.json",
ts.URL + "/ct-vehicle-positions.pb.json",
},
}

v, _ := NewValidator(reader, opts)
result, err := v.Validate()
if err != nil {
t.Fatal(err)
}
testdb.TempSqlite(func(atx tldb.Adapter) error {
if err := SaveValidationReport(atx, result, now, 1, true, true, ""); err != nil {
t.Fatal(err)
}
return nil
})
}
Loading

0 comments on commit ea3acee

Please sign in to comment.