Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of passing attributes in publish/consume #226

Merged
merged 3 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions docker/docker-compose.yml

This file was deleted.

48 changes: 26 additions & 22 deletions engine/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,33 @@ type jobImpl struct {
// NOTE: there is a trick in this factory, the delay is embedded in the jobID.
// By doing this we can delete the job that's located in hourly AOF, by placing
// a tombstone record in that AOF.
func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint16, jobID string) Job {
func NewJob(namespace, queue string, body []byte, attributes map[string]string, ttl, delay uint32, tries uint16, jobID string) Job {
if jobID == "" {
jobID = uuid.GenJobIDWithVersion(0, delay)
}
return &jobImpl{
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
attributes: attributes,
}
}

func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string) Job {
func NewJobWithID(namespace, queue string, body []byte, attributes map[string]string, ttl uint32, tries uint16, jobID string) Job {
delay, _ := uuid.ExtractDelaySecondFromUniqueID(jobID)
return &jobImpl{
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
attributes: attributes,
}
}

Expand Down Expand Up @@ -108,19 +110,21 @@ func (j *jobImpl) Attributes() map[string]string {

func (j *jobImpl) MarshalText() (text []byte, err error) {
var job struct {
Namespace string `json:"namespace"`
Queue string `json:"queue"`
ID string `json:"id"`
TTL uint32 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
Body []byte `json:"body"`
Namespace string `json:"namespace"`
Queue string `json:"queue"`
ID string `json:"id"`
TTL uint32 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
Body []byte `json:"body"`
Attributes map[string]string `json:"attributes,omitempty"`
}
job.Namespace = j.namespace
job.Queue = j.queue
job.ID = j.id
job.TTL = j.ttl
job.ElapsedMS = j._elapsedMS
job.Body = j.body
job.Attributes = j.attributes
return json.Marshal(job)
}

Expand Down
30 changes: 15 additions & 15 deletions engine/migration/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
func TestEngine_Publish(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 1")
j := engine.NewJob("ns-engine", "q1", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q1", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}

// Publish no-delay job
j = engine.NewJob("ns-engine", "q1", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q1", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func TestEngine_Publish(t *testing.T) {
func TestEngine_Consume(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 2")
j := engine.NewJob("ns-engine", "q2", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q2", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -53,7 +53,7 @@ func TestEngine_Consume(t *testing.T) {
}

// Consume job that's published in no-delay way
j = engine.NewJob("ns-engine", "q2", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q2", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -72,9 +72,9 @@ func TestEngine_Consume(t *testing.T) {
func TestEngine_Consume2(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 3")
j1 := engine.NewJob("ns-engine", "q3", []byte("delay msg"), 10, 5, 1, "")
j1 := engine.NewJob("ns-engine", "q3", []byte("delay msg"), nil, 10, 5, 1, "")
_, err := e.Publish(j1)
j2 := engine.NewJob("ns-engine", "q3", body, 10, 2, 1, "")
j2 := engine.NewJob("ns-engine", "q3", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j2)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -91,12 +91,12 @@ func TestEngine_Consume2(t *testing.T) {
func TestEngine_ConsumeMulti(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 4")
j1 := engine.NewJob("ns-engine", "q4", body, 10, 3, 1, "")
j1 := engine.NewJob("ns-engine", "q4", body, nil, 10, 3, 1, "")
jobID, err := e.Publish(j1)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}
j2 := engine.NewJob("ns-engine", "q5", body, 10, 1, 1, "")
j2 := engine.NewJob("ns-engine", "q5", body, nil, 10, 1, 1, "")
jobID2, err := e.Publish(j2)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -122,7 +122,7 @@ func TestEngine_ConsumeMulti(t *testing.T) {
func TestEngine_Peek(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 6")
j := engine.NewJob("ns-engine", "q6", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q6", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -136,7 +136,7 @@ func TestEngine_Peek(t *testing.T) {
func TestEngine_DrainOld(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 7")
j := engine.NewJob("ns-engine", "q7", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 0, 1, "")
jobID, err := OldRedisEngine.Publish(j)
job, err := e.Consume("ns-engine", []string{"q7"}, 5, 0)
if err != nil {
Expand All @@ -150,7 +150,7 @@ func TestEngine_DrainOld(t *testing.T) {
func TestEngine_BatchConsume(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 8")
j := engine.NewJob("ns-engine", "q8", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -176,7 +176,7 @@ func TestEngine_BatchConsume(t *testing.T) {
// Consume some jobs
jobIDMap := map[string]bool{}
for i := 0; i < 4; i++ {
j := engine.NewJob("ns-engine", "q8", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestEngine_BatchConsume(t *testing.T) {
func TestEngine_DeadLetter_Size(t *testing.T) {
body := []byte("hello msg 9")
queues := []string{"q9"}
j := engine.NewJob("ns-engine", "q9", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q9", body, nil, 10, 0, 1, "")
jobID, err := OldRedisEngine.Publish(j)
job, err := OldRedisEngine.Consume("ns-engine", queues, 0, 0)
if err != nil {
Expand All @@ -232,7 +232,7 @@ func TestEngine_DeadLetter_Size(t *testing.T) {
if job.ID() != jobID {
t.Fatal("Mismatched job")
}
j = engine.NewJob("ns-engine", "q9", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q9", body, nil, 10, 0, 1, "")
jobID, err = NewRedisEngine.Publish(j)
job, err = NewRedisEngine.Consume("ns-engine", queues, 0, 0)
if job.ID() != jobID {
Expand All @@ -250,7 +250,7 @@ func TestEngine_PublishWithJobID(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 1")
// Publish no-delay job
j := engine.NewJob("ns-engine", "q10", body, 10, 0, 1, "jobID1")
j := engine.NewJob("ns-engine", "q10", body, nil, 10, 0, 1, "jobID1")
jobID, err := e.Publish(j)
t.Log(jobID)
assert.Nil(t, err)
Expand Down
8 changes: 4 additions & 4 deletions engine/redis/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func TestDeadLetter_Delete(t *testing.T) {

func TestDeadLetter_Respawn(t *testing.T) {
p := NewPool(R)
job1 := engine.NewJob("ns-dead", "q3", []byte("1"), 60, 0, 1, "")
job2 := engine.NewJob("ns-dead", "q3", []byte("2"), 60, 0, 1, "")
job3 := engine.NewJob("ns-dead", "q3", []byte("3"), 60, 0, 1, "")
job1 := engine.NewJob("ns-dead", "q3", []byte("1"), nil, 60, 0, 1, "")
job2 := engine.NewJob("ns-dead", "q3", []byte("2"), nil, 60, 0, 1, "")
job3 := engine.NewJob("ns-dead", "q3", []byte("3"), nil, 60, 0, 1, "")
p.Add(job1)
p.Add(job2)
p.Add(job3)
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestDeadLetter_Size(t *testing.T) {
dl, _ := NewDeadLetter("ns-dead", "q3", R)
cnt := 3
for i := 0; i < cnt; i++ {
job := engine.NewJob("ns-dead", "q3", []byte("1"), 60, 0, 1, "")
job := engine.NewJob("ns-dead", "q3", []byte("1"), nil, 60, 0, 1, "")
p.Add(job)
dl.Add(job.ID())
}
Expand Down
12 changes: 6 additions & 6 deletions engine/redis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
return nil, nil
}
endTime := time.Now().Unix()
body, ttl, err := e.pool.Get(namespace, queueName.Queue, jobID)
payload, ttl, err := e.pool.Get(namespace, queueName.Queue, jobID)
switch err {
case nil:
// no-op
Expand All @@ -177,7 +177,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
default:
return nil, fmt.Errorf("pool: %s", err)
}
job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID)
job = engine.NewJobWithID(namespace, queueName.Queue, payload.Body, payload.Attributes, ttl, tries, jobID)
metrics.jobElapsedMS.WithLabelValues(e.redis.Name, namespace, queueName.Queue).Observe(float64(job.ElapsedMS()))
return job, nil
}
Expand Down Expand Up @@ -207,19 +207,19 @@ func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, e
return nil, fmt.Errorf("failed to peek queue: %s", err)
}
}
body, ttl, err := e.pool.Get(namespace, queue, jobID)
payload, ttl, err := e.pool.Get(namespace, queue, jobID)
// Tricky: we shouldn't return the not found error when the job was not found,
// since the job may expired(TTL was reached) and it would confuse the user, so
// we return the nil job instead of the not found error here. But if the `optionalJobID`
// was assigned we should return the not fond error.
if optionalJobID == "" && err == engine.ErrNotFound {
if optionalJobID == "" && errors.Is(err, engine.ErrNotFound) {
// return jobID with nil body if the job is expired
return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID), nil
return engine.NewJobWithID(namespace, queue, nil, nil, 0, 0, jobID), nil
}
if err != nil {
return nil, err
}
return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID), err
return engine.NewJobWithID(namespace, queue, payload.Body, payload.Attributes, ttl, tries, jobID), err
}

func (e *Engine) Size(namespace, queue string) (size int64, err error) {
Expand Down
24 changes: 12 additions & 12 deletions engine/redis/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func TestEngine_Publish(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 1")
j := engine.NewJob("ns-engine", "q0", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q0", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}

// Publish no-delay job
j = engine.NewJob("ns-engine", "q0", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q0", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -40,7 +40,7 @@ func TestEngine_Consume(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 2")
j := engine.NewJob("ns-engine", "q2", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q2", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -58,7 +58,7 @@ func TestEngine_Consume(t *testing.T) {
}

// Consume job that's published in no-delay way
j = engine.NewJob("ns-engine", "q2", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q2", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -81,9 +81,9 @@ func TestEngine_Consume2(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 3")
j := engine.NewJob("ns-engine", "q3", []byte("delay msg"), 10, 5, 1, "")
j := engine.NewJob("ns-engine", "q3", []byte("delay msg"), nil, 10, 5, 1, "")
_, err = e.Publish(j)
j = engine.NewJob("ns-engine", "q3", body, 10, 2, 1, "")
j = engine.NewJob("ns-engine", "q3", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -107,12 +107,12 @@ func TestEngine_ConsumeMulti(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 4")
j := engine.NewJob("ns-engine", "q4", body, 10, 3, 1, "")
j := engine.NewJob("ns-engine", "q4", body, nil, 10, 3, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}
j2 := engine.NewJob("ns-engine", "q5", body, 10, 1, 1, "")
j2 := engine.NewJob("ns-engine", "q5", body, nil, 10, 1, 1, "")
jobID2, err := e.Publish(j2)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestEngine_Peek(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 6")
j := engine.NewJob("ns-engine", "q6", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q6", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -173,7 +173,7 @@ func TestEngine_BatchConsume(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 7")
j := engine.NewJob("ns-engine", "q7", body, 10, 3, 1, "")
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 3, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -199,7 +199,7 @@ func TestEngine_BatchConsume(t *testing.T) {
// Consume some jobs
jobIDMap := map[string]bool{}
for i := 0; i < 4; i++ {
j := engine.NewJob("ns-engine", "q7", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestEngine_PublishWithJobID(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 1")
j := engine.NewJob("ns-engine", "q8", body, 10, 0, 1, "jobID1")
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 0, 1, "jobID1")
jobID, err := e.Publish(j)
t.Log(jobID)
assert.Nil(t, err)
Expand Down
Loading
Loading