From b2b4996d5cad7a585eaed27d2c26c98b3c249564 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Wed, 20 Nov 2024 11:31:54 +0000 Subject: [PATCH 1/6] Add EventFilterer The main reason for adding this interface is for testing as it is quite challenging to find events in a particular order. --- blockchain/event_filter.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/blockchain/event_filter.go b/blockchain/event_filter.go index e2f7b57c44..98af88218d 100644 --- a/blockchain/event_filter.go +++ b/blockchain/event_filter.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "math" "github.com/NethermindEth/juno/core" @@ -14,6 +15,15 @@ import ( var errChunkSizeReached = errors.New("chunk size reached") +type EventFilterer interface { + io.Closer + + Events(cToken *ContinuationToken, chunkSize uint64) ([]*FilteredEvent, *ContinuationToken, error) + SetRangeEndBlockByNumber(filterRange EventFilterRange, blockNumber uint64) error + SetRangeEndBlockByHash(filterRange EventFilterRange, blockHash *felt.Felt) error + WithLimit(limit uint) *EventFilter +} + type EventFilter struct { txn db.Transaction fromBlock uint64 From d21a59f0368d2de90d16ea21cca62e2b796640ba Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Wed, 20 Nov 2024 11:35:47 +0000 Subject: [PATCH 2/6] Add MockEventFilterer and its go generate directive --- blockchain/event_filter.go | 1 + mocks/mock_event_filterer.go | 113 +++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 mocks/mock_event_filterer.go diff --git a/blockchain/event_filter.go b/blockchain/event_filter.go index 98af88218d..37c79b5e00 100644 --- a/blockchain/event_filter.go +++ b/blockchain/event_filter.go @@ -15,6 +15,7 @@ import ( var errChunkSizeReached = errors.New("chunk size reached") +//go:generate mockgen -destination=../mocks/mock_event_filterer.go -package=mocks github.com/NethermindEth/juno/blockchain EventFilterer type EventFilterer interface { io.Closer diff --git a/mocks/mock_event_filterer.go b/mocks/mock_event_filterer.go new file mode 100644 index 0000000000..8c800fc41b --- /dev/null +++ b/mocks/mock_event_filterer.go @@ -0,0 +1,113 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/NethermindEth/juno/blockchain (interfaces: EventFilterer) +// +// Generated by this command: +// +// mockgen -destination=../mocks/mock_event_filterer.go -package=mocks github.com/NethermindEth/juno/blockchain EventFilterer +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + blockchain "github.com/NethermindEth/juno/blockchain" + felt "github.com/NethermindEth/juno/core/felt" + gomock "go.uber.org/mock/gomock" +) + +// MockEventFilterer is a mock of EventFilterer interface. +type MockEventFilterer struct { + ctrl *gomock.Controller + recorder *MockEventFiltererMockRecorder +} + +// MockEventFiltererMockRecorder is the mock recorder for MockEventFilterer. +type MockEventFiltererMockRecorder struct { + mock *MockEventFilterer +} + +// NewMockEventFilterer creates a new mock instance. +func NewMockEventFilterer(ctrl *gomock.Controller) *MockEventFilterer { + mock := &MockEventFilterer{ctrl: ctrl} + mock.recorder = &MockEventFiltererMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventFilterer) EXPECT() *MockEventFiltererMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockEventFilterer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockEventFiltererMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEventFilterer)(nil).Close)) +} + +// Events mocks base method. +func (m *MockEventFilterer) Events(arg0 *blockchain.ContinuationToken, arg1 uint64) ([]*blockchain.FilteredEvent, *blockchain.ContinuationToken, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Events", arg0, arg1) + ret0, _ := ret[0].([]*blockchain.FilteredEvent) + ret1, _ := ret[1].(*blockchain.ContinuationToken) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Events indicates an expected call of Events. +func (mr *MockEventFiltererMockRecorder) Events(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Events", reflect.TypeOf((*MockEventFilterer)(nil).Events), arg0, arg1) +} + +// SetRangeEndBlockByHash mocks base method. +func (m *MockEventFilterer) SetRangeEndBlockByHash(arg0 blockchain.EventFilterRange, arg1 *felt.Felt) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetRangeEndBlockByHash", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetRangeEndBlockByHash indicates an expected call of SetRangeEndBlockByHash. +func (mr *MockEventFiltererMockRecorder) SetRangeEndBlockByHash(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRangeEndBlockByHash", reflect.TypeOf((*MockEventFilterer)(nil).SetRangeEndBlockByHash), arg0, arg1) +} + +// SetRangeEndBlockByNumber mocks base method. +func (m *MockEventFilterer) SetRangeEndBlockByNumber(arg0 blockchain.EventFilterRange, arg1 uint64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetRangeEndBlockByNumber", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetRangeEndBlockByNumber indicates an expected call of SetRangeEndBlockByNumber. +func (mr *MockEventFiltererMockRecorder) SetRangeEndBlockByNumber(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRangeEndBlockByNumber", reflect.TypeOf((*MockEventFilterer)(nil).SetRangeEndBlockByNumber), arg0, arg1) +} + +// WithLimit mocks base method. +func (m *MockEventFilterer) WithLimit(arg0 uint) *blockchain.EventFilter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WithLimit", arg0) + ret0, _ := ret[0].(*blockchain.EventFilter) + return ret0 +} + +// WithLimit indicates an expected call of WithLimit. +func (mr *MockEventFiltererMockRecorder) WithLimit(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithLimit", reflect.TypeOf((*MockEventFilterer)(nil).WithLimit), arg0) +} From 9fd4b8b6d493653dd166bff2973cdb6be040b7aa Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Wed, 20 Nov 2024 11:45:28 +0000 Subject: [PATCH 3/6] Return EventFilterer from Events() on Blockchain --- blockchain/blockchain.go | 4 ++-- mocks/mock_blockchain.go | 4 ++-- rpc/events.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 2ceb1f27de..039cce280c 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -44,7 +44,7 @@ type Reader interface { BlockCommitmentsByNumber(blockNumber uint64) (*core.BlockCommitments, error) - EventFilter(from *felt.Felt, keys [][]felt.Felt) (*EventFilter, error) + EventFilter(from *felt.Felt, keys [][]felt.Felt) (EventFilterer, error) Pending() (Pending, error) @@ -854,7 +854,7 @@ func (b *Blockchain) StateAtBlockHash(blockHash *felt.Felt) (core.StateReader, S } // EventFilter returns an EventFilter object that is tied to a snapshot of the blockchain -func (b *Blockchain) EventFilter(from *felt.Felt, keys [][]felt.Felt) (*EventFilter, error) { +func (b *Blockchain) EventFilter(from *felt.Felt, keys [][]felt.Felt) (EventFilterer, error) { b.listener.OnRead("EventFilter") txn, err := b.database.NewTransaction(false) if err != nil { diff --git a/mocks/mock_blockchain.go b/mocks/mock_blockchain.go index 6b02248259..b42b38232f 100644 --- a/mocks/mock_blockchain.go +++ b/mocks/mock_blockchain.go @@ -119,10 +119,10 @@ func (mr *MockReaderMockRecorder) BlockHeaderByNumber(arg0 any) *gomock.Call { } // EventFilter mocks base method. -func (m *MockReader) EventFilter(arg0 *felt.Felt, arg1 [][]felt.Felt) (*blockchain.EventFilter, error) { +func (m *MockReader) EventFilter(arg0 *felt.Felt, arg1 [][]felt.Felt) (blockchain.EventFilterer, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EventFilter", arg0, arg1) - ret0, _ := ret[0].(*blockchain.EventFilter) + ret0, _ := ret[0].(blockchain.EventFilterer) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/rpc/events.go b/rpc/events.go index 002c0e077b..7a322fde1e 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -191,7 +191,7 @@ func (h *Handler) unsubscribe(sub *subscription, id uint64) { h.mu.Unlock() } -func setEventFilterRange(filter *blockchain.EventFilter, fromID, toID *BlockID, latestHeight uint64) error { +func setEventFilterRange(filter blockchain.EventFilterer, fromID, toID *BlockID, latestHeight uint64) error { set := func(filterRange blockchain.EventFilterRange, id *BlockID) error { if id == nil { return nil From b4b0cce1796018725692be1f5573151d4ef9de76 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Wed, 6 Nov 2024 12:34:46 +0000 Subject: [PATCH 4/6] Add subscribeEvents and subcriptionEvents --- rpc/events.go | 4 + rpc/handlers.go | 9 + rpc/subscriptions.go | 178 ++++++++++++++++++++ rpc/subscriptions_test.go | 338 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 529 insertions(+) create mode 100644 rpc/subscriptions.go create mode 100644 rpc/subscriptions_test.go diff --git a/rpc/events.go b/rpc/events.go index 7a322fde1e..943315c3b5 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -44,6 +44,10 @@ type EventsChunk struct { ContinuationToken string `json:"continuation_token,omitempty"` } +type SubscriptionID struct { + ID uint64 `json:"subscription_id"` +} + /**************************************************** Events Handlers *****************************************************/ diff --git a/rpc/handlers.go b/rpc/handlers.go index 4d4d35b508..1cf96b0c21 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/binary" "encoding/json" + "fmt" "log" "math" "strings" @@ -65,12 +66,15 @@ var ( ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"} ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"} ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"} + ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: fmt.Sprintf("Cannot go back more than %v blocks", maxBlocksBack)} + ErrCallOnPending = &jsonrpc.Error{Code: 69, Message: "This method does not support being called on the pending block"} // These errors can be only be returned by Juno-specific methods. ErrSubscriptionNotFound = &jsonrpc.Error{Code: 100, Message: "Subscription not found"} ) const ( + maxBlocksBack = 1024 maxEventChunkSize = 10240 maxEventFilterKeys = 1024 traceCacheSize = 128 @@ -334,6 +338,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen Name: "starknet_specVersion", Handler: h.SpecVersion, }, + { + Name: "starknet_subscribeEvents", + Params: []jsonrpc.Parameter{{Name: "from_address", Optional: true}, {Name: "keys", Optional: true}, {Name: "block", Optional: true}}, + Handler: h.SubscribeEvents, + }, { Name: "juno_subscribeNewHeads", Handler: h.SubscribeNewHeads, diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go new file mode 100644 index 0000000000..314b054c14 --- /dev/null +++ b/rpc/subscriptions.go @@ -0,0 +1,178 @@ +package rpc + +import ( + "context" + "encoding/json" + "sync" + + "github.com/NethermindEth/juno/blockchain" + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/jsonrpc" +) + +const subscribeEventsChunkSize = 1024 + +func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt, + blockID *BlockID, +) (*SubscriptionID, *jsonrpc.Error) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + lenKeys := len(keys) + for _, k := range keys { + lenKeys += len(k) + } + if lenKeys > maxEventFilterKeys { + return nil, ErrTooManyKeysInFilter + } + + var requestedHeader *core.Header + headHeader, err := h.bcReader.HeadsHeader() + if err != nil { + return nil, ErrInternal.CloneWithData(err.Error()) + } + + if blockID == nil { + requestedHeader = headHeader + } else { + if blockID.Pending { + return nil, ErrCallOnPending + } + + var rpcErr *jsonrpc.Error + requestedHeader, rpcErr = h.blockHeaderByID(blockID) + if rpcErr != nil { + return nil, rpcErr + } + + if headHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack { + return nil, ErrTooManyBlocksBack + } + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + headerSub := h.newHeads.Subscribe() + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + headerSub.Unsubscribe() + }() + + // The specification doesn't enforce ordering of events therefore events from new blocks can be sent before + // old blocks. + // Todo: see if sub's wg can be used? + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + + for { + select { + case <-subscriptionCtx.Done(): + return + case header := <-headerSub.Recv(): + + h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys) + } + } + }() + + h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys) + + wg.Wait() + }) + + return &SubscriptionID{ID: id}, nil +} + +func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) { + filter, err := h.bcReader.EventFilter(fromAddr, keys) + if err != nil { + h.log.Warnw("Error creating event filter", "err", err) + return + } + + defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + + if err = setEventFilterRange(filter, &BlockID{Number: from}, &BlockID{Number: to}, to); err != nil { + h.log.Warnw("Error setting event filter range", "err", err) + return + } + + filteredEvents, cToken, err := filter.Events(nil, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(ctx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + + for cToken != nil { + filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(ctx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + } +} + +func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error { + for _, event := range events { + select { + case <-ctx.Done(): + return ctx.Err() + default: + emittedEvent := &EmittedEvent{ + BlockNumber: &event.BlockNumber, // This always be filled as subscribeEvents cannot be called on pending block + BlockHash: event.BlockHash, + TransactionHash: event.TransactionHash, + Event: &Event{ + From: event.From, + Keys: event.Keys, + Data: event.Data, + }, + } + + resp, err := json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionEvents", + Params: map[string]any{ + "subscription_id": id, + "result": emittedEvent, + }, + }) + if err != nil { + return err + } + + _, err = w.Write(resp) + if err != nil { + return err + } + } + } + return nil +} diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go new file mode 100644 index 0000000000..4076364333 --- /dev/null +++ b/rpc/subscriptions_test.go @@ -0,0 +1,338 @@ +package rpc + +import ( + "context" + "encoding/json" + "io" + "net" + "testing" + "time" + + "github.com/NethermindEth/juno/blockchain" + "github.com/NethermindEth/juno/clients/feeder" + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/feed" + "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/mocks" + adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +// Due to the difference in how some test files in rpc use "package rpc" vs "package rpc_test" it was easiest to copy +// the fakeConn here. +// Todo: move all the subscription related test here +type fakeConn struct { + w io.Writer +} + +func (fc *fakeConn) Write(p []byte) (int, error) { + return fc.w.Write(p) +} + +func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { + fc2, ok := other.(*fakeConn) + if !ok { + return false + } + return fc.w == fc2.w +} + +func TestSubscribeEvents(t *testing.T) { + log := utils.NewNopZapLogger() + + t.Run("Return error if too many keys in filter", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + keys := make([][]felt.Felt, 1024+1) + fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyKeysInFilter, rpcErr) + }) + + t.Run("Return error if called on pending block", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + keys := make([][]felt.Felt, 1) + fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) + blockID := &BlockID{Pending: true} + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1}, nil) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrCallOnPending, rpcErr) + }) + + t.Run("Return error if block is too far back", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + keys := make([][]felt.Felt, 1) + fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) + blockID := &BlockID{Number: 0} + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + // Note the end of the window doesn't need to be tested because if requested block number is more than the + // head, a block not found error will be returned. This behaviour has been tested in various other tests, and we + // don't need to test it here again. + t.Run("head is 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyBlocksBack, rpcErr) + }) + + t.Run("head is more than 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 2024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyBlocksBack, rpcErr) + }) + }) + + n := utils.Ptr(utils.Sepolia) + client := feeder.NewTestClient(t, n) + gw := adaptfeeder.New(client) + + b1, err := gw.BlockByNumber(context.Background(), 56377) + require.NoError(t, err) + + fromAddr := new(felt.Felt).SetBytes([]byte("some address")) + keys := [][]felt.Felt{{*new(felt.Felt).SetBytes([]byte("key1"))}} + + filteredEvents := []*blockchain.FilteredEvent{ + { + Event: b1.Receipts[0].Events[0], + BlockNumber: b1.Number, + BlockHash: new(felt.Felt).SetBytes([]byte("b1")), + TransactionHash: b1.Transactions[0].Hash(), + }, + { + Event: b1.Receipts[1].Events[0], + BlockNumber: b1.Number + 1, + BlockHash: new(felt.Felt).SetBytes([]byte("b2")), + TransactionHash: b1.Transactions[1].Hash(), + }, + } + + var emittedEvents []*EmittedEvent + for _, e := range filteredEvents { + emittedEvents = append(emittedEvents, &EmittedEvent{ + Event: &Event{ + From: e.From, + Keys: e.Keys, + Data: e.Data, + }, + BlockHash: e.BlockHash, + BlockNumber: &e.BlockNumber, + TransactionHash: e.TransactionHash, + }) + } + + t.Run("Events from old blocks", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + mockEventFilterer := mocks.NewMockEventFilterer(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b1.Number}, nil) + mockChain.EXPECT().BlockHeaderByNumber(b1.Number).Return(b1.Header, nil) + mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil) + + mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2) + mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return(filteredEvents, nil, nil) + mockEventFilterer.EXPECT().Close().AnyTimes() + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + ctx, cancel := context.WithCancel(context.Background()) + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, &BlockID{Number: b1.Number}) + require.Nil(t, rpcErr) + + var marshalledResponses [][]byte + for _, e := range emittedEvents { + resp, err := marshalSubscriptionResponse(e, id.ID) + require.NoError(t, err) + marshalledResponses = append(marshalledResponses, resp) + } + + for _, m := range marshalledResponses { + got := make([]byte, len(m)) + _, err := clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, string(m), string(got)) + } + cancel() + }) + + t.Run("Events when continuation token is not nil", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + mockEventFilterer := mocks.NewMockEventFilterer(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b1.Number}, nil) + mockChain.EXPECT().BlockHeaderByNumber(b1.Number).Return(b1.Header, nil) + mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil) + + cToken := new(blockchain.ContinuationToken) + mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2) + mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return( + []*blockchain.FilteredEvent{filteredEvents[0]}, cToken, nil) + mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return( + []*blockchain.FilteredEvent{filteredEvents[1]}, nil, nil) + mockEventFilterer.EXPECT().Close().AnyTimes() + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + ctx, cancel := context.WithCancel(context.Background()) + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, &BlockID{Number: b1.Number}) + require.Nil(t, rpcErr) + + var marshalledResponses [][]byte + for _, e := range emittedEvents { + resp, err := marshalSubscriptionResponse(e, id.ID) + require.NoError(t, err) + marshalledResponses = append(marshalledResponses, resp) + } + + for _, m := range marshalledResponses { + got := make([]byte, len(m)) + _, err := clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, string(m), string(got)) + } + cancel() + }) + + t.Run("Events from new blocks", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + mockEventFilterer := mocks.NewMockEventFilterer(mockCtrl) + + handler := New(mockChain, mockSyncer, nil, "", log) + headerFeed := feed.New[*core.Header]() + handler.newHeads = headerFeed + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b1.Number}, nil) + mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil) + + mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2) + mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.FilteredEvent{filteredEvents[0]}, nil, nil) + mockEventFilterer.EXPECT().Close().AnyTimes() + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + ctx, cancel := context.WithCancel(context.Background()) + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil) + require.Nil(t, rpcErr) + + resp, err := marshalSubscriptionResponse(emittedEvents[0], id.ID) + require.NoError(t, err) + + got := make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, string(resp), string(got)) + + mockChain.EXPECT().EventFilter(fromAddr, keys).Return(mockEventFilterer, nil) + + mockEventFilterer.EXPECT().SetRangeEndBlockByNumber(gomock.Any(), gomock.Any()).Return(nil).MaxTimes(2) + mockEventFilterer.EXPECT().Events(gomock.Any(), gomock.Any()).Return([]*blockchain.FilteredEvent{filteredEvents[1]}, nil, nil) + + headerFeed.Send(&core.Header{Number: b1.Number + 1}) + + resp, err = marshalSubscriptionResponse(emittedEvents[1], id.ID) + require.NoError(t, err) + + got = make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, string(resp), string(got)) + + cancel() + time.Sleep(100 * time.Millisecond) + }) +} + +func marshalSubscriptionResponse(e *EmittedEvent, id uint64) ([]byte, error) { + return json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionEvents", + Params: map[string]any{ + "subscription_id": id, + "result": e, + }, + }) +} From a0f5ff2cce639b9aa758b2eca745461520095471 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Fri, 22 Nov 2024 12:48:19 +0000 Subject: [PATCH 5/6] Add SubscriptionResponse Using jsonrpc.Request for subscription response is misleading and by adding a separate struct for returning responses to subscription events is more appropriate --- rpc/subscriptions.go | 8 +++++++- rpc/subscriptions_test.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 314b054c14..b049c6ce0d 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -13,6 +13,12 @@ import ( const subscribeEventsChunkSize = 1024 +type SubscriptionResponse struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params any `json:"params"` +} + func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt, blockID *BlockID, ) (*SubscriptionID, *jsonrpc.Error) { @@ -156,7 +162,7 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter }, } - resp, err := json.Marshal(jsonrpc.Request{ + resp, err := json.Marshal(SubscriptionResponse{ Version: "2.0", Method: "starknet_subscriptionEvents", Params: map[string]any{ diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index 4076364333..a3ab61fa7c 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -327,7 +327,7 @@ func TestSubscribeEvents(t *testing.T) { } func marshalSubscriptionResponse(e *EmittedEvent, id uint64) ([]byte, error) { - return json.Marshal(jsonrpc.Request{ + return json.Marshal(SubscriptionResponse{ Version: "2.0", Method: "starknet_subscriptionEvents", Params: map[string]any{ From 5cac638ece4cb54d73507d8c34ba312c609e46a3 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Fri, 22 Nov 2024 12:51:21 +0000 Subject: [PATCH 6/6] Use SubscriptionResponse for subscribeNewHeads --- rpc/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/events.go b/rpc/events.go index 943315c3b5..a7298486f8 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -78,7 +78,7 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error case <-subscriptionCtx.Done(): return case header := <-headerSub.Recv(): - resp, err := json.Marshal(jsonrpc.Request{ + resp, err := json.Marshal(SubscriptionResponse{ Version: "2.0", Method: "juno_subscribeNewHeads", Params: map[string]any{