Skip to content

Commit

Permalink
more cleanupg
Browse files Browse the repository at this point in the history
  • Loading branch information
calbera committed Mar 19, 2024
1 parent 0d71d1e commit 44a1499
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 75 deletions.
5 changes: 4 additions & 1 deletion core/transactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ type Config struct {
EmptyQueueDelay time.Duration
// Whether we wait the full batch timeout before firing txs. False means we will fire as soon
// as we reach the desired batch size.
WaitBatchTimeout bool
WaitFullBatchTimeout bool

// How long to wait for the pending nonce (ideally 1 block time).
PendingNonceInterval time.Duration
// How long to wait for a tx to hit the mempool (ideally 1-2 block time).
InMempoolTimeout time.Duration
// How long to wait for a tx to be mined/confirmed by the chain.
TxReceiptTimeout time.Duration

// How often to post a snapshot of the transactor system status (ideally 1 block time).
StatusUpdateInterval time.Duration
}
55 changes: 21 additions & 34 deletions core/transactor/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sender

import (
"context"
"sync"
"time"

"github.com/berachain/offchain-sdk/client/eth"
Expand All @@ -14,24 +15,21 @@ import (
// Sender struct holds the transaction replacement and retry policies.
type Sender struct {
factory Factory // factory to sign new transactions
tracker Tracker // tracker to track sent transactions
txReplacementPolicy TxReplacementPolicy // policy to replace transactions
retryPolicy RetryPolicy // policy to retry transactions

sendingTxs map[string]struct{} // msgs that are currently sending (or retrying)
sendingTxs sync.Map // msgs that are currently sending (or retrying)

chain eth.Client
logger log.Logger
}

// New creates a new Sender with default replacement and exponential retry policies.
func New(factory Factory, tracker Tracker) *Sender {
func New(factory Factory) *Sender {
return &Sender{
tracker: tracker,
factory: factory,
txReplacementPolicy: &DefaultTxReplacementPolicy{nf: factory},
retryPolicy: &ExpoRetryPolicy{}, // TODO: choose from config.
sendingTxs: make(map[string]struct{}),
}
}

Expand All @@ -42,48 +40,41 @@ func (s *Sender) Setup(chain eth.Client, logger log.Logger) {

// If a msgID IsSending (true is returned), the preconfirmed state is "StateSending".
func (s *Sender) IsSending(msgID string) bool {
_, ok := s.sendingTxs[msgID]
_, ok := s.sendingTxs.Load(msgID)
return ok
}

// SendTransaction sends a transaction using the Ethereum client. If the transaction fails,
// it retries based on the retry policy, only once (further retries will not retry again). If
// sending is successful, it uses the tracker to track the transaction.
func (s *Sender) SendTransactionAndTrack(
ctx context.Context, tx *coretypes.Transaction,
msgIDs []string, timesFired []time.Time, shouldRetry bool,
// SendTransaction sends a transaction using the Ethereum client. If the transaction fails to send,
// it retries based on the confi retry policy.
func (s *Sender) SendTransaction(
ctx context.Context, tx *coretypes.Transaction, msgIDs []string,
) error {
// Try sending the transaction.
for _, msgID := range msgIDs {
s.sendingTxs[msgID] = struct{}{}
}
if err := s.chain.SendTransaction(ctx, tx); err != nil {
if shouldRetry { // If sending the transaction fails, retry according to the retry policy.
go s.retryTxWithPolicy(ctx, tx, msgIDs, timesFired, err)
}
return err
s.sendingTxs.Store(msgID, struct{}{})
}

// If no error on sending, start tracking the transaction.
// Try sending the transaction (with retry if applicable).
err := s.retryTxWithPolicy(ctx, tx)

for _, msgID := range msgIDs {
delete(s.sendingTxs, msgID)
s.sendingTxs.Delete(msgID)
}
s.tracker.Track(ctx, tx, msgIDs, timesFired)
return nil

return err
}

// retryTxWithPolicy retries sending tx according to the retry policy. Specifically handles two
// retryTxWithPolicy (re)tries sending tx according to the retry policy. Specifically handles two
// common errors on sending a transaction (NonceTooLow, ReplaceUnderpriced) by replacing the tx
// appropriately.
func (s *Sender) retryTxWithPolicy(
ctx context.Context, tx *coretypes.Transaction,
msgIDs []string, timesFired []time.Time, err error,
) {
func (s *Sender) retryTxWithPolicy(ctx context.Context, tx *coretypes.Transaction) error {
for {
// (Re)try sending the transaction.
err := s.chain.SendTransaction(ctx, tx)

// Check the policy to see if we should retry this transaction.
retry, backoff := s.retryPolicy.Get(tx, err)
if !retry {
return
return err
}
time.Sleep(backoff) // Retry after recommended backoff.

Expand All @@ -109,10 +100,6 @@ func (s *Sender) retryTxWithPolicy(
ctx, tx.Nonce(), types.NewTxRequestFromTx(tx),
); err != nil {
s.logger.Error("failed to sign replacement transaction", "err", err)
continue
}

// Retry sending the transaction.
err = s.SendTransactionAndTrack(ctx, tx, msgIDs, timesFired, false)
}
}
5 changes: 3 additions & 2 deletions core/transactor/tracker/noncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ func NewNoncer(sender common.Address, pendingNonceTimeout time.Duration) *Noncer
}
}

func (n *Noncer) SetClient(ethClient eth.Client) {
func (n *Noncer) Start(ctx context.Context, ethClient eth.Client) {
n.ethClient = ethClient
go n.refreshLoop(ctx)
}

func (n *Noncer) RefreshLoop(ctx context.Context) {
func (n *Noncer) refreshLoop(ctx context.Context) {
n.refreshNonces(ctx)

ticker := time.NewTicker(n.refreshInterval)
Expand Down
20 changes: 10 additions & 10 deletions core/transactor/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tracker

import (
"context"
"sync"
"time"

"github.com/berachain/offchain-sdk/client/eth"
Expand All @@ -18,7 +19,7 @@ type Tracker struct {
staleTimeout time.Duration // for a tx receipt
inMempoolTimeout time.Duration // for hitting mempool
dispatcher *event.Dispatcher[*InFlightTx]
inFlightTxs map[string]struct{} // msgs that have been sent, but not confirmed
inFlightTxs sync.Map // msgs that have been sent, but not confirmed
ethClient eth.Client
}

Expand All @@ -32,32 +33,31 @@ func New(
staleTimeout: staleTimeout,
inMempoolTimeout: inMempoolTimeout,
dispatcher: dispatcher,
inFlightTxs: make(map[string]struct{}),
}
}

func (t *Tracker) SetClient(chain eth.Client) {
t.ethClient = chain
}

// If a msgID IsInFlight (true is returned), the preconfirmed state is "StateInFlight".
func (t *Tracker) IsInFlight(msgID string) bool {
_, ok := t.inFlightTxs.Load(msgID)
return ok
}

// Track adds a transaction to the in-flight list and waits for a status.
func (t *Tracker) Track(
ctx context.Context, tx *coretypes.Transaction, msgIDs []string, timesFired []time.Time,
) {
for _, msgID := range msgIDs {
t.inFlightTxs[msgID] = struct{}{}
t.inFlightTxs.Store(msgID, struct{}{})
}
inFlightTx := &InFlightTx{Transaction: tx, MsgIDs: msgIDs, TimesFired: timesFired}
t.noncer.SetInFlight(inFlightTx)
go t.trackStatus(ctx, inFlightTx)
}

// If a msgID IsInFlight (true is returned), the preconfirmed state is "StateInFlight".
func (t *Tracker) IsInFlight(msgID string) bool {
_, ok := t.inFlightTxs[msgID]
return ok
}

// trackStatus polls the for transaction status and updates the in-flight list.
func (t *Tracker) trackStatus(ctx context.Context, tx *InFlightTx) {
var (
Expand Down Expand Up @@ -168,7 +168,7 @@ func (t *Tracker) markStale(tx *InFlightTx, isPending bool) {
func (t *Tracker) dispatchTx(tx *InFlightTx) {
t.noncer.RemoveInFlight(tx)
for _, msgID := range tx.MsgIDs {
delete(t.inFlightTxs, msgID)
t.inFlightTxs.Delete(msgID)
}
t.dispatcher.Dispatch(tx)
}
56 changes: 30 additions & 26 deletions core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

"github.com/berachain/offchain-sdk/client/eth"
"github.com/berachain/offchain-sdk/core/transactor/event"
"github.com/berachain/offchain-sdk/core/transactor/factory"
"github.com/berachain/offchain-sdk/core/transactor/sender"
Expand All @@ -28,7 +27,6 @@ type TxrV2 struct {
factory *factory.Factory
noncer *tracker.Noncer
dispatcher *event.Dispatcher[*tracker.InFlightTx]
chain eth.Client
logger log.Logger
mu sync.Mutex
}
Expand All @@ -50,7 +48,7 @@ func NewTransactor(
dispatcher: dispatcher,
cfg: cfg,
factory: factory,
sender: sender.New(factory, tracker),
sender: sender.New(factory),
tracker: tracker,
noncer: noncer,
requests: queue,
Expand All @@ -66,7 +64,7 @@ func (t *TxrV2) RegistryKey() string {
// TODO: deprecate off being a job.
func (t *TxrV2) Setup(ctx context.Context) error {
sCtx := sdk.UnwrapContext(ctx)
t.chain = sCtx.Chain()
chain := sCtx.Chain()
t.logger = sCtx.Logger()

// Register the transactor as a subscriber to the tracker.
Expand All @@ -78,12 +76,13 @@ func (t *TxrV2) Setup(ctx context.Context) error {
}()
t.dispatcher.Subscribe(ch)

// TODO: need lock on nonce to support more than one
t.noncer.SetClient(t.chain)
t.factory.SetClient(t.chain)
t.sender.Setup(t.chain, t.logger)
t.tracker.SetClient(t.chain)
t.Start(sCtx)
// Setup and start all the transactor components.
t.factory.SetClient(chain)
t.sender.Setup(chain, t.logger)
t.tracker.SetClient(chain)
t.noncer.Start(ctx, chain)
go t.mainLoop(ctx)

return nil
}

Expand All @@ -92,18 +91,18 @@ func (t *TxrV2) Setup(ctx context.Context) error {
func (t *TxrV2) Execute(_ context.Context, _ any) (any, error) {
acquired, inFlight := t.noncer.Stats()
t.logger.Info(
"🧠 system status", "waiting-tx", acquired, "in-flight-tx",
inFlight, "pending-requests", t.requests.Len(),
"🧠 system status",
"waiting-tx", acquired, "in-flight-tx", inFlight, "pending-requests", t.requests.Len(),
)
return nil, nil //nolint:nilnil // its okay.
}

// IntervalTime implements job.Polling.
func (t *TxrV2) IntervalTime(_ context.Context) time.Duration {
return 5 * time.Second //nolint:gomnd // TODO: read from config.
func (t *TxrV2) IntervalTime(context.Context) time.Duration {
return t.cfg.StatusUpdateInterval
}

// SubscribeTxResults sends the tx results (inflight) to the given channel.
// SubscribeTxResults sends the tx results, once confirmed, to the given subscriber.
func (t *TxrV2) SubscribeTxResults(ctx context.Context, subscriber tracker.Subscriber) {
ch := make(chan *tracker.InFlightTx)
go func() {
Expand Down Expand Up @@ -136,7 +135,6 @@ func (t *TxrV2) GetPreconfirmedState(msgID string) tracker.PreconfirmState {

// Start starts the transactor.
func (t *TxrV2) Start(ctx context.Context) {
go t.noncer.RefreshLoop(ctx)
go t.mainLoop(ctx)
}

Expand All @@ -157,21 +155,21 @@ func (t *TxrV2) mainLoop(ctx context.Context) {
continue
}

// We got a batch, so we send it and track it.
// We must first wait for the previous sending to finish.
// We got a batch, so we send it and track it. But first wait for the previous sending
// to finish.
t.mu.Lock()
go func() {
defer t.mu.Unlock()
if err := t.sendAndTrack(ctx, msgIDs, timesFired, batch...); err != nil {
t.logger.Error("failed to process batch", "msgs", msgIDs, "err", err)
}
t.mu.Unlock()
}()
}
}
}

// retrieveBatch retrieves a batch of transaction requests from the queue. It waits until 1) it
// hits the batch timeout or 2) max batch size only if waitBatchTimeout is false.
// hits the batch timeout or 2) tx batch size is reached only if waitFullBatchTimeout is false.
func (t *TxrV2) retrieveBatch() ([]string, []time.Time, []*types.TxRequest) {
var (
retMsgIDs []string
Expand All @@ -183,15 +181,18 @@ func (t *TxrV2) retrieveBatch() ([]string, []time.Time, []*types.TxRequest) {

// Loop until the batch tx timeout expires.
for ; timeRemaining > 0; timeRemaining = t.cfg.TxBatchTimeout - time.Since(startTime) {
txsRemaining := int32(t.cfg.TxBatchSize - len(batch))
if txsRemaining == 0 { // if we reached max batch size, we can break out of the loop.
if t.cfg.WaitBatchTimeout {
txsRemaining := t.cfg.TxBatchSize - len(batch)

// If we reached max batch size, we can break out of the loop.
if txsRemaining == 0 {
// Sleep for the time remaining if we want to wait for the full batch timeout.
if t.cfg.WaitFullBatchTimeout {
time.Sleep(timeRemaining)
}
break
}

msgIDs, txReq, times, err := t.requests.ReceiveMany(txsRemaining)
msgIDs, txReq, times, err := t.requests.ReceiveMany(int32(txsRemaining))
if err != nil {
t.logger.Error("failed to receive tx request", "err", err)
continue
Expand All @@ -216,11 +217,14 @@ func (t *TxrV2) sendAndTrack(
return err
}

// Send the transaction to the chain and track it async.
if err = t.sender.SendTransactionAndTrack(ctx, tx, msgIDs, timesFired, true); err != nil {
// Send the transaction to the chain.
if err = t.sender.SendTransaction(ctx, tx, msgIDs); err != nil {
return err
}

// Track the transaction status async.
t.tracker.Track(ctx, tx, msgIDs, timesFired)

t.logger.Debug("📡 sent transaction", "tx-hash", tx.Hash().Hex(), "tx-reqs", len(batch))
return nil
}
6 changes: 4 additions & 2 deletions types/queue/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

goutils "github.com/berachain/go-utils/utils"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
Expand Down Expand Up @@ -131,7 +133,7 @@ func (q *Queue[T]) Receive() (string, T, time.Time, bool) {
// TODO memory growth atm.
q.inProcess[*resp.Messages[0].MessageId] = *resp.Messages[0].ReceiptHandle

return *resp.Messages[0].MessageId, t, timeInserted.(time.Time), true
return *resp.Messages[0].MessageId, t, goutils.MustGetAs[time.Time](timeInserted), true
}

func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, []time.Time, error) {
Expand Down Expand Up @@ -178,7 +180,7 @@ func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, []time.Time, error) {

msgIDs[i] = *m.MessageId
ts[i] = t
timesInserted[i] = timeInserted.(time.Time) //nolint:errcheck // always time.Time type.
timesInserted[i] = goutils.MustGetAs[time.Time](timeInserted)
}

return msgIDs, ts, timesInserted, nil
Expand Down

0 comments on commit 44a1499

Please sign in to comment.