From a8006fbc5fba0ee3092db8fdd8291eafff54ecc8 Mon Sep 17 00:00:00 2001 From: Devin Humphreys Date: Tue, 18 Jun 2024 14:08:09 -0400 Subject: [PATCH] Add DeleteQueueV1 for JSON support --- app/gosqs/delete_queue.go | 38 ++++++++++ app/gosqs/delete_queue_test.go | 1 + app/gosqs/get_queue_url.go | 1 - app/gosqs/gosqs.go | 29 -------- app/gosqs/receive_message.go | 3 + app/gosqs/receive_message_test.go | 60 ---------------- app/models/models.go | 10 +++ app/models/responses.go | 14 ++++ app/router/router.go | 2 +- app/router/router_test.go | 2 +- app/sqs_messages.go | 21 ------ smoke_tests/sqs_delete_queue_test.go | 92 +++++++++++++++++++++++++ smoke_tests/sqs_receive_message_test.go | 42 ++++++++++- 13 files changed, 199 insertions(+), 116 deletions(-) create mode 100644 app/gosqs/delete_queue.go create mode 100644 app/gosqs/delete_queue_test.go create mode 100644 smoke_tests/sqs_delete_queue_test.go diff --git a/app/gosqs/delete_queue.go b/app/gosqs/delete_queue.go new file mode 100644 index 00000000..4d1d9194 --- /dev/null +++ b/app/gosqs/delete_queue.go @@ -0,0 +1,38 @@ +package gosqs + +import ( + "net/http" + "strings" + + "github.com/Admiral-Piett/goaws/app/interfaces" + + "github.com/Admiral-Piett/goaws/app/models" + "github.com/Admiral-Piett/goaws/app/utils" + + "github.com/Admiral-Piett/goaws/app" + log "github.com/sirupsen/logrus" +) + +func DeleteQueueV1(req *http.Request) (int, interfaces.AbstractResponseBody) { + requestBody := models.NewDeleteQueueRequest() + ok := utils.REQUEST_TRANSFORMER(requestBody, req, false) + if !ok { + log.Error("Invalid Request - DeleteQueueV1") + return createErrorResponseV1(ErrInvalidParameterValue.Type) + } + + uriSegments := strings.Split(requestBody.QueueUrl, "/") + queueName := uriSegments[len(uriSegments)-1] + + log.Infof("Deleting Queue: %s", queueName) + + app.SyncQueues.Lock() + delete(app.SyncQueues.Queues, queueName) + app.SyncQueues.Unlock() + + respStruct := models.DeleteQueueResponse{ + Xmlns: models.BASE_XMLNS, + Metadata: models.BASE_RESPONSE_METADATA, + } + return http.StatusOK, respStruct +} diff --git a/app/gosqs/delete_queue_test.go b/app/gosqs/delete_queue_test.go new file mode 100644 index 00000000..c97f8a3e --- /dev/null +++ b/app/gosqs/delete_queue_test.go @@ -0,0 +1 @@ +package gosqs diff --git a/app/gosqs/get_queue_url.go b/app/gosqs/get_queue_url.go index 9c467f49..109bb51c 100644 --- a/app/gosqs/get_queue_url.go +++ b/app/gosqs/get_queue_url.go @@ -11,7 +11,6 @@ import ( ) func GetQueueUrlV1(req *http.Request) (int, interfaces.AbstractResponseBody) { - requestBody := models.NewGetQueueUrlRequest() ok := utils.REQUEST_TRANSFORMER(requestBody, req, false) if !ok { diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index 0fc32ece..7c463c8e 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -333,35 +333,6 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { } } -func DeleteQueue(w http.ResponseWriter, req *http.Request) { - // Sent response type - w.Header().Set("Content-Type", "application/xml") - - // Retrieve FormValues required - queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String()) - queueName := "" - if queueUrl == "" { - vars := mux.Vars(req) - queueName = vars["queueName"] - } else { - uriSegments := strings.Split(queueUrl, "/") - queueName = uriSegments[len(uriSegments)-1] - } - - log.Println("Deleting Queue:", queueName) - app.SyncQueues.Lock() - delete(app.SyncQueues.Queues, queueName) - app.SyncQueues.Unlock() - - // Create, encode/xml and send response - respStruct := app.DeleteQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}} - enc := xml.NewEncoder(w) - enc.Indent(" ", " ") - if err := enc.Encode(respStruct); err != nil { - log.Printf("error: %v\n", err) - } -} - func getQueueFromPath(formVal string, theUrl string) string { if formVal != "" { return formVal diff --git a/app/gosqs/receive_message.go b/app/gosqs/receive_message.go index 613ded75..2c9f5a71 100644 --- a/app/gosqs/receive_message.go +++ b/app/gosqs/receive_message.go @@ -15,6 +15,9 @@ import ( log "github.com/sirupsen/logrus" ) +// TODO - Admiral-Piett - could we refactor the way we hide messages? Change data structure to a queue +// organized by "reveal time" or a map with the key being a timestamp of when it could be shown? +// Ordered Map - https://github.com/elliotchance/orderedmap func ReceiveMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { requestBody := models.NewReceiveMessageRequest() ok := utils.REQUEST_TRANSFORMER(requestBody, req, false) diff --git a/app/gosqs/receive_message_test.go b/app/gosqs/receive_message_test.go index ea43c57e..2f301d7c 100644 --- a/app/gosqs/receive_message_test.go +++ b/app/gosqs/receive_message_test.go @@ -3,8 +3,6 @@ package gosqs import ( "context" "net/http" - "net/http/httptest" - "net/url" "sync" "testing" "time" @@ -136,64 +134,6 @@ func TestReceiveMessage_CanceledByClientV1(t *testing.T) { } } -func TestReceiveMessage_WithConcurrentDeleteQueueV1(t *testing.T) { - // create a queue - app.CurrentEnvironment = fixtures.LOCAL_ENVIRONMENT - defer func() { - utils.ResetApp() - }() - - app.SyncQueues.Queues["waiting-queue"] = &app.Queue{ - Name: "waiting-queue", - ReceiveMessageWaitTimeSeconds: 1, - } - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - // receive message - _, r := utils.GenerateRequestInfo("POST", "/", models.ReceiveMessageRequest{ - QueueUrl: "http://localhost:4100/queue/waiting-queue", - }, true) - status, resp := ReceiveMessageV1(r) - assert.Equal(t, http.StatusBadRequest, status) - - // Check the response body is what we expect. - expected := "QueueNotFound" - result := resp.GetResult().(models.ErrorResult) - if result.Type != "Not Found" { - t.Errorf("handler returned unexpected body: got %v want %v", - result.Message, expected) - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(10 * time.Millisecond) // 10ms to let the ReceiveMessage() block - - // delete queue message - form := url.Values{} - form.Add("Action", "DeleteQueue") - form.Add("QueueUrl", "http://localhost:4100/queue/waiting-queue") - form.Add("Version", "2012-11-05") - _, r := utils.GenerateRequestInfo("POST", "/", form, false) - - rr := httptest.NewRecorder() - http.HandlerFunc(DeleteQueue).ServeHTTP(rr, r) - - if status := rr.Code; status != http.StatusOK { - t.Errorf("handler returned wrong status code: got \n%v want %v", - status, http.StatusOK) - } - }() - - if timedout := waitTimeout(&wg, 2*time.Second); timedout { - t.Errorf("concurrent handlers timeout, expecting both to return within timeout") - } -} - func TestReceiveMessageDelaySecondsV1(t *testing.T) { // create a queue app.CurrentEnvironment = fixtures.LOCAL_ENVIRONMENT diff --git a/app/models/models.go b/app/models/models.go index 6efd1501..3116c499 100644 --- a/app/models/models.go +++ b/app/models/models.go @@ -467,3 +467,13 @@ type PurgeQueueRequest struct { } func (r *PurgeQueueRequest) SetAttributesFromForm(values url.Values) {} + +func NewDeleteQueueRequest() *DeleteQueueRequest { + return &DeleteQueueRequest{} +} + +type DeleteQueueRequest struct { + QueueUrl string `json:"QueueUrl" schema:"QueueUrl"` +} + +func (r *DeleteQueueRequest) SetAttributesFromForm(values url.Values) {} diff --git a/app/models/responses.go b/app/models/responses.go index 157f69bf..f223941c 100644 --- a/app/models/responses.go +++ b/app/models/responses.go @@ -269,3 +269,17 @@ func (r PurgeQueueResponse) GetResult() interface{} { func (r PurgeQueueResponse) GetRequestId() string { return r.Metadata.RequestId } + +/*** Delete Queue Response */ +type DeleteQueueResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Metadata app.ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +func (r DeleteQueueResponse) GetResult() interface{} { + return nil +} + +func (r DeleteQueueResponse) GetRequestId() string { + return r.Metadata.RequestId +} diff --git a/app/router/router.go b/app/router/router.go index 049db0ce..70d4a6ca 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -73,13 +73,13 @@ var routingTableV1 = map[string]func(r *http.Request) (int, interfaces.AbstractR "DeleteMessage": sqs.DeleteMessageV1, "GetQueueUrl": sqs.GetQueueUrlV1, "PurgeQueue": sqs.PurgeQueueV1, + "DeleteQueue": sqs.DeleteQueueV1, } var routingTable = map[string]http.HandlerFunc{ // SQS "SendMessageBatch": sqs.SendMessageBatch, "DeleteMessageBatch": sqs.DeleteMessageBatch, - "DeleteQueue": sqs.DeleteQueue, // SNS "ListTopics": sns.ListTopics, diff --git a/app/router/router_test.go b/app/router/router_test.go index 8f79a2b1..5c1c017b 100644 --- a/app/router/router_test.go +++ b/app/router/router_test.go @@ -270,12 +270,12 @@ func TestActionHandler_v0_xml(t *testing.T) { "ChangeMessageVisibility": sqs.ChangeMessageVisibilityV1, "GetQueueUrl": sqs.GetQueueUrlV1, "PurgeQueue": sqs.PurgeQueueV1, + "DeleteQueue": sqs.DeleteQueueV1, } routingTable = map[string]http.HandlerFunc{ // SQS "SendMessageBatch": sqs.SendMessageBatch, "DeleteMessageBatch": sqs.DeleteMessageBatch, - "DeleteQueue": sqs.DeleteQueue, // SNS "ListTopics": sns.ListTopics, diff --git a/app/sqs_messages.go b/app/sqs_messages.go index 10fb0e01..20da794e 100644 --- a/app/sqs_messages.go +++ b/app/sqs_messages.go @@ -1,21 +1,5 @@ package app -/*** List Queues Response */ -type ListQueuesResult struct { - QueueUrl []string `xml:"QueueUrl"` -} - -type ListQueuesResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result ListQueuesResult `xml:"ListQueuesResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -type DeleteQueueResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} - type DeleteMessageBatchResultEntry struct { Id string `xml:"Id"` } @@ -58,8 +42,3 @@ type SendMessageBatchResponse struct { Result SendMessageBatchResult `xml:"SendMessageBatchResult"` Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` } - -type SetQueueAttributesResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} diff --git a/smoke_tests/sqs_delete_queue_test.go b/smoke_tests/sqs_delete_queue_test.go new file mode 100644 index 00000000..811d7b1e --- /dev/null +++ b/smoke_tests/sqs_delete_queue_test.go @@ -0,0 +1,92 @@ +package smoke_tests + +import ( + "context" + "fmt" + "net/http" + "testing" + + "github.com/gavv/httpexpect/v2" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + + "github.com/Admiral-Piett/goaws/app/utils" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + + "github.com/Admiral-Piett/goaws/app" + "github.com/stretchr/testify/assert" + + af "github.com/Admiral-Piett/goaws/app/fixtures" +) + +func Test_DeleteQueueV1_json(t *testing.T) { + server := generateServer() + defer func() { + server.Close() + utils.ResetResources() + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + sqsClient := sqs.NewFromConfig(sdkConfig) + + qName := "unit-queue1" + sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: &qName, + }) + + qUrl := fmt.Sprintf("%s/%s", af.BASE_URL, qName) + _, err := sqsClient.DeleteQueue(context.TODO(), &sqs.DeleteQueueInput{ + QueueUrl: &qUrl, + }) + + assert.Nil(t, err) + + app.SyncQueues.Lock() + defer app.SyncQueues.Unlock() + + targetQueue, ok := app.SyncQueues.Queues["unit-queue1"] + assert.False(t, ok) + assert.Nil(t, targetQueue) +} + +func Test_DeleteQueueV1_xml(t *testing.T) { + server := generateServer() + defer func() { + server.Close() + utils.ResetResources() + }() + + e := httpexpect.Default(t, server.URL) + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + sqsClient := sqs.NewFromConfig(sdkConfig) + + qName := "unit-queue1" + sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: &qName, + }) + + qUrl := fmt.Sprintf("%s/%s", af.BASE_URL, qName) + + e.POST("/"). + WithForm(struct { + Action string `xml:"Action"` + QueueUrl string `xml:"QueueUrl"` + }{ + Action: "DeleteQueue", + QueueUrl: qUrl, + }). + Expect(). + Status(http.StatusOK). + Body().Raw() + + app.SyncQueues.Lock() + defer app.SyncQueues.Unlock() + + targetQueue, ok := app.SyncQueues.Queues["unit-queue1"] + assert.False(t, ok) + assert.Nil(t, targetQueue) +} diff --git a/smoke_tests/sqs_receive_message_test.go b/smoke_tests/sqs_receive_message_test.go index c6253a16..7c798647 100644 --- a/smoke_tests/sqs_receive_message_test.go +++ b/smoke_tests/sqs_receive_message_test.go @@ -5,6 +5,7 @@ import ( "encoding/xml" "fmt" "net/http" + "sync" "testing" af "github.com/Admiral-Piett/goaws/app/fixtures" @@ -47,14 +48,49 @@ func Test_ReceiveMessageV1_json(t *testing.T) { QueueUrl: createQueueResponse.QueueUrl, }) - if err != nil { - t.Fatalf("Error receiving message: %v", err) - } + assert.Nil(t, err) assert.Equal(t, 1, len(receiveMessageResponse.Messages)) assert.Equal(t, sf.SendMessageRequestBodyXML.MessageBody, *receiveMessageResponse.Messages[0].Body) } +func Test_ReceiveMessageV1_json_while_concurrent_delete(t *testing.T) { + server := generateServer() + defer func() { + server.Close() + utils.ResetResources() + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + sqsClient := sqs.NewFromConfig(sdkConfig) + + createQueueResponse, _ := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: &af.QueueName, + Attributes: map[string]string{"ReceiveMessageWaitTimeSeconds": "1"}, + }) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ + QueueUrl: createQueueResponse.QueueUrl, + }) + assert.Contains(t, err.Error(), "AWS.SimpleQueueService.NonExistentQueue") + }() + + wg.Add(1) + go func() { + defer wg.Done() + _, err := sqsClient.DeleteQueue(context.TODO(), &sqs.DeleteQueueInput{ + QueueUrl: createQueueResponse.QueueUrl, + }) + assert.Nil(t, err) + }() + wg.Wait() +} + func Test_ReceiveMessageV1_xml(t *testing.T) { server := generateServer() defer func() {