Skip to content

Commit

Permalink
Fix PeriodicTasks tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dhumphreys01 authored and Admiral-Piett committed Dec 17, 2024
1 parent b7de554 commit f4e5ccd
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 382 deletions.
2 changes: 1 addition & 1 deletion app/cmd/goaws.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {

r := router.New()

quit := make(chan struct{}, 0)
quit := make(chan bool, 0)
go gosqs.PeriodicTasks(1*time.Second, quit)

if len(portNumbers) == 1 {
Expand Down
4 changes: 2 additions & 2 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ BaseUnitTests:
Queues:
- Name: unit-queue1
- Name: unit-queue2
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:100010001000:other-queue1"}'
- Name: other-queue1
RedrivePolicy: '{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:100010001000:dead-letter-queue1"}'
- Name: dead-letter-queue1
- Name: subscribed-queue1
- Name: subscribed-queue3
Topics:
Expand Down
29 changes: 28 additions & 1 deletion app/gosqs/change_message_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestChangeMessageVisibility_POST_SUCCESS(t *testing.T) {
func TestChangeMessageVisibility_success(t *testing.T) {
// create a queue
models.CurrentEnvironment = fixtures.LOCAL_ENVIRONMENT
defer func() {
Expand Down Expand Up @@ -45,4 +45,31 @@ func TestChangeMessageVisibility_POST_SUCCESS(t *testing.T) {
// the time being set, we can't reliably assert an exact value. So assert
// that the time.Time value is no longer the default zero value.
assert.NotZero(t, q.Messages[0].VisibilityTimeout)
assert.NotZero(t, q.Messages[0].ReceiptTime)
assert.Equal(t, "", q.Messages[0].ReceiptHandle)
assert.Equal(t, 1, q.Messages[0].Retry)
}

func TestChangeMessageVisibility_success_adds_to_existing_visibility_timeout(t *testing.T) {
// TODO
}

func TestChangeMessageVisibility_success_transfers_to_dead_letter_queue(t *testing.T) {
// TODO
}

func TestChangeMessageVisibility_request_transformer_error(t *testing.T) {
// TODO
}

func TestChangeMessageVisibility_visibility_timeout_too_large(t *testing.T) {
// TODO
}

func TestChangeMessageVisibility_missing_queue(t *testing.T) {
// TODO
}

func TestChangeMessageVisibility_missing_message(t *testing.T) {
// TODO - mismatch receipt handle
}
2 changes: 1 addition & 1 deletion app/gosqs/get_queue_attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestGetQueueAttributesV1_success_all_with_redrive_queue(t *testing.T) {
expectedResponse.Result.Attrs = append(expectedResponse.Result.Attrs,
models.Attribute{
Name: "RedrivePolicy",
Value: fmt.Sprintf(`{"maxReceiveCount":"100", "deadLetterTargetArn":"%s:%s"}`, fixtures.BASE_SQS_ARN, "other-queue1"),
Value: fmt.Sprintf(`{"maxReceiveCount":"1", "deadLetterTargetArn":"%s:%s"}`, fixtures.BASE_SQS_ARN, "dead-letter-queue1"),
},
)

Expand Down
22 changes: 11 additions & 11 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ func init() {
models.SyncQueues.Queues = make(map[string]*models.Queue)
}

func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
func PeriodicTasks(d time.Duration, quit chan bool) {
ticker := time.NewTicker(d)
for {
select {
case <-ticker.C:
models.SyncQueues.Lock()
for j := range models.SyncQueues.Queues {
queue := models.SyncQueues.Queues[j]
for qName := range models.SyncQueues.Queues {
queue := models.SyncQueues.Queues[qName]

// Reset deduplication period
for dedupId, startTime := range queue.Duplicates {
if time.Now().After(startTime.Add(models.DeduplicationPeriod)) {
log.Debugf("deduplication period for message with deduplicationId [%s] expired", dedupId)
delete(queue.Duplicates, dedupId)
}
}

log.Debugf("Queue [%s] length [%d]", queue.Name, len(queue.Messages))
for i := 0; i < len(queue.Messages); i++ {
msg := &queue.Messages[i]

// Reset deduplication period
for dedupId, startTime := range queue.Duplicates {
if time.Now().After(startTime.Add(models.DeduplicationPeriod)) {
log.Debugf("deduplication period for message with deduplicationId [%s] expired", dedupId)
delete(queue.Duplicates, dedupId)
}
}

if msg.ReceiptHandle != "" {
if msg.VisibilityTimeout.Before(time.Now()) {
log.Debugf("Making message visible again %s", msg.ReceiptHandle)
Expand Down
Loading

0 comments on commit f4e5ccd

Please sign in to comment.