Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transactor): Allow tracking times of fired messages #77

Merged
merged 2 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions core/transactor/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func New(factory Factory, tracker Tracker) *Sender {
func (s *Sender) Setup(chain eth.Client, logger log.Logger) {
s.chain = chain
s.logger = logger
s.tracker.SetClient(chain)
}

// If a msgID IsSending (true is returned), the preconfirmed state is "StateSending".
Expand All @@ -51,15 +50,16 @@ func (s *Sender) IsSending(msgID string) bool {
// it retries based on the retry policy, only once (further retries will not retry again). If
// sending is successful, it uses the tracker to track the transaction.
func (s *Sender) SendTransactionAndTrack(
ctx context.Context, tx *coretypes.Transaction, msgIDs []string, shouldRetry bool,
ctx context.Context, tx *coretypes.Transaction,
msgIDs []string, timesFired []time.Time, shouldRetry bool,
) error {
// Try sending the transaction.
for _, msgID := range msgIDs {
s.sendingTxs[msgID] = struct{}{}
}
if err := s.chain.SendTransaction(ctx, tx); err != nil {
if shouldRetry { // If sending the transaction fails, retry according to the retry policy.
go s.retryTxWithPolicy(ctx, tx, msgIDs, err)
go s.retryTxWithPolicy(ctx, tx, msgIDs, timesFired, err)
}
return err
}
Expand All @@ -68,15 +68,16 @@ func (s *Sender) SendTransactionAndTrack(
for _, msgID := range msgIDs {
delete(s.sendingTxs, msgID)
}
s.tracker.Track(ctx, tx, msgIDs)
s.tracker.Track(ctx, tx, msgIDs, timesFired)
return nil
}

// retryTxWithPolicy retries sending tx according to the retry policy. Specifically handles two
// common errors on sending a transaction (NonceTooLow, ReplaceUnderpriced) by replacing the tx
// appropriately.
func (s *Sender) retryTxWithPolicy(
ctx context.Context, tx *coretypes.Transaction, msgIDs []string, err error,
ctx context.Context, tx *coretypes.Transaction,
msgIDs []string, timesFired []time.Time, err error,
) {
for {
// Check the policy to see if we should retry this transaction.
Expand Down Expand Up @@ -112,6 +113,6 @@ func (s *Sender) retryTxWithPolicy(
}

// Retry sending the transaction.
err = s.SendTransactionAndTrack(ctx, tx, msgIDs, false)
err = s.SendTransactionAndTrack(ctx, tx, msgIDs, timesFired, false)
}
}
12 changes: 5 additions & 7 deletions core/transactor/sender/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/berachain/offchain-sdk/client/eth"
"github.com/berachain/offchain-sdk/core/transactor/types"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,22 +18,21 @@ type (

// Tracker is an interface for tracking sent transactions.
Tracker interface {
SetClient(chain eth.Client)
Track(ctx context.Context, tx *coretypes.Transaction, msgIDs []string)
Track(context.Context, *coretypes.Transaction, []string, []time.Time)
}

// Factory is an interface for signing transactions.
Factory interface {
BuildTransactionFromRequests(
ctx context.Context, forcedNonce uint64, txReqs ...*types.TxRequest,
context.Context, uint64, ...*types.TxRequest,
) (*coretypes.Transaction, error)
GetNextNonce(oldNonce uint64) (uint64, bool)
GetNextNonce(uint64) (uint64, bool)
}

// A RetryPolicy is used to determine if a transaction should be retried and how long to wait
// before retrying again.
RetryPolicy interface {
Get(tx *coretypes.Transaction, err error) (bool, time.Duration)
UpdateTxModified(oldTx, newTx common.Hash)
Get(*coretypes.Transaction, error) (bool, time.Duration)
UpdateTxModified(common.Hash, common.Hash)
}
)
4 changes: 3 additions & 1 deletion core/transactor/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (t *TxrV2) OnStale(
"nonce", inFlightTx.Nonce(), "gas-price", inFlightTx.GasPrice(),
)

return t.sendAndTrack(ctx, inFlightTx.MsgIDs, types.NewTxRequestFromTx(inFlightTx))
return t.sendAndTrack(
ctx, inFlightTx.MsgIDs, inFlightTx.TimesFired, types.NewTxRequestFromTx(inFlightTx),
)
}

func (t *TxrV2) OnError(_ context.Context, tx *tracker.InFlightTx, _ error) {
Expand Down
4 changes: 2 additions & 2 deletions core/transactor/tracker/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func (sub *Subscription) Start(ctx context.Context, ch chan *InFlightTx) error {
switch e.Status() {
case StatusSuccess:
// If the transaction was successful, call OnSuccess.
if err = sub.OnSuccess(e, e.Receipt); err != nil {
if err = sub.OnSuccess(e, e.receipt); err != nil {
sub.logger.Error("failed to handle successful tx", "err", err)
}
case StatusReverted:
// If the transaction was reverted, call OnRevert.
if err = sub.OnRevert(e, e.Receipt); err != nil {
if err = sub.OnRevert(e, e.receipt); err != nil {
sub.logger.Error("failed to handle reverted tx", "err", err)
}
case StatusStale:
Expand Down
8 changes: 5 additions & 3 deletions core/transactor/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func (t *Tracker) SetClient(chain eth.Client) {
}

// Track adds a transaction to the in-flight list and waits for a status.
func (t *Tracker) Track(ctx context.Context, tx *coretypes.Transaction, msgIDs []string) {
func (t *Tracker) Track(
ctx context.Context, tx *coretypes.Transaction, msgIDs []string, timesFired []time.Time,
) {
for _, msgID := range msgIDs {
t.inFlightTxs[msgID] = struct{}{}
}
inFlightTx := &InFlightTx{Transaction: tx, MsgIDs: msgIDs}
inFlightTx := &InFlightTx{Transaction: tx, MsgIDs: msgIDs, TimesFired: timesFired}
t.noncer.SetInFlight(inFlightTx)
go t.trackStatus(ctx, inFlightTx)
}
Expand Down Expand Up @@ -152,7 +154,7 @@ func (t *Tracker) markConfirmed(tx *InFlightTx, receipt *coretypes.Receipt) {
receipt.ContractAddress = *contractAddr
}

tx.Receipt = receipt
tx.receipt = receipt
t.dispatchTx(tx)
}

Expand Down
12 changes: 8 additions & 4 deletions core/transactor/tracker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tracker
import (
"context"
"strings"
"time"

coretypes "github.com/ethereum/go-ethereum/core/types"
)
Expand Down Expand Up @@ -41,8 +42,11 @@ type Subscriber interface {
// InFlightTx represents a transaction that is currently being tracked by the transactor.
type InFlightTx struct {
*coretypes.Transaction
MsgIDs []string
Receipt *coretypes.Receipt

MsgIDs []string // Message IDs that were included in the transaction.
TimesFired []time.Time // Times each message was initially fired.

receipt *coretypes.Receipt
isStale bool
}

Expand All @@ -53,14 +57,14 @@ func (t *InFlightTx) ID() string {

// Status returns the current status of a transaction owned by the transactor.
func (t *InFlightTx) Status() Status {
if t.Receipt == nil {
if t.receipt == nil {
if t.isStale {
return StatusStale
}
return StatusPending
}

if t.Receipt.Status == 1 {
if t.receipt.Status == 1 {
return StatusSuccess
}

Expand Down
21 changes: 12 additions & 9 deletions core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (t *TxrV2) Setup(ctx context.Context) error {
t.noncer.SetClient(t.chain)
t.factory.SetClient(t.chain)
t.sender.Setup(t.chain, t.logger)
t.tracker.SetClient(t.chain)
t.Start(sCtx)
return nil
}
Expand Down Expand Up @@ -147,7 +148,7 @@ func (t *TxrV2) mainLoop(ctx context.Context) {
return
default:
// Attempt the retrieve a batch from the queue.
msgIDs, batch := t.retrieveBatch()
msgIDs, timesFired, batch := t.retrieveBatch()

// We didn't get any transactions, so we wait for more.
if len(batch) == 0 {
Expand All @@ -161,7 +162,7 @@ func (t *TxrV2) mainLoop(ctx context.Context) {
t.mu.Lock()
go func() {
defer t.mu.Unlock()
if err := t.sendAndTrack(ctx, msgIDs, batch...); err != nil {
if err := t.sendAndTrack(ctx, msgIDs, timesFired, batch...); err != nil {
t.logger.Error("failed to process batch", "msgs", msgIDs, "err", err)
}
}()
Expand All @@ -171,37 +172,39 @@ func (t *TxrV2) mainLoop(ctx context.Context) {

// retrieveBatch retrieves a batch of transaction requests from the queue.
// It waits until it hits the max batch size or the timeout.
func (t *TxrV2) retrieveBatch() ([]string, []*types.TxRequest) {
var batch []*types.TxRequest
func (t *TxrV2) retrieveBatch() ([]string, []time.Time, []*types.TxRequest) {
var retMsgIDs []string
var timesFired []time.Time
var batch []*types.TxRequest
startTime := time.Now()

// Retrieve the delta between the max total batch size.
for len(batch) < t.cfg.TxBatchSize && time.Since(startTime) < t.cfg.TxBatchTimeout {
msgIDs, txReq, err := t.requests.ReceiveMany(int32(t.cfg.TxBatchSize - len(batch)))
msgIDs, txReq, times, err := t.requests.ReceiveMany(int32(t.cfg.TxBatchSize - len(batch)))
if err != nil {
t.logger.Error("failed to receive tx request", "err", err)
continue
}
batch = append(batch, txReq...)
retMsgIDs = append(retMsgIDs, msgIDs...)
timesFired = append(timesFired, times...)
batch = append(batch, txReq...)
}
return retMsgIDs, batch
return retMsgIDs, timesFired, batch
}

// sendAndTrack processes a batch of transaction requests.
// It builds a transaction from the batch and sends it.
// It also tracks the transaction for future reference.
func (t *TxrV2) sendAndTrack(
ctx context.Context, msgIDs []string, batch ...*types.TxRequest,
ctx context.Context, msgIDs []string, timesFired []time.Time, batch ...*types.TxRequest,
) error {
tx, err := t.factory.BuildTransactionFromRequests(ctx, 0, batch...)
if err != nil {
return err
}

// Send the transaction to the chain and track it async.
if err = t.sender.SendTransactionAndTrack(ctx, tx, msgIDs, true); err != nil {
if err = t.sender.SendTransactionAndTrack(ctx, tx, msgIDs, timesFired, true); err != nil {
return err
}

Expand Down
37 changes: 21 additions & 16 deletions types/queue/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,29 @@ package mem
import (
"container/list"
"sync"
"time"

"github.com/berachain/go-utils/utils"
"github.com/berachain/offchain-sdk/types/queue/types"
)

// Queue is a thread-safe FIFO queue implementation.
type Queue[T types.Marshallable] struct {
mu sync.RWMutex
queuedItems *list.List
msgs map[string]struct{}
mu sync.RWMutex
queuedItems *list.List
timesInserted map[string]time.Time
}

// NewQueue creates a new Queue instance.
func NewQueue[T types.Marshallable]() *Queue[T] {
return &Queue[T]{
queuedItems: list.New(),
msgs: make(map[string]struct{}),
queuedItems: list.New(),
timesInserted: make(map[string]time.Time),
}
}

func (q *Queue[T]) InQueue(messageID string) bool {
_, ok := q.msgs[messageID]
_, ok := q.timesInserted[messageID]
return ok
}

Expand All @@ -34,37 +35,39 @@ func (q *Queue[T]) Push(val T) (string, error) {
defer q.mu.Unlock()

q.queuedItems.PushBack(val)
q.msgs[val.String()] = struct{}{}
q.timesInserted[val.String()] = time.Now()

return val.String(), nil
}

// Pop returns the value at the front of the queue without removing it.
// The second return value indicates if the operation succeeded.
func (q *Queue[T]) Receive() (string, T, bool) {
func (q *Queue[T]) Receive() (string, T, time.Time, bool) {
q.mu.Lock()
defer q.mu.Unlock()

element := q.queuedItems.Front()
if element == nil {
return "", zeroValueOf[T](), false
return "", zeroValueOf[T](), time.Time{}, false
}

q.queuedItems.Remove(element)
val := utils.MustGetAs[T](element.Value)
msgID := val.String()
delete(q.msgs, msgID)
timeInserted := q.timesInserted[msgID]
delete(q.timesInserted, msgID)

return msgID, val, true
return msgID, val, timeInserted, true
}

func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, error) {
func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, []time.Time, error) {
q.mu.Lock()
defer q.mu.Unlock()

var (
txRequests []T
msgIDs []string
msgIDs []string
txRequests []T
timesInserted []time.Time
)
for i := int32(0); i < num; i++ {
element := q.queuedItems.Front()
Expand All @@ -74,11 +77,13 @@ func (q *Queue[T]) ReceiveMany(num int32) ([]string, []T, error) {
q.queuedItems.Remove(element)
val := utils.MustGetAs[T](element.Value)
msgID := val.String()
delete(q.msgs, msgID)
timeInserted := q.timesInserted[msgID]
delete(q.timesInserted, msgID)
msgIDs = append(msgIDs, msgID)
txRequests = append(txRequests, val)
timesInserted = append(timesInserted, timeInserted)
}
return msgIDs, txRequests, nil
return msgIDs, txRequests, timesInserted, nil
}

// Delete is no-op for the in-memory queue (already deleted by receiving).
Expand Down
Loading
Loading