Skip to content

Commit

Permalink
improve listeners and fix enums on orders
Browse files Browse the repository at this point in the history
  • Loading branch information
larscom committed Jul 6, 2024
1 parent 574b40f commit 300b88b
Show file tree
Hide file tree
Showing 13 changed files with 575 additions and 368 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ For each event on the Bitvavo platform there is a listener available. A listener
import "github.com/larscom/bitvavo-go/pkg/bitvavo"

func main() {
// listen for ticker (public) events
listener := bitvavo.NewTickerListener()
// listen for candle (public) events
listener := bitvavo.NewCandlesListener()
defer listener.Close()

chn, err := listener.Listen([]string{"ETH-EUR"})
chn, err := listener.Listen([]string{"ETH-EUR"}, []bitvavo.Interval{bitvavo.INTERVAL_1M})
if err != nil {
panic(err)
}
Expand Down
87 changes: 47 additions & 40 deletions pkg/bitvavo/book_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,81 +13,88 @@ func NewBookListener() *BookListener {
chn := make(chan BookEvent)
rchn := make(chan struct{})

onMessage := func(data WebSocketEventData, err error) {
if err != nil {
chn <- BookEvent{Error: err}
} else if data.Event == EVENT_BOOK {
var book Book
chn <- BookEvent{Value: book, Error: data.Decode(&book)}
}
l := &BookListener{
chn: chn,
rchn: rchn,
once: new(sync.Once),
channel: CHANNEL_BOOK,
}

ctx, cancel := context.WithCancel(context.Background())
ws, err := NewWebSocket(ctx, onMessage, func() {
ws, err := NewWebSocket(ctx, l.onMessage, func() {
rchn <- struct{}{}
})

if err != nil {
panic(err)
}

return &BookListener{
ws: ws,
chn: chn,
rchn: rchn,
once: &sync.Once{},
subscriptions: make([]Subscription, 0),
closefn: cancel,
}
l.closefn = cancel
l.ws = ws

return l
}

// Listen for events, you 'can' call this function multiple times.
// The same channel is returned for each function call, meaning that all channel
// receivers get the same data.
func (t *BookListener) Listen(markets []string) (<-chan BookEvent, error) {
subs := []Subscription{NewSubscription(CHANNEL_BOOK, markets)}
if err := t.ws.Subscribe(subs); err != nil {
func (l *BookListener) Listen(markets []string) (<-chan BookEvent, error) {
if err := l.ws.Subscribe([]Subscription{NewSubscription(l.channel, markets)}); err != nil {
return nil, err
}

t.mu.Lock()
defer t.mu.Unlock()
t.subscriptions = append(t.subscriptions, subs...)
go l.resubscriber()

go t.resubscriber()

return t.chn, nil
return l.chn, nil
}

// Graceful shutdown, once you close a listener it can't be reused, you have to
// create a new one.
func (t *BookListener) Close() error {
t.mu.Lock()
defer t.mu.Unlock()

if len(t.subscriptions) == 0 {
func (l *BookListener) Close() error {
if len(l.subscriptions) == 0 {
return ErrNoSubscriptions
}

if err := t.ws.Unsubscribe(t.subscriptions); err != nil {
if err := l.ws.Unsubscribe(l.subscriptions); err != nil {
return err
}

t.closefn()
l.closefn()

close(t.chn)
close(t.rchn)
close(l.chn)
close(l.rchn)

t.subscriptions = nil
l.subscriptions = nil

return nil
}

func (t *BookListener) resubscriber() {
t.once.Do(func() {
for range t.rchn {
if err := t.ws.Subscribe(t.subscriptions); err != nil {
t.chn <- BookEvent{Error: err}
func (l *BookListener) onMessage(data WebSocketEventData, err error) {
if err != nil {
l.chn <- BookEvent{Error: err}
} else if data.Event == EVENT_SUBSCRIBED {
var subscribed Subscribed
if err := data.Decode(&subscribed); err != nil {
l.chn <- BookEvent{Error: err}
} else {
markets, ok := subscribed.Subscriptions[l.channel]
if ok {
l.subscriptions = []Subscription{NewSubscription(l.channel, markets)}
} else {
l.chn <- BookEvent{Error: ErrExpectedChannel(l.channel)}
}
}
} else if data.Event == EVENT_BOOK {
var book Book
l.chn <- BookEvent{Value: book, Error: data.Decode(&book)}
}
}

func (l *BookListener) resubscriber() {
l.once.Do(func() {
for range l.rchn {
if err := l.ws.Subscribe(l.subscriptions); err != nil {
l.chn <- BookEvent{Error: err}
}
}
})
Expand Down
93 changes: 53 additions & 40 deletions pkg/bitvavo/candles_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,81 +13,94 @@ func NewCandlesListener() *CandlesListener {
chn := make(chan CandleEvent)
rchn := make(chan struct{})

onMessage := func(data WebSocketEventData, err error) {
if err != nil {
chn <- CandleEvent{Error: err}
} else if data.Event == EVENT_CANDLE {
var candle Candle
chn <- CandleEvent{Value: candle, Error: data.Decode(&candle)}
}
l := &CandlesListener{
chn: chn,
rchn: rchn,
once: new(sync.Once),
channel: CHANNEL_CANDLES,
}

ctx, cancel := context.WithCancel(context.Background())
ws, err := NewWebSocket(ctx, onMessage, func() {
ws, err := NewWebSocket(ctx, l.onMessage, func() {
rchn <- struct{}{}
})

if err != nil {
panic(err)
}

return &CandlesListener{
ws: ws,
chn: chn,
rchn: rchn,
once: &sync.Once{},
subscriptions: make([]Subscription, 0),
closefn: cancel,
}
l.closefn = cancel
l.ws = ws

return l
}

// Listen for events, you 'can' call this function multiple times.
// The same channel is returned for each function call, meaning that all channel
// receivers get the same data.
func (t *CandlesListener) Listen(markets []string, intervals []Interval) (<-chan CandleEvent, error) {
subs := []Subscription{NewSubscription(CHANNEL_CANDLES, markets, intervals...)}
if err := t.ws.Subscribe(subs); err != nil {
func (l *CandlesListener) Listen(markets []string, intervals []Interval) (<-chan CandleEvent, error) {
if err := l.ws.Subscribe([]Subscription{NewSubscription(l.channel, markets, intervals...)}); err != nil {
return nil, err
}

t.mu.Lock()
defer t.mu.Unlock()
t.subscriptions = append(t.subscriptions, subs...)
go l.resubscriber()

go t.resubscriber()

return t.chn, nil
return l.chn, nil
}

// Graceful shutdown, once you close a listener it can't be reused, you have to
// create a new one.
func (t *CandlesListener) Close() error {
t.mu.Lock()
defer t.mu.Unlock()

if len(t.subscriptions) == 0 {
func (l *CandlesListener) Close() error {
if len(l.subscriptions) == 0 {
return ErrNoSubscriptions
}

if err := t.ws.Unsubscribe(t.subscriptions); err != nil {
if err := l.ws.Unsubscribe(l.subscriptions); err != nil {
return err
}

t.closefn()
l.closefn()

close(t.chn)
close(t.rchn)
close(l.chn)
close(l.rchn)

t.subscriptions = nil
l.subscriptions = nil

return nil
}

func (t *CandlesListener) resubscriber() {
t.once.Do(func() {
for range t.rchn {
if err := t.ws.Subscribe(t.subscriptions); err != nil {
t.chn <- CandleEvent{Error: err}
func (l *CandlesListener) onMessage(data WebSocketEventData, err error) {
if err != nil {
l.chn <- CandleEvent{Error: err}
} else if data.Event == EVENT_SUBSCRIBED {
var subscribed Subscribed
if err := data.Decode(&subscribed); err != nil {
l.chn <- CandleEvent{Error: err}
} else {
subs, ok := subscribed.SubscriptionsInterval[l.channel]
if ok {
markets := make([]string, 0)
intervals := make([]Interval, 0)
for i, m := range subs {
intervals = append(intervals, i)
markets = append(markets, m...)
}
l.subscriptions = []Subscription{NewSubscription(l.channel, markets, intervals...)}
} else {
l.chn <- CandleEvent{Error: ErrExpectedChannel(l.channel)}
}
}
} else if data.Event == EVENT_CANDLE {
var candle Candle
l.chn <- CandleEvent{Value: candle, Error: data.Decode(&candle)}
}
}

func (l *CandlesListener) resubscriber() {
l.once.Do(func() {
for range l.rchn {
if err := l.ws.Subscribe(l.subscriptions); err != nil {
l.chn <- CandleEvent{Error: err}
}
}
})
Expand Down
Loading

0 comments on commit 300b88b

Please sign in to comment.