From ad12baab71ec6143e30fa08ac3b3c88c68b29686 Mon Sep 17 00:00:00 2001 From: Andy Librian Date: Thu, 2 May 2024 15:12:52 +0700 Subject: [PATCH] Fix ingestion_worker --- pkg/protoqueue/nats.go | 9 ++++---- pkg/server/ingestion_worker.go | 39 ++++++---------------------------- 2 files changed, 11 insertions(+), 37 deletions(-) diff --git a/pkg/protoqueue/nats.go b/pkg/protoqueue/nats.go index 8b1fbbc..2935e6f 100644 --- a/pkg/protoqueue/nats.go +++ b/pkg/protoqueue/nats.go @@ -3,7 +3,6 @@ package protoqueue import ( "errors" "fmt" - "log" "time" "github.com/nats-io/nats.go" @@ -189,14 +188,14 @@ func (j *JetStream) publishWithRetry(subject string, data []byte) error { RetryInterval := 5 * time.Second var err error for i := 0; i < maxRetries; i++ { - // Publish message _, err = j.Conn.JSContext.Publish(subject, data) if err == nil { - // Message published successfully return nil } - log.Printf("Publish attempt %d failed: %v", i+1, err) - // Wait before retrying + + j.logger.Warn("Publish attempt failed") + j.logger.WithError(err).Warnf("Publish attempt %d failed", i+1) + time.Sleep(RetryInterval) } return err diff --git a/pkg/server/ingestion_worker.go b/pkg/server/ingestion_worker.go index ff2390f..af6899e 100644 --- a/pkg/server/ingestion_worker.go +++ b/pkg/server/ingestion_worker.go @@ -13,7 +13,7 @@ import ( type IngestionWorker struct { eventStore store.EventStore IngestionQueue protoqueue.QueueSubscriber - ingestionQueueForEventDetection protoqueue.QueueSubscriber + IngestionQueueForEventDetection protoqueue.QueueSubscriber logger *logrus.Logger } @@ -30,7 +30,7 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu return &IngestionWorker{ eventStore: eventStore, IngestionQueue: queueSubscriber, - ingestionQueueForEventDetection: queueSubscriberForEventDetection, + IngestionQueueForEventDetection: queueSubscriberForEventDetection, logger: logger, } } @@ -39,7 +39,7 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu // It uses a goroutine and a buffered channel to read events from the queue in the background. func (iw *IngestionWorker) Start() { go iw.loopConsumeQueue(iw.IngestionQueue) - go iw.loopConsumeQueueEventDetection(iw.ingestionQueueForEventDetection) + go iw.loopConsumeQueue(iw.IngestionQueueForEventDetection) } func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) { @@ -59,13 +59,11 @@ func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) { } }() - go func() { - defer iw.logger.Info("stopped consuming events from ingestion queue") + defer iw.logger.Info("stopped consuming events from ingestion queue") - for event := range eventChan { - iw.processEvent(event) - } - }() + for event := range eventChan { + iw.processEvent(event) + } } func (iw *IngestionWorker) processEvent(event *tarianpb.Event) { @@ -78,26 +76,3 @@ func (iw *IngestionWorker) processEvent(event *tarianpb.Event) { iw.logger.WithError(err).Error("error while processing event") } } - -func (iw *IngestionWorker) loopConsumeQueueEventDetection(queue protoqueue.QueueSubscriber) { - for { - msg, err := queue.NextMessage(&tarianpb.Event{}) - if err != nil { - // iw.logger.WithError(err).Error("error while processing event") - continue - } - - event, ok := msg.(*tarianpb.Event) - if !ok { - // iw.logger.WithError(err).Error("error while processing event") - continue - } - - event.ServerTimestamp = timestamppb.Now() - _ = iw.eventStore.Add(event) - - // if err != nil { - // iw.logger.WithError(err).Error("error while processing event") - // } - } -}