From 44a1499388558170dcdb93638e39838583fa2751 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Tue, 19 Mar 2024 01:30:48 -0400 Subject: [PATCH] more cleanupg --- core/transactor/config.go | 5 ++- core/transactor/sender/sender.go | 55 +++++++++++------------------ core/transactor/tracker/noncer.go | 5 +-- core/transactor/tracker/tracker.go | 20 +++++------ core/transactor/transactor.go | 56 ++++++++++++++++-------------- types/queue/sqs/sqs.go | 6 ++-- 6 files changed, 72 insertions(+), 75 deletions(-) diff --git a/core/transactor/config.go b/core/transactor/config.go index 3250ab5..0aa0f3f 100644 --- a/core/transactor/config.go +++ b/core/transactor/config.go @@ -15,7 +15,7 @@ type Config struct { EmptyQueueDelay time.Duration // Whether we wait the full batch timeout before firing txs. False means we will fire as soon // as we reach the desired batch size. - WaitBatchTimeout bool + WaitFullBatchTimeout bool // How long to wait for the pending nonce (ideally 1 block time). PendingNonceInterval time.Duration @@ -23,4 +23,7 @@ type Config struct { InMempoolTimeout time.Duration // How long to wait for a tx to be mined/confirmed by the chain. TxReceiptTimeout time.Duration + + // How often to post a snapshot of the transactor system status (ideally 1 block time). + StatusUpdateInterval time.Duration } diff --git a/core/transactor/sender/sender.go b/core/transactor/sender/sender.go index 7e77b97..f6d512f 100644 --- a/core/transactor/sender/sender.go +++ b/core/transactor/sender/sender.go @@ -2,6 +2,7 @@ package sender import ( "context" + "sync" "time" "github.com/berachain/offchain-sdk/client/eth" @@ -14,24 +15,21 @@ import ( // Sender struct holds the transaction replacement and retry policies. type Sender struct { factory Factory // factory to sign new transactions - tracker Tracker // tracker to track sent transactions txReplacementPolicy TxReplacementPolicy // policy to replace transactions retryPolicy RetryPolicy // policy to retry transactions - sendingTxs map[string]struct{} // msgs that are currently sending (or retrying) + sendingTxs sync.Map // msgs that are currently sending (or retrying) chain eth.Client logger log.Logger } // New creates a new Sender with default replacement and exponential retry policies. -func New(factory Factory, tracker Tracker) *Sender { +func New(factory Factory) *Sender { return &Sender{ - tracker: tracker, factory: factory, txReplacementPolicy: &DefaultTxReplacementPolicy{nf: factory}, retryPolicy: &ExpoRetryPolicy{}, // TODO: choose from config. - sendingTxs: make(map[string]struct{}), } } @@ -42,48 +40,41 @@ func (s *Sender) Setup(chain eth.Client, logger log.Logger) { // If a msgID IsSending (true is returned), the preconfirmed state is "StateSending". func (s *Sender) IsSending(msgID string) bool { - _, ok := s.sendingTxs[msgID] + _, ok := s.sendingTxs.Load(msgID) return ok } -// SendTransaction sends a transaction using the Ethereum client. If the transaction fails, -// it retries based on the retry policy, only once (further retries will not retry again). If -// sending is successful, it uses the tracker to track the transaction. -func (s *Sender) SendTransactionAndTrack( - ctx context.Context, tx *coretypes.Transaction, - msgIDs []string, timesFired []time.Time, shouldRetry bool, +// SendTransaction sends a transaction using the Ethereum client. If the transaction fails to send, +// it retries based on the confi retry policy. +func (s *Sender) SendTransaction( + ctx context.Context, tx *coretypes.Transaction, msgIDs []string, ) error { - // Try sending the transaction. for _, msgID := range msgIDs { - s.sendingTxs[msgID] = struct{}{} - } - if err := s.chain.SendTransaction(ctx, tx); err != nil { - if shouldRetry { // If sending the transaction fails, retry according to the retry policy. - go s.retryTxWithPolicy(ctx, tx, msgIDs, timesFired, err) - } - return err + s.sendingTxs.Store(msgID, struct{}{}) } - // If no error on sending, start tracking the transaction. + // Try sending the transaction (with retry if applicable). + err := s.retryTxWithPolicy(ctx, tx) + for _, msgID := range msgIDs { - delete(s.sendingTxs, msgID) + s.sendingTxs.Delete(msgID) } - s.tracker.Track(ctx, tx, msgIDs, timesFired) - return nil + + return err } -// retryTxWithPolicy retries sending tx according to the retry policy. Specifically handles two +// retryTxWithPolicy (re)tries sending tx according to the retry policy. Specifically handles two // common errors on sending a transaction (NonceTooLow, ReplaceUnderpriced) by replacing the tx // appropriately. -func (s *Sender) retryTxWithPolicy( - ctx context.Context, tx *coretypes.Transaction, - msgIDs []string, timesFired []time.Time, err error, -) { +func (s *Sender) retryTxWithPolicy(ctx context.Context, tx *coretypes.Transaction) error { for { + // (Re)try sending the transaction. + err := s.chain.SendTransaction(ctx, tx) + // Check the policy to see if we should retry this transaction. retry, backoff := s.retryPolicy.Get(tx, err) if !retry { - return + return err } time.Sleep(backoff) // Retry after recommended backoff. @@ -109,10 +100,6 @@ func (s *Sender) retryTxWithPolicy( ctx, tx.Nonce(), types.NewTxRequestFromTx(tx), ); err != nil { s.logger.Error("failed to sign replacement transaction", "err", err) - continue } - - // Retry sending the transaction. - err = s.SendTransactionAndTrack(ctx, tx, msgIDs, timesFired, false) } } diff --git a/core/transactor/tracker/noncer.go b/core/transactor/tracker/noncer.go index 4745add..c5539dc 100644 --- a/core/transactor/tracker/noncer.go +++ b/core/transactor/tracker/noncer.go @@ -40,11 +40,12 @@ func NewNoncer(sender common.Address, pendingNonceTimeout time.Duration) *Noncer } } -func (n *Noncer) SetClient(ethClient eth.Client) { +func (n *Noncer) Start(ctx context.Context, ethClient eth.Client) { n.ethClient = ethClient + go n.refreshLoop(ctx) } -func (n *Noncer) RefreshLoop(ctx context.Context) { +func (n *Noncer) refreshLoop(ctx context.Context) { n.refreshNonces(ctx) ticker := time.NewTicker(n.refreshInterval) diff --git a/core/transactor/tracker/tracker.go b/core/transactor/tracker/tracker.go index 7e036b3..bd9bfdf 100644 --- a/core/transactor/tracker/tracker.go +++ b/core/transactor/tracker/tracker.go @@ -2,6 +2,7 @@ package tracker import ( "context" + "sync" "time" "github.com/berachain/offchain-sdk/client/eth" @@ -18,7 +19,7 @@ type Tracker struct { staleTimeout time.Duration // for a tx receipt inMempoolTimeout time.Duration // for hitting mempool dispatcher *event.Dispatcher[*InFlightTx] - inFlightTxs map[string]struct{} // msgs that have been sent, but not confirmed + inFlightTxs sync.Map // msgs that have been sent, but not confirmed ethClient eth.Client } @@ -32,7 +33,6 @@ func New( staleTimeout: staleTimeout, inMempoolTimeout: inMempoolTimeout, dispatcher: dispatcher, - inFlightTxs: make(map[string]struct{}), } } @@ -40,24 +40,24 @@ func (t *Tracker) SetClient(chain eth.Client) { t.ethClient = chain } +// If a msgID IsInFlight (true is returned), the preconfirmed state is "StateInFlight". +func (t *Tracker) IsInFlight(msgID string) bool { + _, ok := t.inFlightTxs.Load(msgID) + return ok +} + // Track adds a transaction to the in-flight list and waits for a status. func (t *Tracker) Track( ctx context.Context, tx *coretypes.Transaction, msgIDs []string, timesFired []time.Time, ) { for _, msgID := range msgIDs { - t.inFlightTxs[msgID] = struct{}{} + t.inFlightTxs.Store(msgID, struct{}{}) } inFlightTx := &InFlightTx{Transaction: tx, MsgIDs: msgIDs, TimesFired: timesFired} t.noncer.SetInFlight(inFlightTx) go t.trackStatus(ctx, inFlightTx) } -// If a msgID IsInFlight (true is returned), the preconfirmed state is "StateInFlight". -func (t *Tracker) IsInFlight(msgID string) bool { - _, ok := t.inFlightTxs[msgID] - return ok -} - // trackStatus polls the for transaction status and updates the in-flight list. func (t *Tracker) trackStatus(ctx context.Context, tx *InFlightTx) { var ( @@ -168,7 +168,7 @@ func (t *Tracker) markStale(tx *InFlightTx, isPending bool) { func (t *Tracker) dispatchTx(tx *InFlightTx) { t.noncer.RemoveInFlight(tx) for _, msgID := range tx.MsgIDs { - delete(t.inFlightTxs, msgID) + t.inFlightTxs.Delete(msgID) } t.dispatcher.Dispatch(tx) } diff --git a/core/transactor/transactor.go b/core/transactor/transactor.go index 1b68b64..1ca8f8d 100644 --- a/core/transactor/transactor.go +++ b/core/transactor/transactor.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/berachain/offchain-sdk/client/eth" "github.com/berachain/offchain-sdk/core/transactor/event" "github.com/berachain/offchain-sdk/core/transactor/factory" "github.com/berachain/offchain-sdk/core/transactor/sender" @@ -28,7 +27,6 @@ type TxrV2 struct { factory *factory.Factory noncer *tracker.Noncer dispatcher *event.Dispatcher[*tracker.InFlightTx] - chain eth.Client logger log.Logger mu sync.Mutex } @@ -50,7 +48,7 @@ func NewTransactor( dispatcher: dispatcher, cfg: cfg, factory: factory, - sender: sender.New(factory, tracker), + sender: sender.New(factory), tracker: tracker, noncer: noncer, requests: queue, @@ -66,7 +64,7 @@ func (t *TxrV2) RegistryKey() string { // TODO: deprecate off being a job. func (t *TxrV2) Setup(ctx context.Context) error { sCtx := sdk.UnwrapContext(ctx) - t.chain = sCtx.Chain() + chain := sCtx.Chain() t.logger = sCtx.Logger() // Register the transactor as a subscriber to the tracker. @@ -78,12 +76,13 @@ func (t *TxrV2) Setup(ctx context.Context) error { }() t.dispatcher.Subscribe(ch) - // TODO: need lock on nonce to support more than one - t.noncer.SetClient(t.chain) - t.factory.SetClient(t.chain) - t.sender.Setup(t.chain, t.logger) - t.tracker.SetClient(t.chain) - t.Start(sCtx) + // Setup and start all the transactor components. + t.factory.SetClient(chain) + t.sender.Setup(chain, t.logger) + t.tracker.SetClient(chain) + t.noncer.Start(ctx, chain) + go t.mainLoop(ctx) + return nil } @@ -92,18 +91,18 @@ func (t *TxrV2) Setup(ctx context.Context) error { func (t *TxrV2) Execute(_ context.Context, _ any) (any, error) { acquired, inFlight := t.noncer.Stats() t.logger.Info( - "🧠 system status", "waiting-tx", acquired, "in-flight-tx", - inFlight, "pending-requests", t.requests.Len(), + "🧠 system status", + "waiting-tx", acquired, "in-flight-tx", inFlight, "pending-requests", t.requests.Len(), ) return nil, nil //nolint:nilnil // its okay. } // IntervalTime implements job.Polling. -func (t *TxrV2) IntervalTime(_ context.Context) time.Duration { - return 5 * time.Second //nolint:gomnd // TODO: read from config. +func (t *TxrV2) IntervalTime(context.Context) time.Duration { + return t.cfg.StatusUpdateInterval } -// SubscribeTxResults sends the tx results (inflight) to the given channel. +// SubscribeTxResults sends the tx results, once confirmed, to the given subscriber. func (t *TxrV2) SubscribeTxResults(ctx context.Context, subscriber tracker.Subscriber) { ch := make(chan *tracker.InFlightTx) go func() { @@ -136,7 +135,6 @@ func (t *TxrV2) GetPreconfirmedState(msgID string) tracker.PreconfirmState { // Start starts the transactor. func (t *TxrV2) Start(ctx context.Context) { - go t.noncer.RefreshLoop(ctx) go t.mainLoop(ctx) } @@ -157,21 +155,21 @@ func (t *TxrV2) mainLoop(ctx context.Context) { continue } - // We got a batch, so we send it and track it. - // We must first wait for the previous sending to finish. + // We got a batch, so we send it and track it. But first wait for the previous sending + // to finish. t.mu.Lock() go func() { - defer t.mu.Unlock() if err := t.sendAndTrack(ctx, msgIDs, timesFired, batch...); err != nil { t.logger.Error("failed to process batch", "msgs", msgIDs, "err", err) } + t.mu.Unlock() }() } } } // retrieveBatch retrieves a batch of transaction requests from the queue. It waits until 1) it -// hits the batch timeout or 2) max batch size only if waitBatchTimeout is false. +// hits the batch timeout or 2) tx batch size is reached only if waitFullBatchTimeout is false. func (t *TxrV2) retrieveBatch() ([]string, []time.Time, []*types.TxRequest) { var ( retMsgIDs []string @@ -183,15 +181,18 @@ func (t *TxrV2) retrieveBatch() ([]string, []time.Time, []*types.TxRequest) { // Loop until the batch tx timeout expires. for ; timeRemaining > 0; timeRemaining = t.cfg.TxBatchTimeout - time.Since(startTime) { - txsRemaining := int32(t.cfg.TxBatchSize - len(batch)) - if txsRemaining == 0 { // if we reached max batch size, we can break out of the loop. - if t.cfg.WaitBatchTimeout { + txsRemaining := t.cfg.TxBatchSize - len(batch) + + // If we reached max batch size, we can break out of the loop. + if txsRemaining == 0 { + // Sleep for the time remaining if we want to wait for the full batch timeout. + if t.cfg.WaitFullBatchTimeout { time.Sleep(timeRemaining) } break } - msgIDs, txReq, times, err := t.requests.ReceiveMany(txsRemaining) + msgIDs, txReq, times, err := t.requests.ReceiveMany(int32(txsRemaining)) if err != nil { t.logger.Error("failed to receive tx request", "err", err) continue @@ -216,11 +217,14 @@ func (t *TxrV2) sendAndTrack( return err } - // Send the transaction to the chain and track it async. - if err = t.sender.SendTransactionAndTrack(ctx, tx, msgIDs, timesFired, true); err != nil { + // Send the transaction to the chain. + if err = t.sender.SendTransaction(ctx, tx, msgIDs); err != nil { return err } + // Track the transaction status async. + t.tracker.Track(ctx, tx, msgIDs, timesFired) + t.logger.Debug("📡 sent transaction", "tx-hash", tx.Hash().Hex(), "tx-reqs", len(batch)) return nil } diff --git a/types/queue/sqs/sqs.go b/types/queue/sqs/sqs.go index d6666ad..d75cda2 100644 --- a/types/queue/sqs/sqs.go +++ b/types/queue/sqs/sqs.go @@ -7,6 +7,8 @@ import ( "sync" "time" + goutils "github.com/berachain/go-utils/utils" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" @@ -131,7 +133,7 @@ func (q *Queue[T]) Receive() (string, T, time.Time, bool) { // TODO memory growth atm. q.inProcess[*resp.Messages[0].MessageId] = *resp.Messages[0].ReceiptHandle - return *resp.Messages[0].MessageId, t, timeInserted.(time.Time), true + return *resp.Messages[0].MessageId, t, goutils.MustGetAs[time.Time](timeInserted), true } func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, []time.Time, error) { @@ -178,7 +180,7 @@ func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, []time.Time, error) { msgIDs[i] = *m.MessageId ts[i] = t - timesInserted[i] = timeInserted.(time.Time) //nolint:errcheck // always time.Time type. + timesInserted[i] = goutils.MustGetAs[time.Time](timeInserted) } return msgIDs, ts, timesInserted, nil