diff --git a/ajc/main.go b/ajc/main.go index ff68b52..bc029b8 100644 --- a/ajc/main.go +++ b/ajc/main.go @@ -56,8 +56,10 @@ func main() { default: fmt.Fprintf(os.Stderr, "ajc runtime error: %v\n", err) - fmt.Fprintln(os.Stderr) - ajc.Usage(os.Args[1:]) + if err != asyncjobs.ErrNoTasks { + fmt.Fprintln(os.Stderr) + ajc.Usage(os.Args[1:]) + } } os.Exit(1) diff --git a/storage.go b/storage.go index 4b51391..3af5c21 100644 --- a/storage.go +++ b/storage.go @@ -920,6 +920,11 @@ func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task, return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady) } + if limit <= 0 { + s.log.Debugf("Defaulting to task list limit of 10000") + limit = 10000 + } + nfo, err := s.tasks.stream.State() if err != nil { return nil, err @@ -934,29 +939,33 @@ func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task, timeout, cancel := context.WithTimeout(ctx, time.Minute) sub, err := s.nc.Subscribe(s.nc.NewRespInbox(), func(msg *nats.Msg) { - if len(msg.Data) > 0 { - task := &Task{} - err := json.Unmarshal(msg.Data, task) + if len(msg.Data) == 0 { + return + } + + task := &Task{} + err := json.Unmarshal(msg.Data, task) + if err != nil { + return + } + + select { + case out <- task: + md, err := msg.Metadata() if err != nil { return } - - select { - case out <- task: - md, err := msg.Metadata() - if err != nil { - return - } - done := atomic.AddInt32(&cnt, 1) - if md.NumPending == 0 || done == limit { - msg.Sub.Unsubscribe() - cancel() - } - default: + done := atomic.AddInt32(&cnt, 1) + if md.NumPending == 0 || done == limit { msg.Sub.Unsubscribe() cancel() } + default: + msg.Sub.Unsubscribe() + cancel() } + + msg.Ack() }) if err != nil { return nil, err