Skip to content

Commit

Permalink
Merge pull request #133 from getAlby/task-mutexes
Browse files Browse the repository at this point in the history
chore: remove all mutexes and store in subscription struct
  • Loading branch information
im-adithya authored Dec 20, 2024
2 parents dd8e695 + 924c065 commit 0aec5b8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 55 deletions.
49 changes: 25 additions & 24 deletions internal/nostr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@ const (
)

type Subscription struct {
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`
RelaySubscription *nostr.Subscription `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
}

func (s *Subscription) BeforeSave(tx *gorm.DB) error {
Expand Down
65 changes: 35 additions & 30 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type Service struct {
Relay *nostr.Relay
Cfg *Config
Logger *logrus.Logger
subscriptions map[string]*nostr.Subscription
subscriptionsMutex sync.Mutex
relayMutex sync.Mutex
client *expo.PushClient
subCancelFnMap map[string]context.CancelFunc
}

func NewService(ctx context.Context) (*Service, error) {
Expand Down Expand Up @@ -116,8 +116,6 @@ func NewService(ctx context.Context) (*Service, error) {
return nil, err
}

subscriptions := make(map[string]*nostr.Subscription)

client := expo.NewPushClient(&expo.ClientConfig{
Host: "https://api.expo.dev",
APIURL: "/v2",
Expand All @@ -131,7 +129,6 @@ func NewService(ctx context.Context) (*Service, error) {
Wg: &wg,
Logger: logger,
Relay: relay,
subscriptions: subscriptions,
client: client,
}

Expand All @@ -142,7 +139,7 @@ func NewService(ctx context.Context) (*Service, error) {
logger.WithError(err).Error("Failed to query open subscriptions")
return nil, err
}

cancelFnMap := make(map[string]context.CancelFunc)
for _, sub := range openSubscriptions {
// Create a copy of the loop variable to
// avoid passing address of the same variable
Expand All @@ -151,8 +148,11 @@ func NewService(ctx context.Context) (*Service, error) {
if sub.PushToken != "" {
handleEvent = svc.handleSubscribedEventForPushNotification
}
go svc.startSubscription(svc.Ctx, &subscription, nil, handleEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
cancelFnMap[subscription.Uuid] = subCancelFn
go svc.startSubscription(subCtx, &subscription, nil, handleEvent)
}
svc.subCancelFnMap = cancelFnMap

return svc, nil
}
Expand Down Expand Up @@ -569,7 +569,11 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent)

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down Expand Up @@ -628,7 +632,11 @@ func (svc *Service) SubscriptionHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent)

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down Expand Up @@ -686,14 +694,17 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {

func (svc *Service) stopSubscription(subscription *Subscription) error {
svc.subscriptionsMutex.Lock()
sub, exists := svc.subscriptions[subscription.Uuid]
cancelFn, exists := svc.subCancelFnMap[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
if exists {
sub.Unsub()
delete(svc.subscriptions, subscription.Uuid)
cancelFn()
}

if subscription.RelaySubscription != nil {
subscription.RelaySubscription.Unsub()
}
svc.subscriptionsMutex.Unlock()

if (!exists && !subscription.Open) {
if (!subscription.Open) {
return errors.New(SUBSCRIPTION_ALREADY_CLOSED)
}

Expand Down Expand Up @@ -739,7 +750,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
if err != nil {
// TODO: notify user about subscription failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
Expand All @@ -751,9 +762,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

svc.subscriptionsMutex.Lock()
svc.subscriptions[subscription.Uuid] = sub
svc.subscriptionsMutex.Unlock()
subscription.RelaySubscription = relaySubscription

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
Expand Down Expand Up @@ -791,10 +800,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) {
walletPubkey, clientPubkey := getPubkeys(subscription)

svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
relaySubscription := subscription.RelaySubscription
err := relaySubscription.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
Expand All @@ -804,7 +811,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
"client_pubkey": clientPubkey,
}).Error("Failed to publish to relay")
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
sub.Unsub()
relaySubscription.Unsub()
} else {
svc.Logger.WithFields(logrus.Fields{
"request_event_id": subscription.RequestEvent.NostrId,
Expand Down Expand Up @@ -860,15 +867,13 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs
}

func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error {
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
relaySubscription := subscription.RelaySubscription

go func(){
// block till EOS is received for nip 47 handlers
// only if request event is not yet published
if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) {
<-sub.EndOfStoredEvents
<-relaySubscription.EndOfStoredEvents
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
Expand All @@ -878,7 +883,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}

// loop through incoming events
for event := range sub.Events {
for event := range relaySubscription.Events {
go handleEvent(event, subscription)
}

Expand All @@ -889,11 +894,11 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}()

select {
case <-sub.Relay.Context().Done():
return sub.Relay.ConnectionError
case <-relaySubscription.Relay.Context().Done():
return relaySubscription.Relay.ConnectionError
case <-ctx.Done():
return nil
case <-sub.Context.Done():
case <-relaySubscription.Context.Done():
return nil
}
}
Expand Down
7 changes: 6 additions & 1 deletion internal/nostr/push.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nostr

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -107,7 +108,11 @@ func (svc *Service) NIP47PushNotificationHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEventForPushNotification)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEventForPushNotification)

return c.JSON(http.StatusOK, PushSubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down

0 comments on commit 0aec5b8

Please sign in to comment.