Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transactor): Cleanup retry policy and handle error cases #61

Merged
merged 11 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions baseapp/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func withRetry(task func() bool, logger log.Logger) func() {
if retry := task(); retry {
// Exponential backoff with jitter.
jitter, _ := rand.Int(rand.Reader, big.NewInt(jitterRange))
if jitter == nil {
jitter = new(big.Int)
}
sleep := backoff + time.Duration(jitter.Int64())*time.Millisecond
logger.Info(fmt.Sprintf("retrying task in %s...", sleep))
time.Sleep(sleep)
Expand Down
17 changes: 10 additions & 7 deletions client/eth/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ type ConnectionPoolImpl struct {
}

type ConnectionPoolConfig struct {
EthHTTPURLs []string
EthWSURLs []string
DefaultTimeout time.Duration
EthHTTPURLs []string
EthWSURLs []string
DefaultTimeout time.Duration
HealthCheckInterval time.Duration
}

func DefaultConnectPoolConfig() *ConnectionPoolConfig {
return &ConnectionPoolConfig{
EthHTTPURLs: []string{"http://localhost:8545"},
EthWSURLs: []string{"ws://localhost:8546"},
EthHTTPURLs: []string{"http://localhost:8545"},
EthWSURLs: []string{"ws://localhost:8546"},
DefaultTimeout: 5 * time.Second, //nolint:gomnd // fix later.
HealthCheckInterval: 5 * time.Second, //nolint:gomnd // fix later.
}
}

Expand Down Expand Up @@ -87,14 +90,14 @@ func (c *ConnectionPoolImpl) Dial(string) error {

func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error {
for _, url := range c.config.EthHTTPURLs {
client := NewHealthCheckedClient(c.logger)
client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger)
if err := client.DialContext(ctx, url); err != nil {
return err
}
c.cache.Add(url, client)
}
for _, url := range c.config.EthWSURLs {
client := NewHealthCheckedClient(c.logger)
client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger)
if err := client.DialContext(ctx, url); err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions client/eth/health_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ type HealthCheckedClient struct {
mu sync.Mutex
}

func NewHealthCheckedClient(logger log.Logger) *HealthCheckedClient {
func NewHealthCheckedClient(
healthCheckInterval time.Duration, logger log.Logger,
) *HealthCheckedClient {
return &HealthCheckedClient{
logger: logger,
healthCheckInterval: 5 * time.Second, //nolint:gomnd // todo paramaterize.
healthCheckInterval: healthCheckInterval,
}
}

Expand Down
8 changes: 1 addition & 7 deletions core/transactor/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ func (f *Factory) BuildTransaction(
if txReq.Gas > 0 {
txData.Gas = txReq.Gas
} else {
if txData.Gas, err = ethClient.EstimateGas(ctx, ethereum.CallMsg{
From: f.signerAddress,
To: txData.To,
GasFeeCap: txData.GasFeeCap,
Value: txData.Value,
Data: txData.Data,
}); err != nil {
if txData.Gas, err = ethClient.EstimateGas(ctx, ethereum.CallMsg(*txReq)); err != nil {
return nil, err
}
}
Expand Down
58 changes: 51 additions & 7 deletions core/transactor/sender/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,62 @@ package sender

import (
"context"
"crypto/rand"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
coretypes "github.com/ethereum/go-ethereum/core/types"
)

type (
RetryPolicy func(ctx context.Context,
tx *coretypes.Transaction, err error) (bool, time.Duration)
const (
maxRetriesPerTx = 3 // TODO: read from config.
backoffStart = 5 * time.Second // TODO: read from config.
backoffMultiplier = 2
maxBackoff = 1 * time.Minute
jitterRange = 1000
calbera marked this conversation as resolved.
Show resolved Hide resolved
)

func DefaultRetryPolicy(
_ context.Context, _ *coretypes.Transaction, _ error,
) (bool, time.Duration) {
return false, 5 //nolint:gomnd // todo fix later.
// A RetryPolicy is used to determine if a transaction should be retried and how long to wait
// before retrying again.
type RetryPolicy func(context.Context, *coretypes.Transaction, error) (bool, time.Duration)

// NoRetryPolicy does not retry transactions.
func NoRetryPolicy(context.Context, *coretypes.Transaction, error) (bool, time.Duration) {
return false, backoffStart
}

// NewExponentialRetryPolicy returns a RetryPolicy that does an exponential backoff until
// maxRetries is reached. This does not assume anything about whether the specifc tx should be
// retried.
func NewExponentialRetryPolicy() RetryPolicy {
backoff := backoffStart
retriesMu := &sync.Mutex{}
retries := make(map[common.Hash]int)
calbera marked this conversation as resolved.
Show resolved Hide resolved

return func(ctx context.Context, tx *coretypes.Transaction, err error) (bool, time.Duration) {
retriesMu.Lock()
defer retriesMu.Unlock()

txHash := tx.Hash()
if retries[txHash] >= maxRetriesPerTx {
delete(retries, txHash)
return NoRetryPolicy(ctx, tx, err)
}
retries[txHash]++

// Exponential backoff with jitter.
jitter, _ := rand.Int(rand.Reader, big.NewInt(jitterRange))
if jitter == nil {
jitter = new(big.Int)
}

waitTime := backoff + time.Duration(jitter.Int64())*time.Millisecond
if backoff *= backoffMultiplier; backoff > maxBackoff {
backoff = maxBackoff
}

return true, waitTime
}
}
127 changes: 72 additions & 55 deletions core/transactor/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sender
import (
"context"
"errors"
"strings"
"time"

"github.com/berachain/offchain-sdk/core/transactor/tracker"
Expand Down Expand Up @@ -34,10 +35,10 @@ type Sender struct {
// New creates a new Sender with default replacement and retry policies.
func New(factory Factory, noncer Noncer) *Sender {
return &Sender{
noncer: noncer, // noncer to acquire nonces
factory: factory, // factory to sign transactions
txReplacementPolicy: DefaultTxReplacementPolicy, // default transaction replacement policy
retryPolicy: DefaultRetryPolicy, // default retry policy
noncer: noncer, // noncer to acquire nonces
factory: factory, // factory to sign transactions
txReplacementPolicy: DefaultTxReplacementPolicy, // default tx replacement policy
retryPolicy: NewExponentialRetryPolicy(), // exponential backoff retry policy
}
}

Expand All @@ -47,27 +48,10 @@ func (s *Sender) SendTransaction(ctx context.Context, tx *coretypes.Transaction)
sCtx := sdk.UnwrapContext(ctx) // unwrap the context to get the SDK context
ethClient := sCtx.Chain() // get the Ethereum client from the SDK context

// TODO: needs to be resigned by factory.
// TODO: this returns the revert error message, handle it.
if err := ethClient.SendTransaction(ctx, tx); err != nil { // if sending the transaction fails
sCtx.Logger().Error(
"failed to send tx transaction", "hash", tx.Hash(), "err", err, // log the error
)

// Send the replacement transaction.
price := tx.GasPrice()
tx = s.txReplacementPolicy(ctx, tx)
sCtx.Logger().Info(
"retrying with new gas limit", "old", price, "new", tx.GasPrice(), "nonce", tx.Nonce(),
)
if retry, backoff := s.retryPolicy(ctx, tx, err); retry {
time.Sleep(backoff) // wait for the backoff time
if err = s.SendTransaction(ctx, tx); err != nil { // retry sending the transaction
return err // if it fails again, return the error
}
}

// if the retry policy does not allow for a retry, return the error
if err := ethClient.SendTransaction(ctx, tx); err != nil {
sCtx.Logger().Error("failed to send tx", "hash", tx.Hash(), "err", err)
// if sending the transaction fails, retry according to the retry policy
go s.retryTxWithPolicy(sCtx, tx, err)
return err
}

Expand All @@ -91,48 +75,81 @@ func (s *Sender) OnRevert(*tracker.InFlightTx, *coretypes.Receipt) error {
// transaction is replaced with a new transaction with a higher gas price as defined by the
// txReplacementPolicy.
func (s *Sender) OnStale(ctx context.Context, tx *tracker.InFlightTx) error {
replacementTx, err := s.factory.SignTransaction(s.txReplacementPolicy(ctx, tx.Transaction))
if err != nil {
sdk.UnwrapContext(ctx).Logger().Error(
"failed to sign replacement transaction", "err", err)
return err
}
return s.SendTransaction(ctx, replacementTx)
return s.retryTx(sdk.UnwrapContext(ctx), tx.Transaction)
}

// OnError is called when an error occurs while sending a transaction. In this case, the
// transaction is replaced with a new transaction with a higher gas price as defined by
// the txReplacementPolicy.
// TODO: make this more robust probably.
func (s *Sender) OnError(ctx context.Context, tx *tracker.InFlightTx, err error) {
if errors.Is(err, core.ErrNonceTooLow) {
ethTx, buildErr := s.factory.BuildTransaction(ctx, &types.TxRequest{
To: tx.To(),
Value: tx.Value(),
Data: tx.Data(),
})
if buildErr != nil {
sdk.UnwrapContext(ctx).Logger().Error(
"failed to build replacement transaction", "err", err)
sCtx := sdk.UnwrapContext(ctx)

// Assign the new transaction to the in-flight transaction.
tx.Transaction = s.handleNonceTooLow(sCtx, tx.Transaction, err)
tx.Receipt = nil

// The original tx was never sent so we remove from the in-flight list.
s.noncer.RemoveInFlight(tx)

_ = s.retryTx(sCtx, tx.Transaction)
}

// retryTxWithPolicy retries the tx according to the retry policy. If the nonce is too low, builds
// a new tx with the latest nonce from the factory & noncer.
func (s *Sender) retryTxWithPolicy(sCtx *sdk.Context, tx *coretypes.Transaction, err error) {
tx = s.handleNonceTooLow(sCtx, tx, err)

for {
retry, backoff := s.retryPolicy(sCtx, tx, err)
if !retry {
return
}
// The original tx was never sent so we remove from the in-flight list.
s.noncer.RemoveInFlight(tx)

// Assign the new transaction to the in-flight transaction.
tx.Transaction = ethTx
tx.Receipt = nil
time.Sleep(backoff) // wait for the backoff time
err = s.retryTx(sCtx, tx)
}
}

// retryTx manages the logic for replacing a tx according to the replacement policy and resending.
func (s *Sender) retryTx(sCtx *sdk.Context, tx *coretypes.Transaction) error {
replacementTx := s.txReplacementPolicy(sCtx, tx)
sCtx.Logger().Debug(
"retrying with new gas and nonce",
"old", tx.GasPrice(), "new", replacementTx.GasPrice(), "nonce", tx.Nonce(),
)

replacementTx, err := s.factory.SignTransaction(s.txReplacementPolicy(ctx, tx.Transaction))
// sign the tx with the new gas price
signedTx, err := s.factory.SignTransaction(replacementTx)
if err != nil {
sdk.UnwrapContext(ctx).Logger().Error(
"failed to sign replacement transaction", "err", err)
return
sCtx.Logger().Error("failed to sign replacement transaction", "err", err)
return err
}

// retry sending the transaction
return s.SendTransaction(sCtx, signedTx)
}

// handleNonceTooLow will replace a transaction with a new one if the nonce is too low.
func (s *Sender) handleNonceTooLow(
sCtx *sdk.Context, tx *coretypes.Transaction, err error,
) *coretypes.Transaction {
if !(errors.Is(err, core.ErrNonceTooLow) || strings.Contains(err.Error(), "nonce too low")) {
return tx
}
if err = s.SendTransaction(ctx, replacementTx); err != nil {
sdk.UnwrapContext(ctx).Logger().Error(
"failed to send replacement transaction", "err", err)
return

ethTx, buildErr := s.factory.BuildTransaction(sCtx, &types.TxRequest{
To: tx.To(),
Gas: tx.Gas(),
GasPrice: tx.GasPrice(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
Value: tx.Value(),
Data: tx.Data(),
})
if buildErr != nil {
sCtx.Logger().Error("failed to build replacement transaction", "err", err)
return tx
}

return ethTx
}
4 changes: 2 additions & 2 deletions core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (t *TxrV2) mainLoop(ctx context.Context) {
go func() {
defer t.mu.Unlock()
if err := t.sendAndTrack(ctx, msgIDs, batch...); err != nil {
t.logger.Error("failed to process batch", "err", err)
t.logger.Error("failed to process batch", "msgs", msgIDs, "err", err)
}
}()
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (t *TxrV2) sendAndTrack(
return err
}

// t.logger.Debug("📡 sent transaction", "tx-hash", tx.Hash().Hex(), "tx-reqs", len(batch))
t.logger.Debug("📡 sent transaction", "tx-hash", tx.Hash().Hex(), "tx-reqs", len(batch))

// Spin off a goroutine to track the transaction.
t.tracker.Track(
Expand Down
3 changes: 2 additions & 1 deletion examples/listener/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ EventName = "NumberChanged(uint256)"
[App.Jobs.Poller]
Interval = "1s"

[ConnectionPool]
[App.ConnectionPool]
EthHTTPURLs = ["http://localhost:10545"]
EthWSURLs = ["ws://localhost:10546"]
DefaultTimeout = "5s"
HealthCheckInterval = "5s"

[Server.HTTP]
Port = 8080
Loading