Skip to content

Commit

Permalink
Merge pull request #1302 from GeorgeTsagk/sync-stats-cache-fix
Browse files Browse the repository at this point in the history
Aggregate Universe Stats: always store result of querySyncStats
  • Loading branch information
ffranr authored Jan 22, 2025
2 parents f340f7f + 5aeb9f5 commit 563efdc
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 6 deletions.
51 changes: 45 additions & 6 deletions tapdb/universe_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,50 @@ func (u *UniverseStats) AggregateSyncStats(

log.Debugf("Populating aggregate sync stats")

dbStats, err := u.querySyncStats(ctx)
if err != nil {
var (
resChan = make(chan universe.AggregateStats, 1)
errChan = make(chan error, 1)
)

// We'll fire the db query in a separate go routine, with an undefined
// timeout. This way, we'll always retrieve the result regardless of
// what happens in the current context. This way we're always waiting
// for the call to complete and caching the result even if this
// function's result is an error.
go func() {
// Note: we have the statsMtx held, so even if a burst of
// requests took place in an un-cached state, this call would
// not be triggered multiple times.
dbStats, err := u.querySyncStats(context.Background())
if err != nil {
errChan <- err
return
}

log.Debugf("Retrieved aggregate sync stats: %+v", dbStats)

// We'll store the DB stats so that it can be read from cache
// later.
u.statsSnapshot.Store(&dbStats)

resChan <- dbStats
}()

var dbStats universe.AggregateStats

select {
case <-ctx.Done():
log.Debugf("Client context timeout before retrieving " +
"aggregate sync stats")
return dbStats, ctx.Err()

case err := <-errChan:
log.Debugf("Error while querying aggregate sync stats: %v", err)
return dbStats, err
}

// We'll store the DB stats then start our time after function to wipe
// the stats pointer so we'll refresh it after a period of time.
u.statsSnapshot.Store(&dbStats)
case res := <-resChan:
dbStats = res
}

// Reset the timer so we'll refresh again after the cache duration.
if u.statsRefresh != nil && !u.statsRefresh.Stop() {
Expand All @@ -591,6 +627,9 @@ func (u *UniverseStats) AggregateSyncStats(
default:
}
}

log.Debugf("Refreshing sync stats cache in %v", u.opts.cacheDuration)

u.statsRefresh = time.AfterFunc(
u.opts.cacheDuration, u.populateSyncStatsCache,
)
Expand Down
54 changes: 54 additions & 0 deletions tapdb/universe_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,60 @@ func TestUniverseStatsEvents(t *testing.T) {
})
}

// TestUniverseStatsAsyncCache tests that the cache of the universe aggregate
// stats is asynchronously populated regardless of what the outcome of the
// RPC call is.
func TestUniverseStatsAsyncCache(t *testing.T) {
t.Parallel()

db := NewTestDB(t)

yesterday := time.Now().UTC().Add(-24 * time.Hour)
testClock := clock.NewTestClock(yesterday)
statsDB, _ := newUniverseStatsWithDB(db.BaseDB, testClock)

const numTranches = 3

sh := newUniStatsHarness(t, numTranches, db.BaseDB, statsDB)

// Record the number of groups in this asset.
var numGroups uint64
for i := 0; i < numTranches; i++ {
if sh.universeLeaves[i].Leaf.GroupKey != nil {
numGroups++
}
}

// First let's make sure the cache is empty. This should be the case as
// no calls have been made so far.
val := sh.db.statsSnapshot.Load()
require.Nil(t, val)

const (
quickTimeoutDuration = time.Microsecond * 1
defaultTick = time.Millisecond * 250
)

// We now create a client context with a very quick timeout. This is
// meant to quickly fail the RPC call.
ctx, cancel := context.WithTimeout(
context.Background(), quickTimeoutDuration,
)
defer cancel()

// The tiny timeout duration should make the following call result in a
// context deadline related error.
_, err := sh.db.AggregateSyncStats(ctx)
require.ErrorContains(t, err, "context deadline exceeded")

// Regardless of the above call failing, the cache should asynchronously
// get updated in the background, so let's wait until a value is loaded.
require.Eventually(t, func() bool {
val := sh.db.statsSnapshot.Load()
return val != nil
}, DefaultStoreTimeout, defaultTick)
}

// TestUniverseQuerySyncStatsSorting tests that we're able to properly sort the
// response using any of the available params.
func TestUniverseQuerySyncStatsSorting(t *testing.T) {
Expand Down

0 comments on commit 563efdc

Please sign in to comment.