Skip to content

Commit

Permalink
fix(iot-device): Fix for retrieving message system properties over Http
Browse files Browse the repository at this point in the history
  • Loading branch information
abhipsaMisra committed Jun 24, 2020
1 parent 5e0f250 commit d08cdad
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 100 deletions.
7 changes: 4 additions & 3 deletions e2e/test/AzureSecurityCenterForIoTSecurityMessageE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private async Task SendSingleSecurityMessageAsync(
Client.Message testMessage = ComposeD2CSecurityTestMessage(out string eventId, out string payload, out string p1Value);
await deviceClient.SendEventAsync(testMessage).ConfigureAwait(false);

await ValidateEventAsync(deviceId, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
await ValidateEventAsync(deviceId, testMessage, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
}

private async Task SendSingleSecurityMessageModuleAsync(
Expand All @@ -180,17 +180,18 @@ private async Task SendSingleSecurityMessageModuleAsync(
Client.Message testMessage = ComposeD2CSecurityTestMessage(out string eventId, out string payload, out string p1Value);
await moduleClient.SendEventAsync(testMessage).ConfigureAwait(false);

await ValidateEventAsync(deviceId, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
await ValidateEventAsync(deviceId, testMessage, eventId, payload, p1Value, logAnalticsTestClient).ConfigureAwait(false);
}

private async Task ValidateEventAsync(
string deviceId,
Client.Message message,
string eventId,
string payload,
string p1Value,
AzureSecurityCenterForIoTLogAnalyticsClient logAnalticsTestClient)
{
bool isReceivedEventHub = EventHubTestListener.VerifyIfMessageIsReceived(deviceId, payload, p1Value, TimeSpan.FromSeconds(10));
bool isReceivedEventHub = EventHubTestListener.VerifyIfMessageIsReceived(deviceId, message, payload, p1Value, TimeSpan.FromSeconds(10));
Assert.IsFalse(isReceivedEventHub, "Security message received in customer event hub.");
bool isReceivedOms = await logAnalticsTestClient.IsRawEventExist(deviceId, eventId).ConfigureAwait(false);
Assert.IsTrue(isReceivedOms, "Security message was not received in customer log analytics");
Expand Down
17 changes: 9 additions & 8 deletions e2e/test/CombinedClientOperationsPoolAmqpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ private async Task DeviceCombinedClientOperationsAsync(
// Initialize service client for service-side operations
using ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(Configuration.IoTHub.ConnectionString);

// Message payload for C2D operation
var messagesSent = new Dictionary<string, List<string>>();
// Message payload and properties for C2D operation
var messagesSent = new Dictionary<string, Tuple<Message, string>>();

// Twin properties
var twinPropertyMap = new Dictionary<string, List<string>>();
Expand All @@ -134,10 +134,10 @@ private async Task DeviceCombinedClientOperationsAsync(

// Send C2D Message
s_log.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Send C2D for device={testDevice.Id}");
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
using (msg)
{
messagesSent.Add(testDevice.Id, new List<string> { payload, p1Value });
messagesSent.Add(testDevice.Id, Tuple.Create(msg, payload));
Task sendC2dMessage = serviceClient.SendAsync(testDevice.Id, msg);
initOperations.Add(sendC2dMessage);

Expand Down Expand Up @@ -170,10 +170,11 @@ private async Task DeviceCombinedClientOperationsAsync(

// C2D Operation
s_log.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Operation 2: Receive C2D for device={testDevice.Id}");
List<string> msgSent = messagesSent[testDevice.Id];
string payload = msgSent[0];
string p1Value = msgSent[1];
Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value);
Tuple<Message, string> msgSent = messagesSent[testDevice.Id];
Message msg = msgSent.Item1;
string payload = msgSent.Item2;

Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msg, payload);
clientOperations.Add(verifyDeviceClientReceivesMessage);

// Invoke direct methods
Expand Down
8 changes: 4 additions & 4 deletions e2e/test/EventHubTestListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ private EventHubTestListener()
}

// verify required message is present in the dictionary
public static bool VerifyIfMessageIsReceived(string deviceId, string payload, string p1Value, TimeSpan? maxWaitTime = null)
public static bool VerifyIfMessageIsReceived(string deviceId, Client.Message message, string payload, string p1Value, TimeSpan? maxWaitTime = null)
{
if (!maxWaitTime.HasValue)
{
maxWaitTime = s_maximumWaitTime;
}

s_log.WriteLine($"Expected payload: deviceId={deviceId}; payload={payload}; property1={p1Value}");
s_log.WriteLine($"Expected payload: deviceId={deviceId}; messageId = {message.MessageId}, userId={message.UserId}, payload={payload}; property1={p1Value}");

bool isReceived = false;

Expand All @@ -64,7 +64,7 @@ public static bool VerifyIfMessageIsReceived(string deviceId, string payload, st
continue;
}

isReceived = VerifyTestMessage(eventData, deviceId, p1Value);
isReceived = VerifyTestMessage(eventData, deviceId, message, p1Value);
}

sw.Stop();
Expand Down Expand Up @@ -101,7 +101,7 @@ private static string GetEventDataBody(EventData eventData)
#endif
}

private static bool VerifyTestMessage(EventData eventData, string deviceName, string p1Value)
private static bool VerifyTestMessage(EventData eventData, string deviceName, Client.Message message, string p1Value)
{
#if NET451
var connectionDeviceId = eventData.SystemProperties["iothub-connection-device-id"].ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,15 +618,15 @@ private async Task ReceiveMessageRecoveryPoolOverAmqp(

Func<DeviceClient, TestDevice, Task> testOperation = async (deviceClient, testDevice) =>
{
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}: Sending message to device {testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
await serviceClient.SendAsync(testDevice.Id, msg)
.ConfigureAwait(false);

_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
await deviceClient.OpenAsync()
.ConfigureAwait(false);
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value)
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msg, payload)
.ConfigureAwait(false);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,12 +866,12 @@ private async Task SendMessageRecoveryPoolOverAmqpAsync(
_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}: Preparing to send message for device {testDevice.Id}");
await deviceClient.OpenAsync().ConfigureAwait(false);

(Client.Message testMessage, string messageId, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
(Client.Message testMessage, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();

_log.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}.{testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
await deviceClient.SendEventAsync(testMessage).ConfigureAwait(false);

bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, payload, p1Value);
bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, testMessage, payload, p1Value);
Assert.IsTrue(isReceived, $"Message is not received for device {testDevice.Id}.");
};

Expand Down
2 changes: 1 addition & 1 deletion e2e/test/MessageFeedbackE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private static async Task CompleteMessageMixOrder(TestDeviceType type, Client.Tr
var messages = new List<Client.Message>();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
Client.Message message = await deviceClient.ReceiveAsync(TIMESPAN_ONE_MINUTE).ConfigureAwait(false);
if (message == null)
Expand Down
13 changes: 5 additions & 8 deletions e2e/test/MessageReceiveE2EPoolAmqpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ private async Task ReceiveMessagePoolOverAmqpAsync(
int devicesCount,
ConnectionStringAuthScope authScope = ConnectionStringAuthScope.Device)
{
var messagesSent = new Dictionary<string, List<string>>();
var messagesSent = new Dictionary<string, Tuple<Message, string>>();

// Initialize the service client
ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(Configuration.IoTHub.ConnectionString);

Func<DeviceClient, TestDevice, Task> initOperation = async (deviceClient, testDevice) =>
{
(Message msg, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
messagesSent.Add(testDevice.Id, new List<string> { payload, p1Value });
(Message msg, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
messagesSent.Add(testDevice.Id, Tuple.Create(msg, payload));

await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
};
Expand All @@ -132,11 +132,8 @@ private async Task ReceiveMessagePoolOverAmqpAsync(
_log.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
await deviceClient.OpenAsync().ConfigureAwait(false);

List<string> msgSent = messagesSent[testDevice.Id];
string payload = msgSent[0];
string p1Value = msgSent[1];

await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value).ConfigureAwait(false);
Tuple<Message, string> msgSent = messagesSent[testDevice.Id];
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
};

Func<Task> cleanupOperation = async () =>
Expand Down
29 changes: 19 additions & 10 deletions e2e/test/MessageReceiveE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Microsoft.Azure.Devices.E2ETests
public partial class MessageReceiveE2ETests : IDisposable
{
private static readonly string s_devicePrefix = $"E2E_{nameof(MessageReceiveE2ETests)}_";

private static readonly TestLogging s_log = TestLogging.GetInstance();
private static readonly TimeSpan s_oneMinute = TimeSpan.FromMinutes(1);
private static readonly TimeSpan s_oneSecond = TimeSpan.FromSeconds(1);
Expand Down Expand Up @@ -220,30 +221,33 @@ public async Task Message_DeviceReceiveMessageOperationTimeout_MqttWs()
await ReceiveMessageInOperationTimeoutAsync(TestDeviceType.Sasl, Client.TransportType.Mqtt_WebSocket_Only).ConfigureAwait(false);
}

public static (Message message, string messageId, string payload, string p1Value) ComposeC2dTestMessage()
public static (Message message, string payload, string p1Value) ComposeC2dTestMessage()
{
var payload = Guid.NewGuid().ToString();
var messageId = Guid.NewGuid().ToString();
var p1Value = Guid.NewGuid().ToString();
var userId = Guid.NewGuid().ToString();

s_log.WriteLine($"{nameof(ComposeC2dTestMessage)}: messageId='{messageId}' payload='{payload}' p1Value='{p1Value}'");
s_log.WriteLine($"{nameof(ComposeC2dTestMessage)}: messageId='{messageId}' userId='{userId}' payload='{payload}' p1Value='{p1Value}'");
var message = new Message(Encoding.UTF8.GetBytes(payload))
{
MessageId = messageId,
UserId = userId,
Properties = { ["property1"] = p1Value }
};

return (message, messageId, payload, p1Value);
return (message, payload, p1Value);
}

public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType transport, DeviceClient dc, string deviceId, string payload, string p1Value)
public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType transport, DeviceClient dc, string deviceId, Message message, string payload)
{
string receivedMessageDestination = $"/devices/{deviceId}/messages/deviceBound";

var sw = new Stopwatch();
bool received = false;

sw.Start();


while (!received && sw.ElapsedMilliseconds < FaultInjection.RecoveryTimeMilliseconds)
{
Client.Message receivedMessage = null;
Expand Down Expand Up @@ -277,14 +281,19 @@ public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType tran
// ignore exception from CompleteAsync
}

Assert.AreEqual(receivedMessage.MessageId, message.MessageId, "Recieved message Id is not what was sent by service");
Assert.AreEqual(receivedMessage.UserId, message.UserId, "Recieved user Id is not what was sent by service");
Assert.AreEqual(receivedMessage.To, receivedMessageDestination, "Recieved message destination is not what was sent by service");

string messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
s_log.WriteLine($"{nameof(VerifyReceivedC2DMessageAsync)}: Received message: for {deviceId}: {messageData}");
if (Equals(payload, messageData))
{
Assert.AreEqual(1, receivedMessage.Properties.Count, $"The count of received properties did not match for device {deviceId}");
System.Collections.Generic.KeyValuePair<string, string> prop = receivedMessage.Properties.Single();
Assert.AreEqual("property1", prop.Key, $"The key \"property1\" did not match for device {deviceId}");
Assert.AreEqual(p1Value, prop.Value, $"The value of \"property1\" did not match for device {deviceId}");
string propertyKey = "property1";
Assert.AreEqual(propertyKey, prop.Key, $"The key \"property1\" did not match for device {deviceId}");
Assert.AreEqual(message.Properties[propertyKey], prop.Value, $"The value of \"property1\" did not match for device {deviceId}");
received = true;
}
}
Expand Down Expand Up @@ -422,11 +431,11 @@ private async Task ReceiveSingleMessageAsync(TestDeviceType type, Client.Transpo

await serviceClient.OpenAsync().ConfigureAwait(false);

(Message msg, string messageId, string payload, string p1Value) = ComposeC2dTestMessage();
(Message msg, string payload, string p1Value) = ComposeC2dTestMessage();
using (msg)
{
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
await VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value).ConfigureAwait(false);
await VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, msg, payload).ConfigureAwait(false);
}

await deviceClient.CloseAsync().ConfigureAwait(false);
Expand All @@ -450,7 +459,7 @@ private async Task ReceiveSingleMessageWithCancellationTokenAsync(TestDeviceType

await serviceClient.OpenAsync().ConfigureAwait(false);

(Message msg, string messageId, string payload, string p1Value) = ComposeC2dTestMessage();
(Message msg, string payload, string p1Value) = ComposeC2dTestMessage();
using (msg)
{
await serviceClient.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
Expand Down
4 changes: 2 additions & 2 deletions e2e/test/MessageReceiveFaultInjectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ private async Task ReceiveMessageRecovery(

Func<DeviceClient, TestDevice, Task> testOperation = async (deviceClient, testDevice) =>
{
(Message message, string messageId, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
(Message message, string payload, string p1Value) = MessageReceiveE2ETests.ComposeC2dTestMessage();
await serviceClient.SendAsync(testDevice.Id, message).ConfigureAwait(false);
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, payload, p1Value).ConfigureAwait(false);
await MessageReceiveE2ETests.VerifyReceivedC2DMessageAsync(transport, deviceClient, testDevice.Id, message, payload).ConfigureAwait(false);
};

Func<Task> cleanupOperation = () =>
Expand Down
6 changes: 3 additions & 3 deletions e2e/test/MessageSendE2EPoolAmqpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ private async Task SendMessagePoolOverAmqp(
s_log.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}: Preparing to send message for device {testDevice.Id}");
await deviceClient.OpenAsync().ConfigureAwait(false);

(Client.Message testMessage, string messageId, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
s_log.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{messageId}' payload='{payload}' p1Value='{p1Value}'");
(Client.Message testMessage, string payload, string p1Value) = MessageSendE2ETests.ComposeD2cTestMessage();
s_log.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
await deviceClient.SendEventAsync(testMessage).ConfigureAwait(false);

bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, payload, p1Value);
bool isReceived = EventHubTestListener.VerifyIfMessageIsReceived(testDevice.Id, testMessage, payload, p1Value);
Assert.IsTrue(isReceived, "Message is not received.");
};

Expand Down
Loading

0 comments on commit d08cdad

Please sign in to comment.