Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkushinDaniil committed Jan 22, 2025
1 parent cc0972d commit 4907552
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 252 deletions.
114 changes: 105 additions & 9 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"iter"
"log"
"maps"
"math"
"strings"
stdsync "sync"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/clients/feeder"
Expand Down Expand Up @@ -100,8 +100,7 @@ type Handler struct {
l1Heads *feed.Feed[*core.L1Head]

idgen func() uint64
mu stdsync.Mutex // protects subscriptions.
subscriptions map[uint64]*subscription
subscriptions subscriptions

blockTraceCache *lru.Cache[traceCacheKey, []TracedBlockTransaction]

Expand All @@ -112,6 +111,106 @@ type Handler struct {
coreContractABI abi.ABI
}

func newSubscriptions() subscriptions {
return subscriptions{
id2sub: make(map[uint64]*subscription),
event: make(chan event),
valueChan: make(chan value),
valuesChan: make(chan iter.Seq[*subscription]),
}
}

type subscriptions struct {
id2sub map[uint64]*subscription
event chan event
valueChan chan value
valuesChan chan iter.Seq[*subscription]
}

type value struct {
sub *subscription
ok bool
}

type event struct {
id uint64
sub *subscription
action action
}

type action uint8

const (
add action = iota
remove
getValue
getValues
)

func (s subscriptions) add(id uint64, sub *subscription) {
fmt.Println("add")
s.event <- event{id: id, sub: sub, action: add}
fmt.Println("added")
}

func (s subscriptions) remove(id uint64) {
fmt.Println("remove")
s.event <- event{id: id, sub: nil, action: remove}
fmt.Println("removed")
}

func (s subscriptions) getValue(id uint64) (*subscription, bool) {
fmt.Println("getValue")
s.event <- event{id: id, action: getValue}
v := <-s.valueChan
fmt.Println("gotValue")
return v.sub, v.ok
}

func (s subscriptions) getValues() iter.Seq[*subscription] {
fmt.Println("getValues")
s.event <- event{action: getValues}
fmt.Println("gotValues")
return <-s.valuesChan
}

func (s subscriptions) run(ctx context.Context) {
fmt.Println("run")
go func() {
defer func() {
fmt.Println("defer close valueChan")
close(s.valueChan)
fmt.Println("defer close valuesChan")
close(s.valuesChan)
}()
for {
fmt.Println("select")
select {
case e := <-s.event:
fmt.Println("event")
switch e.action {
case add:
fmt.Println("add in run")
s.id2sub[e.id] = e.sub
case remove:
fmt.Println("remove in run")
delete(s.id2sub, e.id)
case getValue:
fmt.Println("getValue in run")
sub, ok := s.id2sub[e.id]
s.valueChan <- value{sub: sub, ok: ok}
case getValues:
fmt.Println("getValues in run")
s.valuesChan <- maps.Values(s.id2sub)
}
case <-ctx.Done():
fmt.Println("done")
return
}
}
}()
}

type subscription struct {
cancel func()
wg conc.WaitGroup
Expand Down Expand Up @@ -141,7 +240,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
reorgs: feed.New[*sync.ReorgBlockRange](),
pendingTxs: feed.New[[]core.Transaction](),
l1Heads: feed.New[*core.L1Head](),
subscriptions: make(map[uint64]*subscription),
subscriptions: newSubscriptions(),

blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
filterLimit: math.MaxUint,
Expand Down Expand Up @@ -181,6 +280,7 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler {
}

func (h *Handler) Run(ctx context.Context) error {
h.subscriptions.run(ctx)
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
reorgsSub := h.syncReader.SubscribeReorg().Subscription
pendingTxsSub := h.syncReader.SubscribePendingTxs().Subscription
Expand All @@ -196,11 +296,7 @@ func (h *Handler) Run(ctx context.Context) error {

<-ctx.Done()

h.mu.Lock()
subscriptions := maps.Values(h.subscriptions)
h.mu.Unlock()

for sub := range subscriptions {
for sub := range h.subscriptions.getValues() {
sub.wg.Wait()
}

Expand Down
Loading

0 comments on commit 4907552

Please sign in to comment.