diff --git a/client/eth/client.go b/client/eth/client.go index 9933181..3f5b193 100644 --- a/client/eth/client.go +++ b/client/eth/client.go @@ -52,13 +52,13 @@ type Reader interface { { "pending": { - "1": { + 1: { // transaction details... }, ... }, "queued": { - "3": { + 3: { // transaction details... }, ... @@ -66,7 +66,7 @@ type Reader interface { } */ TxPoolContentFrom(ctx context.Context, address common.Address) ( - map[string]map[string]*ethcoretypes.Transaction, error, + map[string]map[uint64]*ethcoretypes.Transaction, error, ) /* @@ -76,19 +76,19 @@ type Reader interface { { "pending": { "0x12345": { - "1": "0x12345789: 1 wei + 2 gas x 3 wei" + 1: "0x12345789: 1 wei + 2 gas x 3 wei" }, ... }, "queued": { "0x67890": { - "2": "0x12345789: 1 wei + 2 gas x 3 wei" + 2: "0x12345789: 1 wei + 2 gas x 3 wei" }, ... } } */ - TxPoolInspect(ctx context.Context) (map[string]map[common.Address]map[string]string, error) + TxPoolInspect(ctx context.Context) (map[string]map[common.Address]map[uint64]string, error) } type Writer interface { @@ -129,7 +129,7 @@ func (c *ExtendedEthClient) Close() error { if c == nil { return ErrClosed } - c.Close() + c.Client.Close() return nil } @@ -180,8 +180,8 @@ func (c *ExtendedEthClient) SubscribeFilterLogs( func (c *ExtendedEthClient) TxPoolContentFrom( ctx context.Context, address common.Address, -) (map[string]map[string]*ethcoretypes.Transaction, error) { - var result map[string]map[string]*ethcoretypes.Transaction +) (map[string]map[uint64]*ethcoretypes.Transaction, error) { + var result map[string]map[uint64]*ethcoretypes.Transaction ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) defer cancel() if err := c.Client.Client().CallContext( @@ -194,8 +194,8 @@ func (c *ExtendedEthClient) TxPoolContentFrom( func (c *ExtendedEthClient) TxPoolInspect( ctx context.Context, -) (map[string]map[common.Address]map[string]string, error) { - var result map[string]map[common.Address]map[string]string +) (map[string]map[common.Address]map[uint64]string, error) { + var result map[string]map[common.Address]map[uint64]string ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) defer cancel() if err := c.Client.Client().CallContext( diff --git a/client/eth/client_provider.go b/client/eth/client_provider.go index a0d0f96..dc1958e 100644 --- a/client/eth/client_provider.go +++ b/client/eth/client_provider.go @@ -261,13 +261,13 @@ Example response: { "pending": { - "1": { + 1: { // transaction details... }, ... }, "queued": { - "3": { + 3: { // transaction details... }, ... @@ -275,7 +275,7 @@ Example response: } */ func (c *ChainProviderImpl) TxPoolContentFrom(ctx context.Context, address common.Address) ( - map[string]map[string]*types.Transaction, error, + map[string]map[uint64]*types.Transaction, error, ) { if client, ok := c.GetHTTP(); ok { ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) @@ -292,20 +292,20 @@ Example response: { "pending": { "0x12345": { - "1": "0x12345789: 1 wei + 2 gas x 3 wei" + 1: "0x12345789: 1 wei + 2 gas x 3 wei" }, ... }, "queued": { "0x67890": { - "2": "0x12345789: 1 wei + 2 gas x 3 wei" + 2: "0x12345789: 1 wei + 2 gas x 3 wei" }, ... } } */ func (c *ChainProviderImpl) TxPoolInspect(ctx context.Context) ( - map[string]map[common.Address]map[string]string, error, + map[string]map[common.Address]map[uint64]string, error, ) { if client, ok := c.GetHTTP(); ok { ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) diff --git a/client/eth/client_test.go b/client/eth/client_test.go new file mode 100644 index 0000000..ab39233 --- /dev/null +++ b/client/eth/client_test.go @@ -0,0 +1,156 @@ +package eth_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/berachain/offchain-sdk/client/eth" + "github.com/stretchr/testify/assert" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + ethcoretypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +// This file tests the methods on the extended eth client. +// +// Note that the following must be set up for these tests to run: +// - the chain RPC is available at env var `ETH_RPC_URL` +// - OR the chain WS RPC is available at env var `ETH_RPC_URL_WS` +// - the RPC endpoint must return within 5 seconds or tests will timeout +// - [Optional for `txPoolContentFrom`] a wallet to query the mempool for at env var `ETH_ADDR` + +const ( + TestModeHTTP int = iota + TestModeWS + TestModeEither +) + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` or `ETH_RPC_URL_WS`. +func setUp(testMode int, t *testing.T) (*eth.ExtendedEthClient, error) { + rpcTimeout := 5 * time.Second + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), rpcTimeout) + defer cancel() + + var ethRPC string + switch testMode { + case TestModeWS: + ethRPC = os.Getenv("ETH_RPC_URL_WS") + case TestModeHTTP: + ethRPC = os.Getenv("ETH_RPC_URL") + case TestModeEither: + if ethRPC = os.Getenv("ETH_RPC_URL_WS"); ethRPC == "" { + ethRPC = os.Getenv("ETH_RPC_URL") + } + default: + panic("invalid test mode") + } + if ethRPC == "" { + t.Skipf("Skipping test: no eth rpc url provided") + } + + ethClient, err := ethclient.DialContext(ctxWithTimeout, ethRPC) + if err != nil { + return nil, err + } + + eec := eth.NewExtendedEthClient(ethClient, rpcTimeout) + return eec, nil +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` or `ETH_RPC_URL_WS`. +func TestClose(t *testing.T) { + eec, err := setUp(TestModeEither, t) + assert.NoError(t, err) + + err = eec.Close() + assert.NoError(t, err) +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` or `ETH_RPC_URL_WS`. +func TestHealth(t *testing.T) { + eec, err := setUp(TestModeEither, t) + assert.NoError(t, err) + + health := eec.Health() + assert.True(t, health) +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL`. +func TestGetReceipts(t *testing.T) { + eec, err := setUp(TestModeHTTP, t) + assert.NoError(t, err) + + ctx := context.Background() + txs := ethcoretypes.Transactions{} + + receipts, err := eec.GetReceipts(ctx, txs) + assert.NoError(t, err) + assert.Empty(t, receipts) +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL_WS`. +func TestSubscribeNewHead(t *testing.T) { + eec, err := setUp(TestModeWS, t) + assert.NoError(t, err) + + ctx := context.Background() + + ch, sub, err := eec.SubscribeNewHead(ctx) + assert.NoError(t, err) + assert.NotNil(t, ch) + assert.NotNil(t, sub) + + assert.NotPanics(t, func() { sub.Unsubscribe() }) +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL_WS`. +func TestSubscribeFilterLogs(t *testing.T) { + eec, err := setUp(TestModeWS, t) + assert.NoError(t, err) + + ctx := context.Background() + query := ethereum.FilterQuery{} + ch := make(chan ethcoretypes.Log) + + sub, err := eec.SubscribeFilterLogs(ctx, query, ch) + assert.NoError(t, err) + assert.NotNil(t, sub) + + assert.NotPanics(t, func() { sub.Unsubscribe() }) +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` AND a wallet to query the +// txpool for at `ETH_ADDR`. +func TestTxPoolContentFrom(t *testing.T) { + eec, err := setUp(TestModeHTTP, t) + assert.NoError(t, err) + + ctx := context.Background() + addrStr := os.Getenv("ETH_ADDR") + if addrStr == "" { + t.Skipf("Skipping test: no eth address provided") + } + address := common.HexToAddress(addrStr) + + result, err := eec.TxPoolContentFrom(ctx, address) + assert.NoError(t, err) + assert.NotNil(t, result) + + t.Log("result", result) +} + +// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL`. +func TestTxPoolInspect(t *testing.T) { + eec, err := setUp(TestModeHTTP, t) + assert.NoError(t, err) + + ctx := context.Background() + + result, err := eec.TxPoolInspect(ctx) + assert.NoError(t, err) + assert.NotNil(t, result) +} diff --git a/core/transactor/README.md b/core/transactor/README.md new file mode 100644 index 0000000..68664f6 --- /dev/null +++ b/core/transactor/README.md @@ -0,0 +1,15 @@ +# Transactor + +The transactor is the component of the offchain-sdk system that manages sending transactions from 1 particular wallet. + +## Features + +TODO + +## Usage + +TODO + +### Configuraton + +TODO diff --git a/core/transactor/config.go b/core/transactor/config.go index 4522240..4b6f649 100644 --- a/core/transactor/config.go +++ b/core/transactor/config.go @@ -16,16 +16,16 @@ type Config struct { WaitFullBatchTimeout bool // How long to wait to retrieve txs from the queue if it is empty (ideally quick <= 1s). EmptyQueueDelay time.Duration + // What the `requireSuccess` flag should be set for calls to `multicall`, if batching txs. + MulticallRequireSuccess bool // Maximum duration allowed for the tx to be signed (increase this if using a remote signer) SignTxTimeout time.Duration // How long to wait for the pending nonce (ideally 1 block time). PendingNonceInterval time.Duration - // How long to wait for a tx to hit the mempool (ideally 1-2 block time). - InMempoolTimeout time.Duration - // How long to wait for a tx to be mined/confirmed by the chain. - TxReceiptTimeout time.Duration + // How long to wait for a tx to hit the mempool and/or be confirmed by the chain. + TxWaitingTimeout time.Duration // Whether we should resend txs that are stale (not confirmed after the receipt timeout). ResendStaleTxs bool diff --git a/core/transactor/factory/batcher/multicall3.go b/core/transactor/factory/batcher/multicall3.go index ce880d1..a2221fb 100644 --- a/core/transactor/factory/batcher/multicall3.go +++ b/core/transactor/factory/batcher/multicall3.go @@ -39,7 +39,9 @@ func NewMulticall3(address common.Address) *Multicall3 { } // BatchRequests creates a batched transaction request for the given call requests. -func (mc *Multicall3) BatchRequests(callReqs ...*ethereum.CallMsg) *types.Request { +func (mc *Multicall3) BatchRequests( + requireSuccess bool, callReqs ...*ethereum.CallMsg, +) *types.Request { var ( calls = make([]bindings.Multicall3Call, len(callReqs)) totalValue = big.NewInt(0) @@ -73,7 +75,7 @@ func (mc *Multicall3) BatchRequests(callReqs ...*ethereum.CallMsg) *types.Reques txRequest, _ := mc.packer.CreateRequest( "", mc.contractAddress, totalValue, gasTipCap, gasFeeCap, gasLimit, - tryAggregate, false, calls, + tryAggregate, requireSuccess, calls, ) return txRequest } @@ -81,12 +83,12 @@ func (mc *Multicall3) BatchRequests(callReqs ...*ethereum.CallMsg) *types.Reques // BatchCallRequests uses the Multicall3 contract to create a batched call request for the given // call messages and return the batched call result data for each call, as a `[]Multicall3Result`. func (mc *Multicall3) BatchCallRequests( - ctx context.Context, from common.Address, callReqs ...*ethereum.CallMsg, + ctx context.Context, from common.Address, requireSuccess bool, callReqs ...*ethereum.CallMsg, ) (any, error) { sCtx := sdk.UnwrapContext(ctx) // get the batched tx (call) requests - batchedCall := mc.BatchRequests(callReqs...) + batchedCall := mc.BatchRequests(requireSuccess, callReqs...) batchedCall.From = from // call the multicall3 contract with the batched call request diff --git a/core/transactor/factory/batcher/multicall3_test.go b/core/transactor/factory/batcher/multicall3_test.go index 25c50f8..48702ed 100644 --- a/core/transactor/factory/batcher/multicall3_test.go +++ b/core/transactor/factory/batcher/multicall3_test.go @@ -46,7 +46,7 @@ func TestMulticall3(t *testing.T) { } // batch and send the calls to the chain - resp, err := multicaller.BatchCallRequests(sCtx, empty, call1.CallMsg, call2.CallMsg) + resp, err := multicaller.BatchCallRequests(sCtx, empty, false, call1.CallMsg, call2.CallMsg) if err != nil { t.Fatal(err) } diff --git a/core/transactor/factory/batcher/payable_multicall.go b/core/transactor/factory/batcher/payable_multicall.go index d833496..3a303cd 100644 --- a/core/transactor/factory/batcher/payable_multicall.go +++ b/core/transactor/factory/batcher/payable_multicall.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/berachain/offchain-sdk/contracts/bindings" + "github.com/berachain/offchain-sdk/core/transactor/factory" "github.com/berachain/offchain-sdk/core/transactor/types" sdk "github.com/berachain/offchain-sdk/types" @@ -17,6 +18,8 @@ import ( const multicall = `multicall` +var _ factory.Batcher = (*Multicall3)(nil) + // Corresponding to the PayableMulticall contract in contracts/lib/transient-goodies/src // (https://github.com/berachain/transient-goodies/blob/try-aggregate/src/PayableMulticallable.sol) type PayableMulticall struct { @@ -33,7 +36,9 @@ func NewPayableMulticall(address common.Address) *PayableMulticall { } // BatchRequests creates a batched transaction request for the given call requests. -func (mc *PayableMulticall) BatchRequests(callReqs ...*ethereum.CallMsg) *types.Request { +func (mc *PayableMulticall) BatchRequests( + requireSuccess bool, callReqs ...*ethereum.CallMsg, +) *types.Request { var ( calls = make([][]byte, len(callReqs)) totalValue = big.NewInt(0) @@ -65,7 +70,7 @@ func (mc *PayableMulticall) BatchRequests(callReqs ...*ethereum.CallMsg) *types. txRequest, _ := mc.packer.CreateRequest( "", mc.contractAddress, totalValue, gasTipCap, gasFeeCap, gasLimit, - multicall, false, calls, + multicall, requireSuccess, calls, ) return txRequest } @@ -73,12 +78,12 @@ func (mc *PayableMulticall) BatchRequests(callReqs ...*ethereum.CallMsg) *types. // BatchCallRequests uses the PayableMulticall contract to create a batched call request for the // given call messages and return the batched call result data for each call, as a `[][]byte`. func (mc *PayableMulticall) BatchCallRequests( - ctx context.Context, from common.Address, callReqs ...*ethereum.CallMsg, + ctx context.Context, from common.Address, requireSuccess bool, callReqs ...*ethereum.CallMsg, ) (any, error) { sCtx := sdk.UnwrapContext(ctx) // get the batched tx (call) requests - batchedCall := mc.BatchRequests(callReqs...) + batchedCall := mc.BatchRequests(requireSuccess, callReqs...) batchedCall.From = from // call the multicall3 contract with the batched call request diff --git a/core/transactor/factory/batcher/payable_multicall_test.go b/core/transactor/factory/batcher/payable_multicall_test.go index 5ab9054..5e608ce 100644 --- a/core/transactor/factory/batcher/payable_multicall_test.go +++ b/core/transactor/factory/batcher/payable_multicall_test.go @@ -57,7 +57,7 @@ func TestPayableMulticall(t *testing.T) { // batch and send the calls to the chain resp, err := multicaller.BatchCallRequests( - sCtx, walletAddr, call1.CallMsg, call2.CallMsg, call3.CallMsg, + sCtx, walletAddr, false, call1.CallMsg, call2.CallMsg, call3.CallMsg, ) if err != nil { t.Fatal(err) diff --git a/core/transactor/factory/factory.go b/core/transactor/factory/factory.go index df2d0c5..b68ab35 100644 --- a/core/transactor/factory/factory.go +++ b/core/transactor/factory/factory.go @@ -17,10 +17,11 @@ import ( // Factory is a transaction factory that builds 1559 transactions with the configured signer. type Factory struct { - noncer Noncer - signer kmstypes.TxSigner - signTxTimeout time.Duration - batcher Batcher + noncer Noncer + signer kmstypes.TxSigner + signTxTimeout time.Duration + batcher Batcher + defaultRequireSuccess bool // require success for all transactions in a batch // caches ethClient eth.Client @@ -31,13 +32,15 @@ type Factory struct { // New creates a new factory instance. func New( noncer Noncer, batcher Batcher, signer kmstypes.TxSigner, signTxTimeout time.Duration, + defaultRequireSuccess bool, ) *Factory { return &Factory{ - noncer: noncer, - signer: signer, - signTxTimeout: signTxTimeout, - batcher: batcher, - signerAddress: signer.Address(), + noncer: noncer, + signer: signer, + signTxTimeout: signTxTimeout, + batcher: batcher, + defaultRequireSuccess: defaultRequireSuccess, + signerAddress: signer.Address(), } } @@ -57,7 +60,7 @@ func (f *Factory) BuildTransactionFromRequests( return f.buildTransaction(ctx, requests[0], 0) default: // len(txReqs) > 1 then build a multicall transaction. - ar := f.batcher.BatchRequests(requests...) + ar := f.batcher.BatchRequests(f.defaultRequireSuccess, requests...) // Build the transaction to include the calldata. // ar.To should be the Multicall3 contract address diff --git a/core/transactor/factory/types.go b/core/transactor/factory/types.go index f60ca36..29fcc30 100644 --- a/core/transactor/factory/types.go +++ b/core/transactor/factory/types.go @@ -17,10 +17,11 @@ type Noncer interface { // Batcher is an interface for batching requests, commonly implemented by multicallers. type Batcher interface { // BatchRequests creates a batched transaction request for the given call requests. - BatchRequests(callReqs ...*ethereum.CallMsg) *types.Request + BatchRequests(requireSuccess bool, callReqs ...*ethereum.CallMsg) *types.Request // BatchCallRequests returns multicall results after executing the given call requests. BatchCallRequests( - ctx context.Context, from common.Address, callReqs ...*ethereum.CallMsg, + ctx context.Context, from common.Address, requireSuccess bool, + callReqs ...*ethereum.CallMsg, ) (any, error) } diff --git a/core/transactor/loop.go b/core/transactor/loop.go index cd28df4..2b4930e 100644 --- a/core/transactor/loop.go +++ b/core/transactor/loop.go @@ -21,7 +21,7 @@ func (t *TxrV2) mainLoop(ctx context.Context) { requests := t.retrieveBatch(ctx) if len(requests) == 0 { // We didn't get any transactions, so we wait for more. - t.logger.Info("no tx requests to process....") + t.logger.Debug("no tx requests to process...") time.Sleep(t.cfg.EmptyQueueDelay) continue } diff --git a/core/transactor/tracker/mempool.go b/core/transactor/tracker/mempool.go new file mode 100644 index 0000000..922eaa3 --- /dev/null +++ b/core/transactor/tracker/mempool.go @@ -0,0 +1,43 @@ +package tracker + +import ( + "context" + + "github.com/berachain/offchain-sdk/client/eth" + + "github.com/ethereum/go-ethereum/common" +) + +// getPendingNoncesFor returns the nonces that are currently pending in the mempool for the given +// sender. +func getPendingNoncesFor( + ctx context.Context, ethClient eth.Client, sender common.Address, +) (map[uint64]struct{}, error) { + contentFrom, err := ethClient.TxPoolContentFrom(ctx, sender) + if err != nil { + return nil, err + } + + pending := make(map[uint64]struct{}) + for nonce := range contentFrom["pending"] { + pending[nonce] = struct{}{} + } + return pending, nil +} + +// getQueuedNoncesFor returns the nonces that are currently queued in the mempool for the given +// sender. +func getQueuedNoncesFor( + ctx context.Context, ethClient eth.Client, sender common.Address, +) (map[uint64]struct{}, error) { + contentFrom, err := ethClient.TxPoolContentFrom(ctx, sender) + if err != nil { + return nil, err + } + + queued := make(map[uint64]struct{}) + for nonce := range contentFrom["queued"] { + queued[nonce] = struct{}{} + } + return queued, nil +} diff --git a/core/transactor/tracker/noncer.go b/core/transactor/tracker/noncer.go index 8432db2..8386ee9 100644 --- a/core/transactor/tracker/noncer.go +++ b/core/transactor/tracker/noncer.go @@ -2,7 +2,6 @@ package tracker import ( "context" - "strconv" "sync" "time" @@ -11,8 +10,12 @@ import ( "github.com/huandu/skiplist" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" ) +// noncesCapacity is the capacity of the in-mempool nonces cache. +const noncesCapacity = 100 + // Noncer is a struct that manages nonces for transactions. type Noncer struct { sender common.Address // The address of the sender. @@ -20,8 +23,7 @@ type Noncer struct { // mempool state latestPendingNonce uint64 - // TODO: purge old nonces from the map to avoid infinite memory growth - inMempoolNonces map[uint64]struct{} + inMempoolNonces *lru.Cache[uint64, struct{}] // "in-process" nonces acquired map[uint64]struct{} // The set of acquired nonces. @@ -36,7 +38,7 @@ type Noncer struct { func NewNoncer(sender common.Address, refreshInterval time.Duration) *Noncer { return &Noncer{ sender: sender, - inMempoolNonces: make(map[uint64]struct{}), + inMempoolNonces: lru.NewCache[uint64, struct{}](noncesCapacity), acquired: make(map[uint64]struct{}), inFlight: skiplist.New(skiplist.Uint64), refreshInterval: refreshInterval, @@ -68,21 +70,23 @@ func (n *Noncer) refreshNonces(ctx context.Context) { n.mu.Lock() defer n.mu.Unlock() + // Update the latest pending nonce. if pendingNonce, err := n.ethClient.PendingNonceAt(ctx, n.sender); err == nil { // This should already be in sync with latest pending nonce according to the chain. n.latestPendingNonce = pendingNonce // TODO: handle case where stored & chain pending nonce is out of sync? } - // Use txpool.inspect instead of txpool.content. Less data to fetch. - if content, err := n.ethClient.TxPoolInspect(ctx); err == nil { - for nonceStr := range content["pending"][n.sender] { - nonce, _ := strconv.ParseUint(nonceStr, 10, 64) - n.inMempoolNonces[nonce] = struct{}{} + // Get all the nonces in the mempool, to notify whether a tx at a given nonce is replacing + // an existing mempool tx. + if pendingNonces, err := getPendingNoncesFor(ctx, n.ethClient, n.sender); err == nil { + for nonce := range pendingNonces { + n.inMempoolNonces.Add(nonce, struct{}{}) } - for nonceStr := range content["queued"][n.sender] { - nonce, _ := strconv.ParseUint(nonceStr, 10, 64) - n.inMempoolNonces[nonce] = struct{}{} + } + if queuedNonces, err := getQueuedNoncesFor(ctx, n.ethClient, n.sender); err == nil { + for nonce := range queuedNonces { + n.inMempoolNonces.Add(nonce, struct{}{}) } } } @@ -96,10 +100,9 @@ func (n *Noncer) Acquire() (uint64, bool) { // Get the next available nonce from the inFlight list, if any. var ( - nonce uint64 - isReplacing bool - front = n.inFlight.Front() - back = n.inFlight.Back() + nonce uint64 + front = n.inFlight.Front() + back = n.inFlight.Back() ) if front != nil && back != nil { // Iterate through the inFlight objects to ensure there are no gaps @@ -116,13 +119,8 @@ func (n *Noncer) Acquire() (uint64, bool) { } n.acquired[nonce] = struct{}{} - // Set isReplacing to true only if the next nonce is already pending in the mempool. - if _, inMempool := n.inMempoolNonces[nonce]; inMempool { - delete(n.inMempoolNonces, nonce) - isReplacing = true - } - - return nonce, isReplacing + // Tx is "replacing" only if the returned nonce is already pending/queued in the mempool. + return nonce, n.inMempoolNonces.Remove(nonce) } // RemoveAcquired removes a nonce from the acquired list, when a transaction is unable to be sent. diff --git a/core/transactor/tracker/tracker.go b/core/transactor/tracker/tracker.go index f103268..ea23eec 100644 --- a/core/transactor/tracker/tracker.go +++ b/core/transactor/tracker/tracker.go @@ -2,7 +2,6 @@ package tracker import ( "context" - "strconv" "time" "github.com/berachain/offchain-sdk/client/eth" @@ -12,7 +11,7 @@ import ( coretypes "github.com/ethereum/go-ethereum/core/types" ) -const retryBackoff = 500 * time.Millisecond +const retryBackoff = 1 * time.Second // Tracker is a component that keeps track of the transactions that are already sent to the chain. type Tracker struct { @@ -20,8 +19,7 @@ type Tracker struct { dispatcher *event.Dispatcher[*Response] senderAddr common.Address // tx sender address - inMempoolTimeout time.Duration // for hitting mempool - staleTimeout time.Duration // for a tx receipt + waitingTimeout time.Duration // how long to spin for a tx status ethClient eth.Client } @@ -29,14 +27,13 @@ type Tracker struct { // New creates a new transaction tracker. func New( noncer *Noncer, dispatcher *event.Dispatcher[*Response], sender common.Address, - inMempoolTimeout, staleTimeout time.Duration, + txWaitingTimeout time.Duration, ) *Tracker { return &Tracker{ - noncer: noncer, - dispatcher: dispatcher, - senderAddr: sender, - inMempoolTimeout: inMempoolTimeout, - staleTimeout: staleTimeout, + noncer: noncer, + dispatcher: dispatcher, + senderAddr: sender, + waitingTimeout: txWaitingTimeout, } } @@ -50,11 +47,13 @@ func (t *Tracker) Track(ctx context.Context, resp *Response) { go t.trackStatus(ctx, resp) } -// trackStatus polls the for transaction status and updates the in-flight list. +// trackStatus polls the for transaction status (waits for it to reach the mempool or be confirmed) +// and updates the in-flight list. func (t *Tracker) trackStatus(ctx context.Context, resp *Response) { var ( - txHash = resp.Hash() - timer = time.NewTimer(t.inMempoolTimeout) + txHash = resp.Hash() + timer = time.NewTimer(t.waitingTimeout) + isPending bool ) defer timer.Stop() @@ -66,96 +65,35 @@ func (t *Tracker) trackStatus(ctx context.Context, resp *Response) { // If the context is done, it could be due to cancellation or other reasons. return case <-timer.C: - // Not found in mempool, wait for it to be mined or go stale. - t.waitMined(ctx, resp, false) + // Not found after waitingTimeout, mark it stale. + t.markExpired(resp, isPending) return default: - // Check the mempool again. - if t.checkMempool(ctx, resp) { - return - } - - // Check for the receipt again. - if receipt, err := t.ethClient.TransactionReceipt(ctx, txHash); err == nil { - t.markConfirmed(resp, receipt) - return - } - - // If not found anywhere, wait for a backoff and try again. + // Wait for a backoff before trying again. time.Sleep(retryBackoff) - } - } -} - -// checkMempool marks the tx according to its state in the mempool. Returns true if found. -func (t *Tracker) checkMempool(ctx context.Context, resp *Response) bool { - content, err := t.ethClient.TxPoolContentFrom(ctx, t.senderAddr) - if err != nil { - return false - } - txNonce := strconv.FormatUint(resp.Nonce(), 10) - if senderTxs, ok := content["pending"]; ok { - if _, isPending := senderTxs[txNonce]; isPending { - t.markPending(ctx, resp) - return true - } - } - - if senderTxs, ok := content["queued"]; ok { - if _, isQueued := senderTxs[txNonce]; isQueued { - // mark the transaction as expired, but it does exist in the mempool. - t.markExpired(resp, false) - return true - } - } - - return false -} -// waitMined waits for a receipt until the transaction is either confirmed or marked stale. -func (t *Tracker) waitMined(ctx context.Context, resp *Response, isAlreadyPending bool) { - var ( - txHash = resp.Hash() - receipt *coretypes.Receipt - err error - timer = time.NewTimer(t.staleTimeout) - ) - defer timer.Stop() + // Check in the pending mempool, only if we know it's not already pending. + if !isPending { + if pendingNonces, err := getPendingNoncesFor( + ctx, t.ethClient, t.senderAddr, + ); err == nil { + if _, isPending = pendingNonces[resp.Nonce()]; isPending { + // Remove from the noncer inFlight set since we know the tx has reached + // the mempool as executable/pending. Now waiting for confirmation. + t.noncer.RemoveInFlight(resp.Nonce()) + } + } + } - // Loop until the context is done, the transaction status is determined, or the timeout is - // reached. - for { - select { - case <-ctx.Done(): - // If the context is done, it could be due to cancellation or other reasons. - return - case <-timer.C: - // If the timeout is reached, mark the transaction as expired (the tx has been lost and - // not found anywhere if isAlreadyPending == false). - t.markExpired(resp, isAlreadyPending) - return - default: - // Else check for the receipt again. - if receipt, err = t.ethClient.TransactionReceipt(ctx, txHash); err == nil { + // Check for the receipt. + if receipt, err := t.ethClient.TransactionReceipt(ctx, txHash); err == nil { t.markConfirmed(resp, receipt) return } - - // on any error, search for the receipt after a backoff - time.Sleep(retryBackoff) } } } -// markPending marks the transaction as pending. The transaction is sitting in the "pending" set of -// the mempool --> up to the chain to confirm, remove from inflight. -func (t *Tracker) markPending(ctx context.Context, resp *Response) { - // Remove from the noncer inFlight set since we know the tx has reached the mempool as - // executable/pending. - t.noncer.RemoveInFlight(resp.Nonce()) - t.waitMined(ctx, resp, true) -} - // markConfirmed is called once a transaction has been included in the canonical chain. func (t *Tracker) markConfirmed(resp *Response, receipt *coretypes.Receipt) { // Set the contract address field on the receipt since geth doesn't do this. diff --git a/core/transactor/transactor.go b/core/transactor/transactor.go index f9ada73..eaedd7d 100644 --- a/core/transactor/transactor.go +++ b/core/transactor/transactor.go @@ -61,11 +61,9 @@ func NewTransactor(cfg Config, signer kmstypes.TxSigner, batcher factory.Batcher // Build the transactor components. noncer := tracker.NewNoncer(signer.Address(), cfg.PendingNonceInterval) - factory := factory.New(noncer, batcher, signer, cfg.SignTxTimeout) + factory := factory.New(noncer, batcher, signer, cfg.SignTxTimeout, cfg.MulticallRequireSuccess) dispatcher := event.NewDispatcher[*tracker.Response]() - tracker := tracker.New( - noncer, dispatcher, signer.Address(), cfg.InMempoolTimeout, cfg.TxReceiptTimeout, - ) + tracker := tracker.New(noncer, dispatcher, signer.Address(), cfg.TxWaitingTimeout) return &TxrV2{ cfg: cfg,