Skip to content

Commit

Permalink
bugfix message attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
Dai.Otsuka authored and Dai.Otsuka committed Nov 13, 2024
1 parent f58f9b0 commit 7b3ec1a
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 18 deletions.
35 changes: 26 additions & 9 deletions app/models/requests.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
Expand All @@ -13,8 +14,6 @@ import (
log "github.com/sirupsen/logrus"
)

var caser = cases.Title(language.AmericanEnglish)

type CreateQueueRequest struct {
QueueName string `json:"QueueName" schema:"QueueName"`
Attributes QueueAttributes `json:"Attributes" schema:"Attribute"`
Expand Down Expand Up @@ -205,12 +204,18 @@ func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) {
}

stringValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i))
binaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i))
encodedBinaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i))

binaryValue, err := base64.StdEncoding.DecodeString(encodedBinaryValue)
if err != nil {
log.Warnf("Failed to base64 decode. %s may not have been base64 encoded.", encodedBinaryValue)
continue
}

r.MessageAttributes[name] = MessageAttribute{
DataType: dataType,
StringValue: stringValue,
BinaryValue: []byte(binaryValue),
BinaryValue: binaryValue,
}
}
}
Expand Down Expand Up @@ -257,16 +262,22 @@ func (r *SendMessageBatchRequest) SetAttributesFromForm(values url.Values) {
}

stringValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.StringValue", entryIndex, attributeIndex))
binaryValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.BinaryValue", entryIndex, attributeIndex))
encodedBinaryValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.BinaryValue", entryIndex, attributeIndex))

if r.Entries[entryIndex].MessageAttributes == nil {
r.Entries[entryIndex].MessageAttributes = make(map[string]MessageAttribute)
}

binaryValue, err := base64.StdEncoding.DecodeString(encodedBinaryValue)
if err != nil {
log.Warnf("Failed to base64 decode. %s may not have been base64 encoded.", encodedBinaryValue)
continue
}

r.Entries[entryIndex].MessageAttributes[name] = MessageAttribute{
DataType: dataType,
StringValue: stringValue,
BinaryValue: []byte(binaryValue),
BinaryValue: binaryValue,
}

if _, ok := r.Entries[entryIndex].MessageAttributes[name]; !ok {
Expand Down Expand Up @@ -741,15 +752,21 @@ func (r *PublishRequest) SetAttributesFromForm(values url.Values) {
}

stringValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.StringValue", i))
binaryValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.BinaryValue", i))
encodedBinaryValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.BinaryValue", i))
binaryValue, err := base64.StdEncoding.DecodeString(encodedBinaryValue)
if err != nil {
log.Warnf("Failed to base64 decode. %s may not have been base64 encoded.", encodedBinaryValue)
continue
}

if r.MessageAttributes == nil {
r.MessageAttributes = make(map[string]MessageAttribute)
}

attributes[name] = MessageAttribute{
DataType: caser.String(dataType), // capitalize
DataType: cases.Title(language.AmericanEnglish).String(dataType), // capitalize
StringValue: stringValue,
BinaryValue: []byte(binaryValue),
BinaryValue: binaryValue,
}
}
r.MessageAttributes = attributes
Expand Down
2 changes: 1 addition & 1 deletion app/models/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestSendMessageRequest_SetAttributesFromForm_success(t *testing.T) {
attr2 := r.MessageAttributes["Attr2"]
assert.Equal(t, "Binary", attr2.DataType)
assert.Empty(t, attr2.StringValue)
assert.Equal(t, []uint8("VmFsdWUy"), attr2.BinaryValue)
assert.Equal(t, []uint8("Value2"), attr2.BinaryValue)
}

func TestSetQueueAttributesRequest_SetAttributesFromForm_success(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions app/models/responses.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"encoding/base64"
"encoding/xml"
)

Expand Down Expand Up @@ -83,6 +84,9 @@ func (r *ResultMessage) MarshalXML(e *xml.Encoder, start xml.StartElement) error
}
var messageAttrs []MessageAttributes
for key, value := range r.MessageAttributes {
if value.DataType == "Binary" {
value.BinaryValue = []byte(base64.StdEncoding.EncodeToString(value.BinaryValue))
}
attribute := MessageAttributes{
Name: key,
Value: value,
Expand All @@ -95,6 +99,9 @@ func (r *ResultMessage) MarshalXML(e *xml.Encoder, start xml.StartElement) error
e.EncodeElement(r.MessageId, xml.StartElement{Name: xml.Name{Local: "MessageId"}})
e.EncodeElement(r.ReceiptHandle, xml.StartElement{Name: xml.Name{Local: "ReceiptHandle"}})
e.EncodeElement(r.MD5OfBody, xml.StartElement{Name: xml.Name{Local: "MD5OfBody"}})
if r.MessageAttributes != nil {
e.EncodeElement(r.MD5OfMessageAttributes, xml.StartElement{Name: xml.Name{Local: "MD5OfMessageAttributes"}})
}
e.EncodeElement(r.Body, xml.StartElement{Name: xml.Name{Local: "Body"}})
e.EncodeElement(attrs, xml.StartElement{Name: xml.Name{Local: "Attribute"}})
e.EncodeElement(messageAttrs, xml.StartElement{Name: xml.Name{Local: "MessageAttribute"}})
Expand Down
4 changes: 2 additions & 2 deletions app/models/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Test_ResultMessage_MarshalXML_success_with_attributes(t *testing.T) {
resultString := string(result)

// We have to assert piecemeal like this, the maps go into their lists unordered, which will randomly break this.
entry := "<ResultMessage><MessageId>message-id</MessageId><ReceiptHandle>receipt-handle</ReceiptHandle><MD5OfBody>body-md5</MD5OfBody><Body>message-body</Body>"
entry := "<ResultMessage><MessageId>message-id</MessageId><ReceiptHandle>receipt-handle</ReceiptHandle><MD5OfBody>body-md5</MD5OfBody><MD5OfMessageAttributes>message-attrs-md5</MD5OfMessageAttributes><Body>message-body</Body>"
assert.Contains(t, resultString, entry)

entry = "<Attribute><Name>ApproximateFirstReceiveTimestamp</Name><Value>1</Value></Attribute>"
Expand All @@ -81,7 +81,7 @@ func Test_ResultMessage_MarshalXML_success_with_attributes(t *testing.T) {
entry = "<MessageAttribute><Name>attr1</Name><Value><DataType>String</DataType><StringValue>string-value</StringValue></Value></MessageAttribute>"
assert.Contains(t, resultString, entry)

entry = "<MessageAttribute><Name>attr2</Name><Value><BinaryValue>binary-value</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>"
entry = "<MessageAttribute><Name>attr2</Name><Value><BinaryValue>YmluYXJ5LXZhbHVl</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>"
assert.Contains(t, resultString, entry)

entry = "<MessageAttribute><Name>attr3</Name><Value><DataType>Number</DataType><StringValue>number-value</StringValue></Value></MessageAttribute>"
Expand Down
88 changes: 87 additions & 1 deletion smoke_tests/sns_publish_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package smoke_tests

import (
"bytes"
"context"
"encoding/json"
"io"
Expand Down Expand Up @@ -64,7 +65,92 @@ func Test_Publish_sqs_json_raw(t *testing.T) {
assert.Equal(t, message, *receivedMessage.Messages[0].Body)
}

func Test_Publish_Sqs_With_Message_Attributes(t *testing.T) {
func Test_Publish_sqs_json_with_message_attributes_raw(t *testing.T) {
server := generateServer()
defer func() {
server.Close()
models.ResetResources()
}()

sdkConfig, _ := config.LoadDefaultConfig(context.TODO())
sdkConfig.BaseEndpoint = aws.String(server.URL)
sqsClient := sqs.NewFromConfig(sdkConfig)
snsClient := sns.NewFromConfig(sdkConfig)

createQueueResult, _ := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{
QueueName: &af.QueueName,
})

topicName := aws.String("unit-topic2")

createTopicResult, _ := snsClient.CreateTopic(context.TODO(), &sns.CreateTopicInput{
Name: topicName,
})

subscribeResult, _ := snsClient.Subscribe(context.TODO(), &sns.SubscribeInput{
Protocol: aws.String("sqs"),
TopicArn: createTopicResult.TopicArn,
Attributes: map[string]string{},
Endpoint: createQueueResult.QueueUrl,
ReturnSubscriptionArn: true,
})

snsClient.SetSubscriptionAttributes(context.TODO(), &sns.SetSubscriptionAttributesInput{
SubscriptionArn: subscribeResult.SubscriptionArn,
AttributeName: aws.String("RawMessageDelivery"),
AttributeValue: aws.String("true"),
})
message := "{\"IAm\": \"aMessage\"}"
subject := "I am a subject"
stringKey := "string-key"
binaryKey := "binary-key"
numberKey := "number-key"
stringValue := "string-value"
binaryValue := []byte("binary-value")
numberValue := "100"
attributes := map[string]types.MessageAttributeValue{
stringKey: {
StringValue: aws.String(stringValue),
DataType: aws.String("String"),
},
binaryKey: {
BinaryValue: binaryValue,
DataType: aws.String("Binary"),
},
numberKey: {
StringValue: aws.String(numberValue),
DataType: aws.String("Number"),
},
}

publishResponse, publishErr := snsClient.Publish(context.TODO(), &sns.PublishInput{
TopicArn: createTopicResult.TopicArn,
Message: &message,
Subject: &subject,
MessageAttributes: attributes,
})

receiveMessageResponse, receiveErr := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
QueueUrl: createQueueResult.QueueUrl,
})

assert.Nil(t, publishErr)
assert.NotNil(t, publishResponse)

assert.Nil(t, receiveErr)
assert.NotNil(t, receiveMessageResponse)
assert.Equal(t, message, *receiveMessageResponse.Messages[0].Body)

assert.Equal(t, "649b2c548f103e499304eda4d6d4c5a2", *receiveMessageResponse.Messages[0].MD5OfBody)
assert.Equal(t, "ddfbe54b92058bf5b5f00055fa2032a5", *receiveMessageResponse.Messages[0].MD5OfMessageAttributes)

assert.Equal(t, stringValue, *receiveMessageResponse.Messages[0].MessageAttributes[stringKey].StringValue)
assert.True(t, bytes.Equal(binaryValue, receiveMessageResponse.Messages[0].MessageAttributes[binaryKey].BinaryValue))
assert.Equal(t, numberValue, *receiveMessageResponse.Messages[0].MessageAttributes[numberKey].StringValue)

}

func Test_Publish_sqs_json_with_message_attributes_not_raw(t *testing.T) {
server := generateServer()
defer func() {
server.Close()
Expand Down
4 changes: 2 additions & 2 deletions smoke_tests/sqs_receive_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func Test_ReceiveMessageV1_xml_with_attributes(t *testing.T) {
assert.Equal(t, 1, len(receiveMessageResponse.Result.Messages))
assert.Equal(t, "MyTestMessage", receiveMessageResponse.Result.Messages[0].Body)
assert.Equal(t, "ad4883a84ad41c79aa3a373698c0d4e9", receiveMessageResponse.Result.Messages[0].MD5OfBody)
assert.Equal(t, "", receiveMessageResponse.Result.Messages[0].MD5OfMessageAttributes)
assert.Equal(t, "ae8770938aee44bc548cf65ac377e3bf", receiveMessageResponse.Result.Messages[0].MD5OfMessageAttributes)

entry := "<Attribute><Name>ApproximateFirstReceiveTimestamp</Name><Value>"
assert.Contains(t, response, entry)
Expand All @@ -288,6 +288,6 @@ func Test_ReceiveMessageV1_xml_with_attributes(t *testing.T) {
entry = "<MessageAttribute><Name>attr2</Name><Value><DataType>Number</DataType><StringValue>number-value</StringValue></Value></MessageAttribute>"
assert.Contains(t, response, entry)

entry = "<MessageAttribute><Name>attr3</Name><Value><BinaryValue>binary-value</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>"
entry = "<MessageAttribute><Name>attr3</Name><Value><BinaryValue>YmluYXJ5LXZhbHVl</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>"
assert.Contains(t, response, entry)
}
6 changes: 4 additions & 2 deletions smoke_tests/sqs_send_message_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package smoke_tests

import (
"context"
"encoding/base64"
"encoding/xml"
"fmt"
"net/http"
Expand Down Expand Up @@ -348,7 +349,8 @@ func TestSendMessageBatchV1_Xml_Success_including_attributes(t *testing.T) {
stringType := "String"
numberType := "Number"

binaryValue := "binary-value"
binaryValue := []byte("binary-value")
binaryValueEncodeString := base64.StdEncoding.EncodeToString([]byte("binary-value"))
stringValue := "string-value"
numberValue := "100"

Expand All @@ -370,7 +372,7 @@ func TestSendMessageBatchV1_Xml_Success_including_attributes(t *testing.T) {
WithFormField("Entries.1.MessageBody", messageBody2).
WithFormField("Entries.1.MessageAttributes.1.Name", binaryAttributeKey).
WithFormField("Entries.1.MessageAttributes.1.Value.DataType", binaryType).
WithFormField("Entries.1.MessageAttributes.1.Value.BinaryValue", binaryValue).
WithFormField("Entries.1.MessageAttributes.1.Value.BinaryValue", binaryValueEncodeString).
WithFormField("Entries.1.MessageAttributes.2.Name", stringAttributeKey).
WithFormField("Entries.1.MessageAttributes.2.Value.DataType", stringType).
WithFormField("Entries.1.MessageAttributes.2.Value.StringValue", stringValue).
Expand Down
2 changes: 1 addition & 1 deletion smoke_tests/sqs_send_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func Test_SendMessageV1_xml_with_attributes(t *testing.T) {
WithFormField("MessageAttribute.2.Value.StringValue", "2").
WithFormField("MessageAttribute.3.Name", "attr3").
WithFormField("MessageAttribute.3.Value.DataType", "Binary").
WithFormField("MessageAttribute.3.Value.BinaryValue", "attr3_value").
WithFormField("MessageAttribute.3.Value.BinaryValue", "YXR0cjNfdmFsdWU="). // base64 encode string attr3_value
Expect().
Status(http.StatusOK).
Body().Raw()
Expand Down

0 comments on commit 7b3ec1a

Please sign in to comment.