Skip to content

Commit

Permalink
DRY processing of Events
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Nov 18, 2024
1 parent 98e2dd3 commit e75669b
Showing 1 changed file with 34 additions and 71 deletions.
105 changes: 34 additions & 71 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,98 +67,61 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys

// The specification doesn't enforce ordering of events therefore events from new blocks can be sent before
// old blocks.
// Todo: DRY
sub.wg.Go(func() {
for {
select {
case <-subscriptionCtx.Done():
return
case header := <-headerSub.Recv():
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}
defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")

if err = setEventFilterRange(filter, &BlockID{Number: header.Number},
&BlockID{Number: header.Number}, header.Number); err != nil {
h.log.Warnw("Error setting event filter range", "err", err)
return
}

var cToken *blockchain.ContinuationToken
filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

for cToken != nil {
filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}
}
h.processEvents(subscriptionCtx, w, id, header.Number, headHeader.Number, fromAddr, keys)
}
}
})
h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys)
})

filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}
defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")
return &SubscriptionID{ID: id}, nil
}

if err = setEventFilterRange(filter, &BlockID{Number: requestedHeader.Number},
&BlockID{Number: headHeader.Number}, headHeader.Number); err != nil {
h.log.Warnw("Error setting event filter range", "err", err)
return
}
func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) {
filter, err := h.bcReader.EventFilter(fromAddr, keys)
if err != nil {
h.log.Warnw("Error creating event filter", "err", err)
return
}
defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription")

if err = setEventFilterRange(filter, &BlockID{Number: from}, &BlockID{Number: to}, to); err != nil {
h.log.Warnw("Error setting event filter range", "err", err)
return
}

var cToken *blockchain.ContinuationToken
filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

var cToken *blockchain.ContinuationToken
filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize)
err = sendEvents(ctx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

for cToken != nil {
filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
err = sendEvents(ctx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}

for cToken != nil {
filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize)
if err != nil {
h.log.Warnw("Error filtering events", "err", err)
return
}

err = sendEvents(subscriptionCtx, w, filteredEvents, id)
if err != nil {
h.log.Warnw("Error sending events", "err", err)
return
}
}
})

return &SubscriptionID{ID: id}, nil
}
}

func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error {
Expand Down

0 comments on commit e75669b

Please sign in to comment.