Skip to content

Commit

Permalink
fix(transactor): Fix the mempool check for tx status (#80)
Browse files Browse the repository at this point in the history
* fix pending mempool check

* cleanup

* rename

* some refactoring

* comments

* more notes on the config
  • Loading branch information
calbera authored Mar 22, 2024
1 parent 475e82f commit bb9c6f7
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 229 deletions.
8 changes: 7 additions & 1 deletion core/transactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
)

type Config struct {
// Hex string address of the multicall contract to be used for batched txs.
// Hex string address of the multicall contract to be used for batched txs. Currently
// configured to use the following `public`, `payable` function on this contract:
// call data signature: `tryAggregate(bool,(address,bytes))`
// returns: `([](bool,bytes))`
Multicall3Address string

// How large an individual batched tx will be (uses multicall contract if > 1).
Expand All @@ -20,6 +23,9 @@ type Config struct {
// How long to wait to retrieve txs from the queue if it is empty (ideally quick <= 1s).
EmptyQueueDelay time.Duration

// Maximum duration allowed for the tx to be signed (increase this if using a remote signer)
SignTxTimeout time.Duration

// How long to wait for the pending nonce (ideally 1 block time).
PendingNonceInterval time.Duration
// How long to wait for a tx to hit the mempool (ideally 1-2 block time).
Expand Down
40 changes: 13 additions & 27 deletions core/transactor/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,30 @@ import (
coretypes "github.com/ethereum/go-ethereum/core/types"
)

const signTxTimeout = 2 * time.Second // TODO: read from config.

// Factory is a transaction factory that builds 1559 transactions with the configured signer.
type Factory struct {
noncer Noncer
signer kmstypes.TxSigner
signerAddress common.Address
signTxTimeout time.Duration
mc3Batcher *Multicall3Batcher

// caches
ethClient eth.Client
chainID *big.Int
ethClient eth.Client
chainID *big.Int
signerAddress common.Address
}

// New creates a new factory instance.
func New(noncer Noncer, signer kmstypes.TxSigner, mc3Batcher *Multicall3Batcher) *Factory {
func New(
noncer Noncer, mc3Batcher *Multicall3Batcher,
signer kmstypes.TxSigner, signTxTimeout time.Duration,
) *Factory {
return &Factory{
noncer: noncer,
signer: signer,
signerAddress: signer.Address(),
signTxTimeout: signTxTimeout,
mc3Batcher: mc3Batcher,
signerAddress: signer.Address(),
}
}

Expand Down Expand Up @@ -91,11 +94,6 @@ func (f *Factory) buildTransaction(
var isReplacing bool
if nonce == 0 {
nonce, isReplacing = f.noncer.Acquire()
defer func() {
if err != nil {
f.noncer.RemoveAcquired(nonce)
}
}()
}

// start building the 1559 transaction
Expand Down Expand Up @@ -143,30 +141,18 @@ func (f *Factory) buildTransaction(
}
}

// bump gas (if necessary) and sign the transaction.
// bump gas (if necessary)
tx := coretypes.NewTx(txData)
if isReplacing {
tx = sender.BumpGas(tx)
}
tx, err = f.SignTransaction(ctx, tx)
return tx, err
}

// signTransaction signs a transaction with the configured signer.
func (f *Factory) SignTransaction(
ctx context.Context, tx *coretypes.Transaction,
) (*coretypes.Transaction, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, signTxTimeout)
// sign the transaction
ctxWithTimeout, cancel := context.WithTimeout(ctx, f.signTxTimeout)
signer, err := f.signer.SignerFunc(ctxWithTimeout, tx.ChainId())
cancel()
if err != nil {
return nil, err
}
return signer(f.signerAddress, tx)
}

// GetNextNonce lets the noncer know that the old nonce could not be sent and acquires a new one.
func (f *Factory) GetNextNonce(oldNonce uint64) (uint64, bool) {
f.noncer.RemoveAcquired(oldNonce)
return f.noncer.Acquire()
}
5 changes: 2 additions & 3 deletions core/transactor/factory/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package factory

// Noncer is an interface for acquiring nonces.
// Noncer is an interface for acquiring fresh nonces.
type Noncer interface {
Acquire() (nonce uint64, isReplacing bool)
RemoveAcquired(uint64)
Acquire() (uint64, bool)
}
122 changes: 122 additions & 0 deletions core/transactor/loop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package transactor

import (
"context"
"time"

"github.com/berachain/offchain-sdk/core/transactor/tracker"
"github.com/berachain/offchain-sdk/core/transactor/types"

"github.com/ethereum/go-ethereum"
)

// mainLoop is the main transaction sending / batching loop.
func (t *TxrV2) mainLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
// Attempt the retrieve a batch from the queue.
requests := t.retrieveBatch(ctx)
if len(requests) == 0 {
// We didn't get any transactions, so we wait for more.
t.logger.Info("no tx requests to process....")
time.Sleep(t.cfg.EmptyQueueDelay)
continue
}

// We got a batch, so we can build and fire, after the previous fire has finished.
t.senderMu.Lock()
go func() {
defer t.senderMu.Unlock()

t.fire(
ctx,
&tracker.Response{MsgIDs: requests.MsgIDs(), InitialTimes: requests.Times()},
true, requests.Messages()...,
)
}()
}
}
}

// retrieveBatch retrieves a batch of transaction requests from the queue. It waits until 1) it
// hits the batch timeout or 2) tx batch size is reached only if waitFullBatchTimeout is false.
func (t *TxrV2) retrieveBatch(ctx context.Context) types.Requests {
var (
requests types.Requests
timer = time.NewTimer(t.cfg.TxBatchTimeout)
)
defer timer.Stop()

// Loop until the batch tx timeout expires.
for {
select {
case <-ctx.Done():
return nil
case <-timer.C:
return requests
default:
txsRemaining := t.cfg.TxBatchSize - len(requests)

// If we reached max batch size, we can break out of the loop.
if txsRemaining == 0 {
// Wait until the timer hits if we want to wait for the full batch timeout.
if t.cfg.WaitFullBatchTimeout {
<-timer.C
}
return requests
}

// Get at most txsRemaining tx requests from the queue.
msgIDs, txReqs, err := t.requests.ReceiveMany(int32(txsRemaining))
if err != nil {
t.logger.Error("failed to receive tx request", "err", err)
continue
}

// Update the batched tx requests.
for i, txReq := range txReqs {
if t.cfg.UseQueueMessageID {
txReq.MsgID = msgIDs[i]
}
requests = append(requests, txReq)
}
}
}
}

// fire processes the tracked tx response. If requested to build, it will first batch the messages.
// Then it sends the batch as one tx and asynchronously tracks the tx for its status.
// NOTE: if toBuild is false, resp.Transaction must be a valid, non-nil tx.
func (t *TxrV2) fire(
ctx context.Context, resp *tracker.Response, toBuild bool, msgs ...*ethereum.CallMsg,
) {
defer func() {
// If there was an error in building or sending the tx, let the subscribers know.
if resp.Status() == tracker.StatusError {
t.dispatcher.Dispatch(resp)
}
}()

if toBuild {
// Call the factory to build the (batched) transaction.
t.markState(types.StateBuilding, resp.MsgIDs...)
resp.Transaction, resp.Error = t.factory.BuildTransactionFromRequests(ctx, msgs...)
if resp.Error != nil {
return
}
}

// Call the sender to send the transaction to the chain.
t.markState(types.StateSending, resp.MsgIDs...)
if resp.Error = t.sender.SendTransaction(ctx, resp.Transaction); resp.Error != nil {
return
}
t.logger.Debug("📡 sent transaction", "hash", resp.Hash().Hex(), "reqs", len(resp.MsgIDs))

// Call the tracker to track the transaction async.
t.markState(types.StateInFlight, resp.MsgIDs...)
t.tracker.Track(ctx, resp)
}
23 changes: 15 additions & 8 deletions core/transactor/sender/replacement.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,33 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
coretypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
)

var _ TxReplacementPolicy = (*DefaultTxReplacementPolicy)(nil)
var _ txReplacementPolicy = (*defaultTxReplacementPolicy)(nil)

// DefaultTxReplacementPolicy is the default transaction replacement policy. It bumps the gas price
// defaultTxReplacementPolicy is the default transaction replacement policy. It bumps the gas price
// by 15% (only 10% is required but we add a buffer to be safe) and generates a replacement 1559
// dynamic fee transaction.
type DefaultTxReplacementPolicy struct {
nf Factory
type defaultTxReplacementPolicy struct {
noncer Noncer
}

func (d *DefaultTxReplacementPolicy) GetNew(
func (d *defaultTxReplacementPolicy) GetNew(
tx *coretypes.Transaction, err error,
) *coretypes.Transaction {
) (*coretypes.Transaction, error) {
// If the sender is out of balance, return the error.
if errors.Is(err, vm.ErrInsufficientBalance) ||
(err != nil && strings.Contains(err.Error(), "insufficient balance for transfer")) {
return nil, err
}

// Replace the nonce if the nonce was too low.
var shouldBumpGas bool
if errors.Is(err, core.ErrNonceTooLow) ||
(err != nil && strings.Contains(err.Error(), "nonce too low")) {
var newNonce uint64
newNonce, shouldBumpGas = d.nf.GetNextNonce(tx.Nonce())
newNonce, shouldBumpGas = d.noncer.Acquire()
tx = SetNonce(tx, newNonce)
}

Expand All @@ -36,5 +43,5 @@ func (d *DefaultTxReplacementPolicy) GetNew(
tx = BumpGas(tx)
}

return tx
return tx, nil
}
20 changes: 10 additions & 10 deletions core/transactor/sender/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@ const (
)

var (
_ RetryPolicy = (*NoRetryPolicy)(nil)
_ RetryPolicy = (*ExpoRetryPolicy)(nil)
_ retryPolicy = (*noRetryPolicy)(nil)
_ retryPolicy = (*expoRetryPolicy)(nil)
)

// NoRetryPolicy does not retry transactions.
type NoRetryPolicy struct{}
// noRetryPolicy does not retry transactions.
type noRetryPolicy struct{}

func (*NoRetryPolicy) Get(*coretypes.Transaction, error) (bool, time.Duration) {
func (*noRetryPolicy) Get(*coretypes.Transaction, error) (bool, time.Duration) {
return false, 0
}

func (*NoRetryPolicy) UpdateTxModified(common.Hash, common.Hash) {}
func (*noRetryPolicy) UpdateTxModified(common.Hash, common.Hash) {}

// ExpoRetryPolicy is a RetryPolicy that does an exponential backoff until maxRetries is
// expoRetryPolicy is a RetryPolicy that does an exponential backoff until maxRetries is
// reached. This does not assume anything about whether the specifc tx should be retried.
type ExpoRetryPolicy struct {
type expoRetryPolicy struct {
retries sync.Map
}

func (erp *ExpoRetryPolicy) Get(tx *coretypes.Transaction, err error) (bool, time.Duration) {
func (erp *expoRetryPolicy) Get(tx *coretypes.Transaction, err error) (bool, time.Duration) {
var (
txHash = tx.Hash()
tri *txRetryInfo
Expand Down Expand Up @@ -75,7 +75,7 @@ func (erp *ExpoRetryPolicy) Get(tx *coretypes.Transaction, err error) (bool, tim
return true, waitTime
}

func (erp *ExpoRetryPolicy) UpdateTxModified(oldTx, newTx common.Hash) {
func (erp *expoRetryPolicy) UpdateTxModified(oldTx, newTx common.Hash) {
if txri, found := erp.retries.Load(oldTx); found {
erp.retries.Delete(oldTx)
erp.retries.Store(newTx, txri)
Expand Down
22 changes: 13 additions & 9 deletions core/transactor/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import (
coretypes "github.com/ethereum/go-ethereum/core/types"
)

// Sender is a component that sends transactions to the chain.
// Sender is a component that sends (and retries) transactions to the chain.
type Sender struct {
factory Factory // factory to sign new transactions
txReplacementPolicy TxReplacementPolicy // policy to replace transactions
retryPolicy RetryPolicy // policy to retry transactions
factory Factory // used to rebuild transactions, if necessary
txReplacementPolicy txReplacementPolicy // policy to replace transactions
retryPolicy retryPolicy // policy to retry transactions

chain eth.Client
logger log.Logger
}

// New creates a new Sender with default replacement and exponential retry policies.
func New(factory Factory) *Sender {
func New(factory Factory, noncer Noncer) *Sender {
return &Sender{
factory: factory,
txReplacementPolicy: &DefaultTxReplacementPolicy{nf: factory},
retryPolicy: &ExpoRetryPolicy{}, // TODO: choose from config.
txReplacementPolicy: &defaultTxReplacementPolicy{noncer: noncer},
retryPolicy: &expoRetryPolicy{}, // TODO: choose from config.
}
}

Expand Down Expand Up @@ -61,7 +61,10 @@ func (s *Sender) retryTxWithPolicy(ctx context.Context, tx *coretypes.Transactio
s.logger.Error("failed to send tx, retrying...", "hash", currTx, "err", err)

// Get the replacement tx if necessary.
tx = s.txReplacementPolicy.GetNew(tx, err)
if tx, err = s.txReplacementPolicy.GetNew(tx, err); err != nil {
s.logger.Error("failed to get replacement tx", "err", err)
return err
}

// Update the retry policy if the transaction has been changed and log.
if newTx := tx.Hash(); newTx != currTx {
Expand All @@ -77,7 +80,8 @@ func (s *Sender) retryTxWithPolicy(ctx context.Context, tx *coretypes.Transactio
if tx, err = s.factory.RebuildTransactionFromRequest(
ctx, types.CallMsgFromTx(tx), tx.Nonce(),
); err != nil {
s.logger.Error("failed to sign replacement transaction", "err", err)
s.logger.Error("failed to build replacement transaction", "err", err)
return err
}
}
}
Loading

0 comments on commit bb9c6f7

Please sign in to comment.