Skip to content

Commit

Permalink
fix: send message to DLQ after correct number of attempts
Browse files Browse the repository at this point in the history
Closes: #263
  • Loading branch information
Thomasvdam authored and Admiral-Piett committed Nov 11, 2024
1 parent 885f4dd commit b7de554
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 8 deletions.
3 changes: 1 addition & 2 deletions app/gosqs/change_message_visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ func ChangeMessageVisibilityV1(req *http.Request) (int, interfaces.AbstractRespo
msgs[i].Retry++
if queue.MaxReceiveCount > 0 &&
queue.DeadLetterQueue != nil &&
msgs[i].Retry > queue.MaxReceiveCount {
msgs[i].Retry >= queue.MaxReceiveCount {
queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, msgs[i])
queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...)
i++
}
} else {
msgs[i].VisibilityTimeout = time.Now().Add(time.Duration(visibilityTimeout) * time.Second)
Expand Down
4 changes: 2 additions & 2 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
msg.Retry++
if queue.MaxReceiveCount > 0 &&
queue.DeadLetterQueue != nil &&
msg.Retry > queue.MaxReceiveCount {
msg.Retry >= queue.MaxReceiveCount {
queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, *msg)
queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...)
i++
i--
}
}
}
Expand Down
100 changes: 96 additions & 4 deletions app/gosqs/gosqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestDeadLetterQueue(t *testing.T) {
form.Add("Attribute.1.Name", "VisibilityTimeout")
form.Add("Attribute.1.Value", "1")
form.Add("Attribute.2.Name", "RedrivePolicy")
form.Add("Attribute.2.Value", `{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages"}`)
form.Add("Attribute.2.Value", `{"maxReceiveCount": 2, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages"}`)
form.Add("Version", "2012-11-05")
req.PostForm = form

Expand All @@ -267,6 +267,9 @@ func TestDeadLetterQueue(t *testing.T) {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 1 {
t.Fatal("expected a message in testing-deadletter")
}

// receive message
req, err = http.NewRequest("POST", "/", nil)
Expand All @@ -285,6 +288,13 @@ func TestDeadLetterQueue(t *testing.T) {

time.Sleep(2 * time.Second)

if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 1 {
t.Fatal("expected message in testing-deadletter after 1 receive attempt")
}
if len(deadLetterQueue.Messages) > 0 {
t.Fatal("expected no message in DLQ")
}

// receive the message one more time
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
Expand All @@ -295,20 +305,102 @@ func TestDeadLetterQueue(t *testing.T) {

status, _ = ReceiveMessageV1(req)
assert.Equal(t, status, http.StatusOK)

// wait for messages to be moved to DLQ and stop the periodic tasks to prevent data races.
time.Sleep(2 * time.Second)
done <- struct{}{}

if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 0 {
t.Fatal("expected no message in testing-deadletter")
}
if len(deadLetterQueue.Messages) == 0 {
t.Fatal("expected a message in DLQ")
}
}

func TestDeadLetterQueueMultiple(t *testing.T) {
done := make(chan struct{}, 0)
go PeriodicTasks(1*time.Second, done)

// create a queue
req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
deadLetterQueue := &models.Queue{
Name: "failed-messages-multiple",
Messages: []models.SqsMessage{},
}
models.SyncQueues.Lock()
models.SyncQueues.Queues["failed-messages-multiple"] = deadLetterQueue
models.SyncQueues.Unlock()
form := url.Values{}
form.Add("Action", "CreateQueue")
form.Add("QueueName", "testing-deadletter-multiple")
form.Add("Attribute.1.Name", "VisibilityTimeout")
form.Add("Attribute.1.Value", "1")
form.Add("Attribute.2.Name", "RedrivePolicy")
form.Add("Attribute.2.Value", `{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages-multiple"}`)
form.Add("Version", "2012-11-05")
req.PostForm = form

// another receive attempt
status, _ := CreateQueueV1(req)
assert.Equal(t, status, http.StatusOK)

// send 2 messages
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

form = url.Values{}
form.Add("Action", "SendMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/testing-deadletter-multiple")
form.Add("MessageBody", "1")
form.Add("Version", "2012-11-05")
req.PostForm = form

status, _ = SendMessageV1(req)
if status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
status, _ = SendMessageV1(req)
if status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

if len(models.SyncQueues.Queues["testing-deadletter-multiple"].Messages) != 2 {
t.Fatal("expected 2 messages in testing-deadletter-multiple")
}

// receive messages
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

form = url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/testing-deadletter-multiple")
form.Add("MaxNumberOfMessages", "2")
form.Add("Version", "2012-11-05")
req.PostForm = form

status, _ = ReceiveMessageV1(req)
assert.Equal(t, status, http.StatusOK)
if len(deadLetterQueue.Messages) == 0 {
t.Fatal("expected a message")

// wait for messages to be moved to DLQ and stop the periodic tasks to prevent data races.
time.Sleep(3 * time.Second)
done <- struct{}{}

numMessages := len(models.SyncQueues.Queues["testing-deadletter-multiple"].Messages)
if numMessages != 0 {
t.Fatalf("expected no messages in testing-deadletter-multiple, found: %d", numMessages)
}
if len(deadLetterQueue.Messages) != 2 {
t.Fatal("expected 2 messages in DLQ")
}
}

Expand Down

0 comments on commit b7de554

Please sign in to comment.