Skip to content

Commit

Permalink
wip: Add SubscribeTransactionStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Dec 27, 2024
1 parent 20c0587 commit 4b94556
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 84 deletions.
232 changes: 178 additions & 54 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rpc
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core"
Expand Down Expand Up @@ -38,6 +40,10 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
return nil, ErrTooManyKeysInFilter
}

if blockID != nil && blockID.Pending {
return nil, ErrCallOnPending
}

requestedHeader, headHeader, rpcErr := h.resolveBlockRange(blockID)
if rpcErr != nil {
return nil, rpcErr
Expand Down Expand Up @@ -93,82 +99,204 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
// SubscribeTxnStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added.
// Subsequent updates are sent only when the transaction status changes.
// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent.
func (h *Handler) SubscribeTxnStatus(ctx context.Context, txHash felt.Felt, _ *BlockID) (*SubscriptionID, *jsonrpc.Error) {
var (
lastKnownStatus, lastSendStatus *TransactionStatus
wrapResult = func(s *TransactionStatus) *NewTransactionStatus {
return &NewTransactionStatus{
TransactionHash: &txHash,
Status: s,
}
}
)

//
//nolint:funlen,gocyclo
func (h *Handler) SubscribeTxnStatus(ctx context.Context, txHash felt.Felt, blockID *BlockID) (*SubscriptionID,
*jsonrpc.Error,
) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

// resolveBlockRange is only used to make sure that the requested block id is not older than 1024 block and check
// if the requested block is found. The range is inconsequential since we assume the provided transaction hash
// of a transaction is included in the block range: latest/pending - 1024.
_, _, rpcErr := h.resolveBlockRange(blockID)
if rpcErr != nil {
return nil, rpcErr
}

fmt.Println("Codeunder test ------ Inside subscribeTxnStatus")

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}

lastKnownStatus, rpcErr := h.TransactionStatus(subscriptionCtx, txHash)
if rpcErr != nil {
h.log.Errorw("Failed to get Tx status", "txHash", &txHash, "rpcErr", rpcErr)
return nil, rpcErr
}

h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()

headerSub := h.newHeads.Subscribe()
l2HeadSub := h.newHeads.Subscribe()
l1HeadSub := h.l1Heads.Subscribe()
reorgSub := h.reorgs.Subscribe()

sub.wg.Go(func() {
fmt.Println("Codeunder test ------ inside big go routine")
defer func() {
h.unsubscribe(sub, id)
headerSub.Unsubscribe()
l2HeadSub.Unsubscribe()
l1HeadSub.Unsubscribe()
reorgSub.Unsubscribe()
}()

if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
return
}
lastSendStatus = lastKnownStatus
var wg conc.WaitGroup
receipt, rpcErr := h.TransactionReceiptByHash(txHash)

// Check if the requested transaction is already final.
// A transaction is considered to be final if it has been rejected or accepted on l1
if rpcErr == nil {
if receipt.FinalityStatus == TxnAcceptedOnL1 {
s := &TransactionStatus{
Finality: TxnStatus(receipt.FinalityStatus),
Execution: receipt.ExecutionStatus,
FailureReason: receipt.RevertReason,
}

for {
select {
case <-subscriptionCtx.Done():
return
case <-headerSub.Recv():
lastKnownStatus, rpcErr = h.TransactionStatus(subscriptionCtx, txHash)
if rpcErr != nil {
h.log.Errorw("Failed to get Tx status", "txHash", txHash, "rpcErr", rpcErr)
return
err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *s}, id)
if err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
}
return
}
} else if rpcErr == ErrTxnHashNotFound {
s, err := h.fetchTxnStatusFromFeeder(subscriptionCtx, txHash)
if err != nil {
h.log.Errorw("Error while fetching Txn status from feeder", "txHash", txHash, "err", err)
return
}

if *lastKnownStatus != *lastSendStatus {
if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
return
}
lastSendStatus = lastKnownStatus
if s.Finality == TxnStatusRejected {
err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *s}, id)
if err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
}
return
}
} else {
h.log.Errorw("Unexpected error occurred while fetching transaction receipt", "txHash", txHash, "err",
rpcErr.Message)
fmt.Println("Codeunder test ------ Returing because of an error")
return
}

// Stop when final status reached and notified
if isFinal(lastSendStatus) {
fmt.Println("Codeunder test ------ After initial checks")
// At this point, the transaction has not reached finality.
var curStatus *TransactionStatus
wg.Go(func() {
fmt.Println("Codeunder test ------ Inside long running go routine")
for {
select {
case <-subscriptionCtx.Done():
fmt.Println("Codeunder test ------ Returing after context is done")
return
case <-l2HeadSub.Recv():
// A new block has been added to the DB, hence, check if transaction has reached l2 finality,
// if not, check feeder.
// We could use a separate timer to periodically check for the transaction status at feeder
// gateway, however, for the time being new l2 head update is sufficient.
fmt.Println("Codeunder test ------ if condigion check", curStatus == nil || curStatus.
Finality < TxnStatusAcceptedOnL2)
if curStatus == nil || curStatus.Finality < TxnStatusAcceptedOnL2 {
fmt.Println("Codeunder test ------ Inside l2Head if statement")
receipt, rpcErr := h.TransactionReceiptByHash(txHash)
if rpcErr == nil {
fmt.Println("Codeunder test ------ Inside rpcErr == nil")
fmt.Println("Codeunder test ------ Is curStatus nil", curStatus == nil, txHash.String())
curStatus = &TransactionStatus{
Finality: TxnStatus(receipt.FinalityStatus),
Execution: receipt.ExecutionStatus,
FailureReason: receipt.RevertReason,
}

fmt.Println(curStatus.Finality)

err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *curStatus}, id)
if err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
return
}
if curStatus.Finality == TxnStatusAcceptedOnL1 {
return
}
fmt.Println("Codeunder test ------ about to exit rpcErr == nil")
} else if rpcErr == ErrTxnHashNotFound {
feederTxStatus, err := h.fetchTxnStatusFromFeeder(subscriptionCtx, txHash)
if err != nil {
h.log.Errorw("Error while fetching Txn status from feeder", "txHash", txHash, "err", err)
return
}

if feederTxStatus.Finality == TxnStatusRejected {
err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *feederTxStatus}, id)
if err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
}
return
}

if feederTxStatus.Finality > curStatus.Finality {
curStatus = feederTxStatus

err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *curStatus}, id)
if err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
return
}
}
}
}
case <-l1HeadSub.Recv():
fmt.Println("Codeunder test ------ Returing after context is done")
receipt, rpcErr := h.TransactionReceiptByHash(txHash)
if rpcErr == nil && receipt.FinalityStatus == TxnAcceptedOnL1 {
s := &TransactionStatus{
Finality: TxnStatus(receipt.FinalityStatus),
Execution: receipt.ExecutionStatus,
FailureReason: receipt.RevertReason,
}

err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *s}, id)
if err != nil {
h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err)
}
return
}
}
}
}
})

wg.Go(func() {
h.processReorgs(subscriptionCtx, reorgSub, w, id)
})
})

fmt.Println("Codeunder test ------ Just before returning")

return &SubscriptionID{ID: id}, nil
}

func (h *Handler) fetchTxnStatusFromFeeder(ctx context.Context, txHash felt.Felt) (*TransactionStatus, error) {
if h.feederClient == nil {
return nil, errors.New("feedClient is nil")
}

txStatus, err := h.feederClient.Transaction(ctx, &txHash)
if err != nil {
return nil, err
}

status, err := adaptTransactionStatus(txStatus)
if err != nil {
return nil, err
}

return status, nil
}

func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
Expand Down Expand Up @@ -255,6 +383,10 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

if blockID != nil && blockID.Pending {
return nil, ErrCallOnPending
}

startHeader, latestHeader, rpcErr := h.resolveBlockRange(blockID)
if rpcErr != nil {
return nil, rpcErr
Expand Down Expand Up @@ -443,10 +575,6 @@ func (h *Handler) resolveBlockRange(blockID *BlockID) (*core.Header, *core.Heade
return latestHeader, latestHeader, nil
}

if blockID.Pending {
return nil, nil, ErrCallOnPending
}

startHeader, rpcErr := h.blockHeaderByID(blockID)
if rpcErr != nil {
return nil, nil, rpcErr
Expand Down Expand Up @@ -581,13 +709,13 @@ func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Er
return true, nil
}

type NewTransactionStatus struct {
TransactionHash *felt.Felt `json:"transaction_hash"`
Status *TransactionStatus `json:"status"`
type SubscriptionTransactionStatus struct {
TransactionHash *felt.Felt `json:"transaction_hash"`
Status TransactionStatus `json:"status"`
}

// sendTxnStatus creates a response and sends it to the client
func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status *NewTransactionStatus, id uint64) error {
func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status SubscriptionTransactionStatus, id uint64) error {
resp, err := json.Marshal(SubscriptionResponse{
Version: "2.0",
Method: "starknet_subscriptionTransactionsStatus",
Expand All @@ -603,7 +731,3 @@ func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status *NewTransactionStatus, id
_, err = w.Write(resp)
return err
}

func isFinal(status *TransactionStatus) bool {
return status.Finality == TxnStatusRejected || status.Finality == TxnStatusAcceptedOnL1
}
Loading

0 comments on commit 4b94556

Please sign in to comment.