From 3aa7229dd3bc9ad2d552dcb9077f1a054adffb81 Mon Sep 17 00:00:00 2001 From: Hugo Demeyere Date: Mon, 17 Feb 2025 13:47:10 +0100 Subject: [PATCH] Refactor: Reuse v6 juno_unsubscribe handler for v7 and v8 --- rpc/handlers.go | 4 +- rpc/v6/events_test.go | 164 ++++++++++++++++++++++++++++++----- rpc/v7/events.go | 22 ----- rpc/v8/handlers.go | 5 -- rpc/v8/subscriptions.go | 21 ----- rpc/v8/subscriptions_test.go | 101 --------------------- 6 files changed, 145 insertions(+), 172 deletions(-) diff --git a/rpc/handlers.go b/rpc/handlers.go index 495d5a126f..2f7113acac 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -251,7 +251,7 @@ func (h *Handler) MethodsV0_8() ([]jsonrpc.Method, string) { //nolint: funlen { Name: "starknet_unsubscribe", Params: []jsonrpc.Parameter{{Name: "subscription_id"}}, - Handler: h.rpcv8Handler.Unsubscribe, + Handler: h.rpcv6Handler.Unsubscribe, }, { Name: "starknet_getBlockWithReceipts", @@ -434,7 +434,7 @@ func (h *Handler) MethodsV0_7() ([]jsonrpc.Method, string) { //nolint: funlen { Name: "juno_unsubscribe", Params: []jsonrpc.Parameter{{Name: "id"}}, - Handler: h.rpcv7Handler.Unsubscribe, + Handler: h.rpcv6Handler.Unsubscribe, }, }, "/v0_7" } diff --git a/rpc/v6/events_test.go b/rpc/v6/events_test.go index 921657cee5..ac6f9398c6 100644 --- a/rpc/v6/events_test.go +++ b/rpc/v6/events_test.go @@ -1,7 +1,9 @@ -package rpcv6_test +package rpcv6 import ( "context" + "io" + "net" "testing" "github.com/NethermindEth/juno/blockchain" @@ -9,14 +11,32 @@ import ( "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db/pebble" + "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/mocks" rpccore "github.com/NethermindEth/juno/rpc/rpccore" - rpc "github.com/NethermindEth/juno/rpc/v6" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" "github.com/NethermindEth/juno/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" ) +type fakeConn struct { + w io.Writer +} + +func (fc *fakeConn) Write(p []byte) (int, error) { + return fc.w.Write(p) +} + +func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { + fc2, ok := other.(*fakeConn) + if !ok { + return false + } + return fc.w == fc2.w +} + func TestEvents(t *testing.T) { var pendingB *core.Block pendingBlockFn := func() *core.Block { @@ -44,16 +64,16 @@ func TestEvents(t *testing.T) { } } - handler := rpc.New(chain, nil, nil, "", n, utils.NewNopZapLogger()) + handler := New(chain, nil, nil, "", n, utils.NewNopZapLogger()) from := utils.HexToFelt(t, "0x49d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7") - args := rpc.EventsArg{ - EventFilter: rpc.EventFilter{ - FromBlock: &rpc.BlockID{Number: 0}, - ToBlock: &rpc.BlockID{Latest: true}, + args := EventsArg{ + EventFilter: EventFilter{ + FromBlock: &BlockID{Number: 0}, + ToBlock: &BlockID{Latest: true}, Address: from, Keys: [][]felt.Felt{}, }, - ResultPageRequest: rpc.ResultPageRequest{ + ResultPageRequest: ResultPageRequest{ ChunkSize: 100, ContinuationToken: "", }, @@ -61,14 +81,14 @@ func TestEvents(t *testing.T) { t.Run("filter non-existent", func(t *testing.T) { t.Run("block number", func(t *testing.T) { - args.ToBlock = &rpc.BlockID{Number: 55} + args.ToBlock = &BlockID{Number: 55} events, err := handler.Events(args) require.Nil(t, err) require.Len(t, events.Events, 5) }) t.Run("block hash", func(t *testing.T) { - args.ToBlock = &rpc.BlockID{Hash: new(felt.Felt).SetUint64(55)} + args.ToBlock = &BlockID{Hash: new(felt.Felt).SetUint64(55)} _, err := handler.Events(args) require.Equal(t, rpccore.ErrBlockNotFound, err) }) @@ -76,29 +96,29 @@ func TestEvents(t *testing.T) { t.Run("filter with no from_block", func(t *testing.T) { args.FromBlock = nil - args.ToBlock = &rpc.BlockID{Latest: true} + args.ToBlock = &BlockID{Latest: true} _, err := handler.Events(args) require.Nil(t, err) }) t.Run("filter with no to_block", func(t *testing.T) { - args.FromBlock = &rpc.BlockID{Number: 0} + args.FromBlock = &BlockID{Number: 0} args.ToBlock = nil _, err := handler.Events(args) require.Nil(t, err) }) t.Run("filter with no address", func(t *testing.T) { - args.ToBlock = &rpc.BlockID{Latest: true} + args.ToBlock = &BlockID{Latest: true} args.Address = nil _, err := handler.Events(args) require.Nil(t, err) }) t.Run("filter with no keys", func(t *testing.T) { - var allEvents []*rpc.EmittedEvent + var allEvents []*EmittedEvent t.Run("get canonical events without pagination", func(t *testing.T) { - args.ToBlock = &rpc.BlockID{Latest: true} + args.ToBlock = &BlockID{Latest: true} args.Address = from events, err := handler.Events(args) require.Nil(t, err) @@ -108,7 +128,7 @@ func TestEvents(t *testing.T) { }) t.Run("accumulate events with pagination", func(t *testing.T) { - var accEvents []*rpc.EmittedEvent + var accEvents []*EmittedEvent args.ChunkSize = 1 for range len(allEvents) + 1 { @@ -186,12 +206,12 @@ func TestEvents(t *testing.T) { }) t.Run("get pending events without pagination", func(t *testing.T) { - args = rpc.EventsArg{ - EventFilter: rpc.EventFilter{ - FromBlock: &rpc.BlockID{Pending: true}, - ToBlock: &rpc.BlockID{Pending: true}, + args = EventsArg{ + EventFilter: EventFilter{ + FromBlock: &BlockID{Pending: true}, + ToBlock: &BlockID{Pending: true}, }, - ResultPageRequest: rpc.ResultPageRequest{ + ResultPageRequest: ResultPageRequest{ ChunkSize: 100, ContinuationToken: "", }, @@ -206,3 +226,105 @@ func TestEvents(t *testing.T) { assert.Equal(t, utils.HexToFelt(t, "0x785c2ada3f53fbc66078d47715c27718f92e6e48b96372b36e5197de69b82b5"), events.Events[0].TransactionHash) }) } + +func TestUnsubscribe(t *testing.T) { + log := utils.NewNopZapLogger() + n := utils.Ptr(utils.Sepolia) + + t.Run("error when no connection in context", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", n, log) + + success, rpcErr := handler.Unsubscribe(context.Background(), 1) + assert.False(t, success) + assert.Equal(t, jsonrpc.Err(jsonrpc.MethodNotFound, nil), rpcErr) + }) + + t.Run("error when subscription ID doesn't exist", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", n, log) + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + ctx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + success, rpcErr := handler.Unsubscribe(ctx, 999) + assert.False(t, success) + assert.Equal(t, rpccore.ErrInvalidSubscriptionID, rpcErr) + }) + + t.Run("return false when connection doesn't match", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", n, log) + + // Create original subscription + serverConn1, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn1.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn1}) + _, subscriptionCtxCancel := context.WithCancel(subCtx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: &fakeConn{w: serverConn1}, + } + handler.subscriptions.Store(uint64(1), sub) + + // Try to unsubscribe with different connection + serverConn2, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn2.Close()) + }) + + unsubCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn2}) + success, rpcErr := handler.Unsubscribe(unsubCtx, 1) + assert.False(t, success) + assert.NotNil(t, rpcErr) + }) + + t.Run("successful unsubscribe", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", n, log) + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + conn := &fakeConn{w: serverConn} + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, conn) + _, subscriptionCtxCancel := context.WithCancel(subCtx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: conn, + } + handler.subscriptions.Store(uint64(1), sub) + + success, rpcErr := handler.Unsubscribe(subCtx, 1) + assert.True(t, success) + assert.Nil(t, rpcErr) + + // Verify subscription was removed + _, exists := handler.subscriptions.Load(uint64(1)) + assert.False(t, exists) + }) +} diff --git a/rpc/v7/events.go b/rpc/v7/events.go index caf1beaa20..ad56b64737 100644 --- a/rpc/v7/events.go +++ b/rpc/v7/events.go @@ -6,7 +6,6 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/jsonrpc" - "github.com/NethermindEth/juno/rpc/rpccore" ) type Event struct { @@ -64,27 +63,6 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error return id, nil } -func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) { - w, ok := jsonrpc.ConnFromContext(ctx) - if !ok { - return false, jsonrpc.Err(jsonrpc.MethodNotFound, nil) - } - sub, ok := h.subscriptions.Load(id) - if !ok { - return false, rpccore.ErrInvalidSubscriptionID - } - - subs := sub.(*subscription) - if !subs.conn.Equal(w) { - return false, rpccore.ErrInvalidSubscriptionID - } - - subs.cancel() - subs.wg.Wait() // Let the subscription finish before responding. - h.subscriptions.Delete(id) - return true, nil -} - // unsubscribe assumes h.mu is unlocked. It releases all subscription resources. func (h *Handler) unsubscribe(sub *subscription, id uint64) { sub.cancel() diff --git a/rpc/v8/handlers.go b/rpc/v8/handlers.go index 614849373c..97a8c4954d 100644 --- a/rpc/v8/handlers.go +++ b/rpc/v8/handlers.go @@ -270,11 +270,6 @@ func (h *Handler) methods() ([]jsonrpc.Method, string) { //nolint: funlen Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, Handler: h.SubscribePendingTxs, }, - { - Name: "starknet_unsubscribe", - Params: []jsonrpc.Parameter{{Name: "subscription_id"}}, - Handler: h.Unsubscribe, - }, { Name: "starknet_getBlockWithReceipts", Params: []jsonrpc.Parameter{{Name: "block_id"}}, diff --git a/rpc/v8/subscriptions.go b/rpc/v8/subscriptions.go index 00e1afae3c..b16439199e 100644 --- a/rpc/v8/subscriptions.go +++ b/rpc/v8/subscriptions.go @@ -741,27 +741,6 @@ func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgBlockRange, id uint return err } -func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) { - w, ok := jsonrpc.ConnFromContext(ctx) - if !ok { - return false, jsonrpc.Err(jsonrpc.MethodNotFound, nil) - } - sub, ok := h.subscriptions.Load(id) - if !ok { - return false, rpccore.ErrInvalidSubscriptionID - } - - subs := sub.(*subscription) - if !subs.conn.Equal(w) { - return false, rpccore.ErrInvalidSubscriptionID - } - - subs.cancel() - subs.wg.Wait() // Let the subscription finish before responding. - h.subscriptions.Delete(id) - return true, nil -} - type SubscriptionTransactionStatus struct { TransactionHash *felt.Felt `json:"transaction_hash"` Status TransactionStatus `json:"status"` diff --git a/rpc/v8/subscriptions_test.go b/rpc/v8/subscriptions_test.go index 2e2bc19de9..f7b7fc2209 100644 --- a/rpc/v8/subscriptions_test.go +++ b/rpc/v8/subscriptions_test.go @@ -991,107 +991,6 @@ func TestSubscribePendingTxs(t *testing.T) { }) } -func TestUnsubscribe(t *testing.T) { - log := utils.NewNopZapLogger() - - t.Run("error when no connection in context", func(t *testing.T) { - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - - mockChain := mocks.NewMockReader(mockCtrl) - mockSyncer := mocks.NewMockSyncReader(mockCtrl) - handler := New(mockChain, mockSyncer, nil, "", log) - - success, rpcErr := handler.Unsubscribe(context.Background(), 1) - assert.False(t, success) - assert.Equal(t, jsonrpc.Err(jsonrpc.MethodNotFound, nil), rpcErr) - }) - - t.Run("error when subscription ID doesn't exist", func(t *testing.T) { - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - - mockChain := mocks.NewMockReader(mockCtrl) - mockSyncer := mocks.NewMockSyncReader(mockCtrl) - handler := New(mockChain, mockSyncer, nil, "", log) - - serverConn, _ := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - }) - - ctx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - success, rpcErr := handler.Unsubscribe(ctx, 999) - assert.False(t, success) - assert.Equal(t, rpccore.ErrInvalidSubscriptionID, rpcErr) - }) - - t.Run("return false when connection doesn't match", func(t *testing.T) { - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - - mockChain := mocks.NewMockReader(mockCtrl) - mockSyncer := mocks.NewMockSyncReader(mockCtrl) - handler := New(mockChain, mockSyncer, nil, "", log) - - // Create original subscription - serverConn1, _ := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn1.Close()) - }) - - subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn1}) - _, subscriptionCtxCancel := context.WithCancel(subCtx) - sub := &subscription{ - cancel: subscriptionCtxCancel, - conn: &fakeConn{w: serverConn1}, - } - handler.subscriptions.Store(uint64(1), sub) - - // Try to unsubscribe with different connection - serverConn2, _ := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn2.Close()) - }) - - unsubCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn2}) - success, rpcErr := handler.Unsubscribe(unsubCtx, 1) - assert.False(t, success) - assert.NotNil(t, rpcErr) - }) - - t.Run("successful unsubscribe", func(t *testing.T) { - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - - mockChain := mocks.NewMockReader(mockCtrl) - mockSyncer := mocks.NewMockSyncReader(mockCtrl) - handler := New(mockChain, mockSyncer, nil, "", log) - - serverConn, _ := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - }) - - conn := &fakeConn{w: serverConn} - subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, conn) - _, subscriptionCtxCancel := context.WithCancel(subCtx) - sub := &subscription{ - cancel: subscriptionCtxCancel, - conn: conn, - } - handler.subscriptions.Store(uint64(1), sub) - - success, rpcErr := handler.Unsubscribe(subCtx, 1) - assert.True(t, success) - assert.Nil(t, rpcErr) - - // Verify subscription was removed - _, exists := handler.subscriptions.Load(uint64(1)) - assert.False(t, exists) - }) -} - func createWsConn(t *testing.T, ctx context.Context, server *jsonrpc.Server) *websocket.Conn { ws := jsonrpc.NewWebsocket(server, nil, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws)