feat(jobs): Add stale timeout check to subscription jobs (#96)
* stale timeout check

* fix timer reset

* format

* tag

* bet

* num retries

* unsubscribe on stale
calbera authored May 30, 2024
1 parent 4ba0e32 commit 4f6189f
Showing 3 changed files with 272 additions and 125 deletions.
143 changes: 18 additions & 125 deletions baseapp/job_manager.go
@@ -2,16 +2,11 @@ package baseapp

import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"reflect"
"sync"
"time"

"github.com/berachain/offchain-sdk/job"
workertypes "github.com/berachain/offchain-sdk/job/types"
"github.com/berachain/offchain-sdk/log"
sdk "github.com/berachain/offchain-sdk/types"
"github.com/berachain/offchain-sdk/worker"
@@ -24,11 +19,6 @@ const (

executorName = "job-executor"
executorPromName = "job_executor"

maxBackoff = 2 * time.Minute
backoffStart = 1 * time.Second
backoffBase = 2
jitterRange = 1000
)

// JobManager handles the job and worker lifecycle.
@@ -49,6 +39,8 @@ type JobManager struct {
// are fed jobs by the job producers.
executorCfg *worker.PoolConfig
jobExecutors *worker.Pool

// TODO: introduce telemetry.Metrics to this struct and the BaseApp.
}

// NewManager creates a new manager.
@@ -95,8 +87,7 @@ func (jm *JobManager) Logger(ctx context.Context) log.Logger {
return sdk.UnwrapContext(ctx).Logger().With("namespace", "job-manager")
}

// Start calls `Setup` on the jobs in the registry as well as spins up
// the worker pools.
// Start calls `Setup` on the jobs in the registry as well as spins up the worker pools.
func (jm *JobManager) Start(ctx context.Context) {
// We pass in the context in order to handle cancelling the workers. We pass the
// standard go context and not an sdk.Context here since the context here is just used
@@ -106,8 +97,7 @@ func (jm *JobManager) Start(ctx context.Context) {
jm.jobProducers = worker.NewPool(ctx, logger, jm.producerCfg)
}

// Stop calls `Teardown` on the jobs in the registry as well as
// shut's down all the worker pools.
// Stop calls `Teardown` on the jobs in the registry as well as shuts down all the worker pools.
func (jm *JobManager) Stop() {
var wg sync.WaitGroup

@@ -119,7 +109,7 @@ func (jm *JobManager) Stop() {
jm.jobProducers = nil
}()

// Shutdown executors and call Teardown().
// Shutdown executors and call Teardown() if a job has one.
wg.Add(1)
go func() {
defer wg.Done()
@@ -134,29 +124,14 @@ func (jm *JobManager) Stop() {
jm.jobExecutors = nil
}()

// Wait for both to finish. (Should we add a timeout?)
// Wait for both to finish.
wg.Wait()
}

func (jm *JobManager) runProducer(ctx context.Context, j job.Basic) bool {
// Handle migrated jobs.
if wrappedJob := job.WrapJob(j); wrappedJob != nil {
jm.jobProducers.Submit(
func() {
if err := wrappedJob.Producer(
ctx, jm.jobExecutors,
); !errors.Is(err, context.Canceled) && err != nil {
jm.Logger(ctx).Error("error in job producer", "err", err)
}
},
)
return true
}
return false
}

// RunProducers sets up each job and runs its producer.
func (jm *JobManager) RunProducers(gctx context.Context) { //nolint:gocognit // todo fix.
func (jm *JobManager) RunProducers(gctx context.Context) {
ctx := jm.ctxFactory.NewSDKContext(gctx)

// Load all jobs in registry in the order they were registered.
orderedJobs, err := jm.jobRegistry.IterateInOrder()
if err != nil {
@@ -165,108 +140,26 @@ func (jm *JobManager) RunProducers(gctx context.Context) { //nolint:gocognit //

for _, jobID := range orderedJobs.Keys() {
j := jm.jobRegistry.Get(jobID)
ctx := jm.ctxFactory.NewSDKContext(gctx)

// Run the setup for the job if it has one.
if sj, ok := j.(job.HasSetup); ok {
if err = sj.Setup(ctx); err != nil {
panic(err)
}
}

if jm.runProducer(ctx, j) { //nolint:nestif // todo fix.
continue
// Submit the job to the job producers based on the job's type. Use retries if the job uses
// a subscription.
if wrappedJob := job.WrapJob(j); wrappedJob != nil {
jm.jobProducers.Submit(jm.producerTask(ctx, wrappedJob))
} else if subJob, ok := j.(job.Subscribable); ok {
jm.jobProducers.Submit(func() {
ch := subJob.Subscribe(ctx)
for {
select {
case val := <-ch:
jm.jobExecutors.Submit(workertypes.NewPayload(ctx, subJob, val).Execute)
case <-ctx.Done():
return
default:
continue
}
}
})
jm.jobProducers.Submit(jm.withRetry(jm.retryableSubscriber(ctx, subJob)))
} else if ethSubJob, ok := j.(job.EthSubscribable); ok { //nolint:govet // todo fix.
jm.jobProducers.Submit(withRetry(func() bool {
//nolint:govet // todo fix.
sub, ch, err := ethSubJob.Subscribe(ctx)
if err != nil {
jm.Logger(ctx).Error("error subscribing block header", "err", err)
return true
}
jm.Logger(ctx).Info("(re)subscribed to eth subscription", "job", j.RegistryKey())

for {
select {
case <-ctx.Done():
ethSubJob.Unsubscribe(ctx)
return false
case err = <-sub.Err():
jm.Logger(ctx).Error("error in subscription", "err", err)
ethSubJob.Unsubscribe(ctx)
// retry
return true
case val := <-ch:
jm.jobExecutors.Submit(workertypes.NewPayload(ctx, ethSubJob, val).Execute)
continue
}
}
}, jm.Logger(ctx)))
jm.jobProducers.Submit(jm.withRetry(jm.retryableEthSubscriber(ctx, ethSubJob)))
} else if blockHeaderJob, ok := j.(job.BlockHeaderSub); ok { //nolint:govet // todo fix.
//nolint:govet // todo fix.
jm.jobProducers.Submit(withRetry(func() bool {
sub, ch, err := blockHeaderJob.Subscribe(ctx)
if err != nil {
jm.Logger(ctx).Error("error subscribing block header", "err", err)
return true
}
jm.Logger(ctx).Info("(re)subscribed to block header sub", "job", j.RegistryKey())

for {
select {
case <-ctx.Done():
blockHeaderJob.Unsubscribe(ctx)
return false
case err = <-sub.Err():
jm.Logger(ctx).Error("error in subscription", "err", err)
blockHeaderJob.Unsubscribe(ctx)
return true
case val := <-ch:
jm.jobExecutors.Submit(workertypes.NewPayload(ctx, blockHeaderJob, val).Execute)
continue
}
}
}, jm.Logger(ctx)))
jm.jobProducers.Submit(jm.withRetry(jm.retryableHeaderSubscriber(ctx, blockHeaderJob)))
} else {
panic(fmt.Sprintf("unknown job type %s", reflect.TypeOf(j)))
}
}
}

// withRetry is a wrapper that retries a task with exponential backoff.
func withRetry(task func() bool, logger log.Logger) func() {
return func() {
backoff := backoffStart

for {
if retry := task(); retry {
// Exponential backoff with jitter.
jitter, _ := rand.Int(rand.Reader, big.NewInt(jitterRange))
if jitter == nil {
jitter = new(big.Int)
}
sleep := backoff + time.Duration(jitter.Int64())*time.Millisecond
logger.Info(fmt.Sprintf("retrying task in %s...", sleep))
time.Sleep(sleep)
backoff *= backoffBase
if backoff > maxBackoff {
backoff = maxBackoff
}
continue
}
break
}
}
}
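
The retryable producer tasks referenced above (retryableSubscriber, retryableEthSubscriber, retryableHeaderSubscriber) are presumably defined in the third changed file, which is not shown in this excerpt. As a rough, self-contained sketch only, with subscriberTask, errs, vals, and handle as stand-in names rather than the commit's actual code, the stale-timeout behavior described by the commit messages (reset a timer on every delivered value, unsubscribe and resubscribe once nothing has arrived for the stale timeout) could look like this:

package sketch

import (
	"context"
	"log"
	"time"
)

const staleTimeout = 1 * time.Hour // mirrors subscriptionStaleTimeout in retry.go below

// subscriberTask consumes values from a subscription and reports whether the
// caller should resubscribe: true after an error or a stale period, false on
// shutdown. It is shaped like the tasks that withRetry (in retry.go below) wraps.
func subscriberTask(
	ctx context.Context, errs <-chan error, vals <-chan any, handle func(any),
) bool {
	staleTimer := time.NewTimer(staleTimeout)
	defer staleTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return false // shutting down: caller unsubscribes, no retry
		case err := <-errs:
			log.Println("subscription error:", err)
			return true // caller unsubscribes, then resubscribes with backoff
		case <-staleTimer.C:
			log.Println("no values for", staleTimeout, "- treating subscription as stale")
			return true // caller unsubscribes, then resubscribes with backoff
		case v := <-vals:
			handle(v)
			// Reset the stale timer on every delivered value; stop and drain
			// it first so Reset always starts from a clean timer.
			if !staleTimer.Stop() {
				select {
				case <-staleTimer.C:
				default:
				}
			}
			staleTimer.Reset(staleTimeout)
		}
	}
}

In the diff above, each subscription branch of RunProducers wraps a task of roughly this shape with jm.withRetry before submitting it to the producer pool.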
43 changes: 43 additions & 0 deletions baseapp/retry.go
@@ -0,0 +1,43 @@
package baseapp

import (
"crypto/rand"
"math/big"
"time"
)

// Retry parameters.
const (
maxBackoff = 2 * time.Minute
backoffStart = 1 * time.Second
backoffBase = 2
jitterRange = 1000
subscriptionStaleTimeout = 1 * time.Hour
)

// withRetry is a wrapper that retries a task with exponential backoff.
func (jm *JobManager) withRetry(task func() bool) func() {
return func() {
backoff := backoffStart

for {
shouldRetry := task()
if !shouldRetry {
return
}

// Exponential backoff with jitter.
jitter, _ := rand.Int(rand.Reader, big.NewInt(jitterRange))
if jitter == nil {
jitter = new(big.Int)
}
sleep := backoff + time.Duration(jitter.Int64())*time.Millisecond
time.Sleep(sleep)

backoff *= backoffBase
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
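
For a sense of the retry cadence these constants imply, here is a standalone snippet (not part of the commit) that prints the backoff schedule while ignoring the up-to-one-second jitter:

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		maxBackoff   = 2 * time.Minute
		backoffStart = 1 * time.Second
		backoffBase  = 2
	)

	backoff := backoffStart
	for attempt := 1; attempt <= 9; attempt++ {
		fmt.Printf("retry %d: sleep ~%s\n", attempt, backoff)
		backoff *= backoffBase
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	// Output: 1s, 2s, 4s, 8s, 16s, 32s, 1m4s, then capped at 2m0s.
}

The jitter added by withRetry keeps producers that fail at the same moment from resubscribing in lockstep.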