Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transactor): More accurate tracking of mempool upon waiting for tx confirmation #95

Merged
merged 9 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions client/eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,21 @@ type Reader interface {

{
"pending": {
"1": {
1: {
// transaction details...
},
...
},
"queued": {
"3": {
3: {
// transaction details...
},
...
}
}
*/
TxPoolContentFrom(ctx context.Context, address common.Address) (
map[string]map[string]*ethcoretypes.Transaction, error,
map[string]map[uint64]*ethcoretypes.Transaction, error,
)

/*
Expand All @@ -76,19 +76,19 @@ type Reader interface {
{
"pending": {
"0x12345": {
"1": "0x12345789: 1 wei + 2 gas x 3 wei"
1: "0x12345789: 1 wei + 2 gas x 3 wei"
},
...
},
"queued": {
"0x67890": {
"2": "0x12345789: 1 wei + 2 gas x 3 wei"
2: "0x12345789: 1 wei + 2 gas x 3 wei"
},
...
}
}
*/
TxPoolInspect(ctx context.Context) (map[string]map[common.Address]map[string]string, error)
TxPoolInspect(ctx context.Context) (map[string]map[common.Address]map[uint64]string, error)
}

type Writer interface {
Expand Down Expand Up @@ -129,7 +129,7 @@ func (c *ExtendedEthClient) Close() error {
if c == nil {
return ErrClosed
}
c.Close()
c.Client.Close()
return nil
}

Expand Down Expand Up @@ -180,8 +180,8 @@ func (c *ExtendedEthClient) SubscribeFilterLogs(

func (c *ExtendedEthClient) TxPoolContentFrom(
ctx context.Context, address common.Address,
) (map[string]map[string]*ethcoretypes.Transaction, error) {
var result map[string]map[string]*ethcoretypes.Transaction
) (map[string]map[uint64]*ethcoretypes.Transaction, error) {
var result map[string]map[uint64]*ethcoretypes.Transaction
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
if err := c.Client.Client().CallContext(
Expand All @@ -194,8 +194,8 @@ func (c *ExtendedEthClient) TxPoolContentFrom(

func (c *ExtendedEthClient) TxPoolInspect(
ctx context.Context,
) (map[string]map[common.Address]map[string]string, error) {
var result map[string]map[common.Address]map[string]string
) (map[string]map[common.Address]map[uint64]string, error) {
var result map[string]map[common.Address]map[uint64]string
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
if err := c.Client.Client().CallContext(
Expand Down
12 changes: 6 additions & 6 deletions client/eth/client_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,21 @@ Example response:

{
"pending": {
"1": {
1: {
// transaction details...
},
...
},
"queued": {
"3": {
3: {
// transaction details...
},
...
}
}
*/
func (c *ChainProviderImpl) TxPoolContentFrom(ctx context.Context, address common.Address) (
map[string]map[string]*types.Transaction, error,
map[string]map[uint64]*types.Transaction, error,
) {
if client, ok := c.GetHTTP(); ok {
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
Expand All @@ -292,20 +292,20 @@ Example response:
{
"pending": {
"0x12345": {
"1": "0x12345789: 1 wei + 2 gas x 3 wei"
1: "0x12345789: 1 wei + 2 gas x 3 wei"
},
...
},
"queued": {
"0x67890": {
"2": "0x12345789: 1 wei + 2 gas x 3 wei"
2: "0x12345789: 1 wei + 2 gas x 3 wei"
},
...
}
}
*/
func (c *ChainProviderImpl) TxPoolInspect(ctx context.Context) (
map[string]map[common.Address]map[string]string, error,
map[string]map[common.Address]map[uint64]string, error,
) {
if client, ok := c.GetHTTP(); ok {
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
Expand Down
156 changes: 156 additions & 0 deletions client/eth/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package eth_test

import (
"context"
"os"
"testing"
"time"

"github.com/berachain/offchain-sdk/client/eth"
"github.com/stretchr/testify/assert"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
ethcoretypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

// This file tests the methods on the extended eth client.
//
// Note that the following must be set up for these tests to run:
// - the chain RPC is available at env var `ETH_RPC_URL`
// - OR the chain WS RPC is available at env var `ETH_RPC_URL_WS`
// - the RPC endpoint must return within 5 seconds or tests will timeout
// - [Optional for `txPoolContentFrom`] a wallet to query the mempool for at env var `ETH_ADDR`

const (
TestModeHTTP int = iota
TestModeWS
TestModeEither
)

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` or `ETH_RPC_URL_WS`.
func setUp(testMode int, t *testing.T) (*eth.ExtendedEthClient, error) {
rpcTimeout := 5 * time.Second
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel()

var ethRPC string
switch testMode {
case TestModeWS:
ethRPC = os.Getenv("ETH_RPC_URL_WS")
case TestModeHTTP:
ethRPC = os.Getenv("ETH_RPC_URL")
case TestModeEither:
if ethRPC = os.Getenv("ETH_RPC_URL_WS"); ethRPC == "" {
ethRPC = os.Getenv("ETH_RPC_URL")
}
default:
panic("invalid test mode")
}
if ethRPC == "" {
t.Skipf("Skipping test: no eth rpc url provided")
}

ethClient, err := ethclient.DialContext(ctxWithTimeout, ethRPC)
if err != nil {
return nil, err
}

eec := eth.NewExtendedEthClient(ethClient, rpcTimeout)
return eec, nil
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` or `ETH_RPC_URL_WS`.
func TestClose(t *testing.T) {
eec, err := setUp(TestModeEither, t)
assert.NoError(t, err)

err = eec.Close()
assert.NoError(t, err)
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` or `ETH_RPC_URL_WS`.
func TestHealth(t *testing.T) {
eec, err := setUp(TestModeEither, t)
assert.NoError(t, err)

health := eec.Health()
assert.True(t, health)
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL`.
func TestGetReceipts(t *testing.T) {
eec, err := setUp(TestModeHTTP, t)
assert.NoError(t, err)

ctx := context.Background()
txs := ethcoretypes.Transactions{}

receipts, err := eec.GetReceipts(ctx, txs)
assert.NoError(t, err)
assert.Empty(t, receipts)
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL_WS`.
func TestSubscribeNewHead(t *testing.T) {
eec, err := setUp(TestModeWS, t)
assert.NoError(t, err)

ctx := context.Background()

ch, sub, err := eec.SubscribeNewHead(ctx)
assert.NoError(t, err)
assert.NotNil(t, ch)
assert.NotNil(t, sub)

assert.NotPanics(t, func() { sub.Unsubscribe() })
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL_WS`.
func TestSubscribeFilterLogs(t *testing.T) {
eec, err := setUp(TestModeWS, t)
assert.NoError(t, err)

ctx := context.Background()
query := ethereum.FilterQuery{}
ch := make(chan ethcoretypes.Log)

sub, err := eec.SubscribeFilterLogs(ctx, query, ch)
assert.NoError(t, err)
assert.NotNil(t, sub)

assert.NotPanics(t, func() { sub.Unsubscribe() })
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL` AND a wallet to query the
// txpool for at `ETH_ADDR`.
func TestTxPoolContentFrom(t *testing.T) {
eec, err := setUp(TestModeHTTP, t)
assert.NoError(t, err)

ctx := context.Background()
addrStr := os.Getenv("ETH_ADDR")
if addrStr == "" {
t.Skipf("Skipping test: no eth address provided")
}
address := common.HexToAddress(addrStr)

result, err := eec.TxPoolContentFrom(ctx, address)
assert.NoError(t, err)
assert.NotNil(t, result)

t.Log("result", result)
}

// NOTE: requires Ethereum chain rpc url at env var `ETH_RPC_URL`.
func TestTxPoolInspect(t *testing.T) {
eec, err := setUp(TestModeHTTP, t)
assert.NoError(t, err)

ctx := context.Background()

result, err := eec.TxPoolInspect(ctx)
assert.NoError(t, err)
assert.NotNil(t, result)
}
8 changes: 4 additions & 4 deletions core/transactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ type Config struct {
WaitFullBatchTimeout bool
// How long to wait to retrieve txs from the queue if it is empty (ideally quick <= 1s).
EmptyQueueDelay time.Duration
// What the `requireSuccess` flag should be set for calls to `multicall`, if batching txs.
MulticallRequireSuccess bool

// Maximum duration allowed for the tx to be signed (increase this if using a remote signer)
SignTxTimeout time.Duration

// How long to wait for the pending nonce (ideally 1 block time).
PendingNonceInterval time.Duration
// How long to wait for a tx to hit the mempool (ideally 1-2 block time).
InMempoolTimeout time.Duration
// How long to wait for a tx to be mined/confirmed by the chain.
TxReceiptTimeout time.Duration
// How long to wait for a tx to hit the mempool and/or be confirmed by the chain.
TxWaitingTimeout time.Duration
// Whether we should resend txs that are stale (not confirmed after the receipt timeout).
ResendStaleTxs bool

Expand Down
10 changes: 6 additions & 4 deletions core/transactor/factory/batcher/multicall3.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func NewMulticall3(address common.Address) *Multicall3 {
}

// BatchRequests creates a batched transaction request for the given call requests.
func (mc *Multicall3) BatchRequests(callReqs ...*ethereum.CallMsg) *types.Request {
func (mc *Multicall3) BatchRequests(
requireSuccess bool, callReqs ...*ethereum.CallMsg,
) *types.Request {
var (
calls = make([]bindings.Multicall3Call, len(callReqs))
totalValue = big.NewInt(0)
Expand Down Expand Up @@ -73,20 +75,20 @@ func (mc *Multicall3) BatchRequests(callReqs ...*ethereum.CallMsg) *types.Reques

txRequest, _ := mc.packer.CreateRequest(
"", mc.contractAddress, totalValue, gasTipCap, gasFeeCap, gasLimit,
tryAggregate, false, calls,
tryAggregate, requireSuccess, calls,
)
return txRequest
}

// BatchCallRequests uses the Multicall3 contract to create a batched call request for the given
// call messages and return the batched call result data for each call, as a `[]Multicall3Result`.
func (mc *Multicall3) BatchCallRequests(
ctx context.Context, from common.Address, callReqs ...*ethereum.CallMsg,
ctx context.Context, from common.Address, requireSuccess bool, callReqs ...*ethereum.CallMsg,
) (any, error) {
sCtx := sdk.UnwrapContext(ctx)

// get the batched tx (call) requests
batchedCall := mc.BatchRequests(callReqs...)
batchedCall := mc.BatchRequests(requireSuccess, callReqs...)
batchedCall.From = from

// call the multicall3 contract with the batched call request
Expand Down
2 changes: 1 addition & 1 deletion core/transactor/factory/batcher/multicall3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestMulticall3(t *testing.T) {
}

// batch and send the calls to the chain
resp, err := multicaller.BatchCallRequests(sCtx, empty, call1.CallMsg, call2.CallMsg)
resp, err := multicaller.BatchCallRequests(sCtx, empty, false, call1.CallMsg, call2.CallMsg)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading
Loading