Skip to content

Commit

Permalink
allow more optionality
Browse files Browse the repository at this point in the history
  • Loading branch information
calbera committed Mar 19, 2024
1 parent 098bbb2 commit 0d71d1e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
26 changes: 18 additions & 8 deletions core/transactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ import (
)

type Config struct {
Multicall3Address string
TxReceiptTimeout time.Duration // how long to wait for a tx to be mined (~2 block time)
InMempoolTimeout time.Duration // how long to wait for a tx to hit the mempool (1 block time)
PendingNonceTimeout time.Duration // how long to wait for the pending nonce (1 block time)
EmptyQueueDelay time.Duration // how long to wait if the queue is empty (quick <= 1s)
TxBatchSize int
TxBatchTimeout time.Duration // how long to wait for a batch to be flushed (1 block time)
CallTxTimeout time.Duration // how long to wait for a eth call result
Multicall3Address string

// How large an individual batched tx will be (uses multicall if > 1).
TxBatchSize int
// How long to wait for a batch to be flushed (ideally 1 block time).
TxBatchTimeout time.Duration
// How long to wait if the queue is empty (ideally quick <= 1s).
EmptyQueueDelay time.Duration
// Whether we wait the full batch timeout before firing txs. False means we will fire as soon
// as we reach the desired batch size.
WaitBatchTimeout bool

// How long to wait for the pending nonce (ideally 1 block time).
PendingNonceInterval time.Duration
// How long to wait for a tx to hit the mempool (ideally 1-2 block time).
InMempoolTimeout time.Duration
// How long to wait for a tx to be mined/confirmed by the chain.
TxReceiptTimeout time.Duration
}
35 changes: 24 additions & 11 deletions core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type TxrV2 struct {
func NewTransactor(
cfg Config, queue queuetypes.Queue[*types.TxRequest], signer kmstypes.TxSigner,
) *TxrV2 {
noncer := tracker.NewNoncer(signer.Address(), cfg.PendingNonceTimeout)
noncer := tracker.NewNoncer(signer.Address(), cfg.PendingNonceInterval)
factory := factory.New(
noncer, signer,
factory.NewMulticall3Batcher(common.HexToAddress(cfg.Multicall3Address)),
Expand Down Expand Up @@ -170,25 +170,38 @@ func (t *TxrV2) mainLoop(ctx context.Context) {
}
}

// retrieveBatch retrieves a batch of transaction requests from the queue.
// It waits until it hits the max batch size or the timeout.
// retrieveBatch retrieves a batch of transaction requests from the queue. It waits until 1) it
// hits the batch timeout or 2) max batch size only if waitBatchTimeout is false.
func (t *TxrV2) retrieveBatch() ([]string, []time.Time, []*types.TxRequest) {
var retMsgIDs []string
var timesFired []time.Time
var batch []*types.TxRequest
startTime := time.Now()

// Retrieve the delta between the max total batch size.
for len(batch) < t.cfg.TxBatchSize && time.Since(startTime) < t.cfg.TxBatchTimeout {
msgIDs, txReq, times, err := t.requests.ReceiveMany(int32(t.cfg.TxBatchSize - len(batch)))
var (
retMsgIDs []string
timesFired []time.Time
batch []*types.TxRequest
startTime = time.Now()
timeRemaining = t.cfg.TxBatchTimeout - time.Since(startTime)
)

// Loop until the batch tx timeout expires.
for ; timeRemaining > 0; timeRemaining = t.cfg.TxBatchTimeout - time.Since(startTime) {
txsRemaining := int32(t.cfg.TxBatchSize - len(batch))
if txsRemaining == 0 { // if we reached max batch size, we can break out of the loop.
if t.cfg.WaitBatchTimeout {
time.Sleep(timeRemaining)
}
break
}

msgIDs, txReq, times, err := t.requests.ReceiveMany(txsRemaining)
if err != nil {
t.logger.Error("failed to receive tx request", "err", err)
continue
}

retMsgIDs = append(retMsgIDs, msgIDs...)
timesFired = append(timesFired, times...)
batch = append(batch, txReq...)
}

return retMsgIDs, timesFired, batch
}

Expand Down

0 comments on commit 0d71d1e

Please sign in to comment.