Skip to content

Commit

Permalink
Rework timeouts cron (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 15, 2024
1 parent 4eaa22e commit 784b252
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion core/tasks/timeouts/bulk_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func init() {

// BulkTimeoutTask is the payload of the task
type BulkTimeoutTask struct {
Timeouts []Timeout `json:"timeouts"`
Timeouts []*Timeout `json:"timeouts"`
}

func (t *BulkTimeoutTask) Type() string {
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/timeouts/bulk_timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestBulkTimeout(t *testing.T) {
dates.SetNowFunc(dates.NewFixedNow(time.Date(2024, 11, 15, 13, 59, 0, 0, time.UTC)))

testsuite.QueueBatchTask(t, rt, testdata.Org1, &timeouts.BulkTimeoutTask{
Timeouts: []timeouts.Timeout{
Timeouts: []*timeouts.Timeout{
{SessionID: 123456, ContactID: testdata.Cathy.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 57, 0, 0, time.UTC)},
{SessionID: 234567, ContactID: testdata.Bob.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 58, 0, 0, time.UTC)},
},
Expand Down
44 changes: 26 additions & 18 deletions core/tasks/timeouts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package timeouts
import (
"context"
"fmt"
"slices"
"time"

"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/core/tasks/handler/ctasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/redisx"
)
Expand Down Expand Up @@ -45,22 +44,24 @@ func (c *timeoutsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string
}
defer rows.Close()

taskID := func(t *Timeout) string { return fmt.Sprintf("%d:%s", t.SessionID, t.TimeoutOn.Format(time.RFC3339)) }

// scan and organize by org
byOrg := make(map[models.OrgID][]*Timeout, 50)

rc := rt.RP.Get()
defer rc.Close()

numQueued, numDupes := 0, 0

// add a timeout task for each run
timeout := &Timeout{}
for rows.Next() {
err := rows.StructScan(timeout)
if err != nil {
timeout := &Timeout{}
if err := rows.StructScan(timeout); err != nil {
return nil, fmt.Errorf("error scanning timeout: %w", err)
}

// check whether we've already queued this
taskID := fmt.Sprintf("%d:%s", timeout.SessionID, timeout.TimeoutOn.Format(time.RFC3339))
queued, err := c.marker.IsMember(rc, taskID)
queued, err := c.marker.IsMember(rc, taskID(timeout))
if err != nil {
return nil, fmt.Errorf("error checking whether task is queued: %w", err)
}
Expand All @@ -71,19 +72,26 @@ func (c *timeoutsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string
continue
}

// ok, queue this task
err = handler.QueueTask(rc, timeout.OrgID, timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn))
if err != nil {
return nil, fmt.Errorf("error adding new handle task: %w", err)
}
byOrg[timeout.OrgID] = append(byOrg[timeout.OrgID], timeout)
}

// and mark it as queued
err = c.marker.Add(rc, taskID)
if err != nil {
return nil, fmt.Errorf("error marking timeout task as queued: %w", err)
for orgID, timeouts := range byOrg {
for batch := range slices.Chunk(timeouts, 100) {
if err := tasks.Queue(rc, tasks.ThrottledQueue, orgID, &BulkTimeoutTask{Timeouts: batch}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk timeout task: %w", err)
}

numQueued += len(batch)

// mark all as queued
for _, timeout := range batch {
if err = c.marker.Add(rc, taskID(timeout)); err != nil {
return nil, fmt.Errorf("error marking timeout task as queued: %w", err)
}
}
}

numQueued++
// TODO queue directly for smaller sets
}

return map[string]any{"dupes": numDupes, "queued": numQueued}, nil
Expand Down

0 comments on commit 784b252

Please sign in to comment.