diff --git a/e2e/test/iothub/messaging/MessageReceiveE2ETests.cs b/e2e/test/iothub/messaging/MessageReceiveE2ETests.cs index 006c5676d8..b0a4516b4f 100644 --- a/e2e/test/iothub/messaging/MessageReceiveE2ETests.cs +++ b/e2e/test/iothub/messaging/MessageReceiveE2ETests.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using FluentAssertions; using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.Client.Exceptions; using Microsoft.Azure.Devices.Client.Transport.Mqtt; using Microsoft.Azure.Devices.E2ETests.Helpers; using Microsoft.Azure.Devices.E2ETests.Helpers.Templates; @@ -108,6 +109,12 @@ public async Task Message_DeviceReceiveSingleMessageWithCancellationToken_Mqtt() await ReceiveSingleMessageWithCancellationTokenAsync(TestDeviceType.Sasl, Client.TransportType.Mqtt_Tcp_Only).ConfigureAwait(false); } + [LoggedTestMethod] + public async Task Message_DeviceReceiveSingleMessageCancelled_Mqtt() + { + await Mqtt_ReceiveSingleMessageWithCancelledAsync(TestDeviceType.Sasl).ConfigureAwait(false); + } + [LoggedTestMethod] public async Task Message_DeviceReceiveSingleMessageWithCancellationToken_MqttWs() { @@ -515,6 +522,22 @@ public static async Task VerifyReceivedC2DMessageAsync(Client.TransportType tran Assert.IsTrue(received, $"No message received for device {deviceId} with payload={payload} in {FaultInjection.RecoveryTime}."); } + public static async Task Mqtt_VerifyReceivedC2dMessageCancelledAsync(DeviceClient dc) + { + try + { + using var cts = new CancellationTokenSource(s_fiveSeconds); + await dc.ReceiveAsync(cts.Token).ConfigureAwait(false); + } + catch (IotHubCommunicationException ex) + when (ex.IsTransient && ex.InnerException is OperationCanceledException) + { + return; + } + + Assert.Fail(); + } + public static async Task VerifyReceivedC2dMessageWithCancellationTokenAsync(Client.TransportType transport, DeviceClient dc, string deviceId, string payload, string p1Value, MsTestLogger logger) { var sw = new Stopwatch(); @@ -627,6 +650,19 @@ private async Task ReceiveSingleMessageAsync(TestDeviceType type, Client.Transpo await serviceClient.CloseAsync().ConfigureAwait(false); } + private async Task Mqtt_ReceiveSingleMessageWithCancelledAsync(TestDeviceType type) + { + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, s_devicePrefix, type).ConfigureAwait(false); + using DeviceClient deviceClient = testDevice.CreateDeviceClient(Client.TransportType.Mqtt_Tcp_Only); + + await deviceClient.OpenAsync().ConfigureAwait(false); + + // There is no message being sent so the device client should timeout waiting for the message. + await Mqtt_VerifyReceivedC2dMessageCancelledAsync(deviceClient).ConfigureAwait(false); + + await deviceClient.CloseAsync().ConfigureAwait(false); + } + private async Task ReceiveSingleMessageWithCancellationTokenAsync(TestDeviceType type, Client.TransportType transport) { TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, s_devicePrefix, type).ConfigureAwait(false); diff --git a/iothub/device/src/InternalClient.cs b/iothub/device/src/InternalClient.cs index 33360e347a..5f4794c9c9 100644 --- a/iothub/device/src/InternalClient.cs +++ b/iothub/device/src/InternalClient.cs @@ -312,11 +312,11 @@ public async Task OpenAsync() /// Explicitly open the InternalClient instance. /// /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. - public async Task OpenAsync(CancellationToken cancellationToken) + public Task OpenAsync(CancellationToken cancellationToken) { try { - await InnerHandler.OpenAsync(cancellationToken).ConfigureAwait(false); + return InnerHandler.OpenAsync(cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -346,11 +346,11 @@ public async Task CloseAsync() /// /// Close the InternalClient instance /// - public async Task CloseAsync(CancellationToken cancellationToken) + public Task CloseAsync(CancellationToken cancellationToken) { try { - await InnerHandler.CloseAsync(cancellationToken).ConfigureAwait(false); + return InnerHandler.CloseAsync(cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -431,7 +431,7 @@ public async Task CompleteAsync(string lockToken) /// /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. /// The lock identifier for the previously received message - public async Task CompleteAsync(string lockToken, CancellationToken cancellationToken) + public Task CompleteAsync(string lockToken, CancellationToken cancellationToken) { // Codes_SRS_DEVICECLIENT_28_013: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs.] if (string.IsNullOrEmpty(lockToken)) @@ -441,7 +441,7 @@ public async Task CompleteAsync(string lockToken, CancellationToken cancellation try { - await InnerHandler.CompleteAsync(lockToken, cancellationToken).ConfigureAwait(false); + return InnerHandler.CompleteAsync(lockToken, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -471,17 +471,17 @@ public Task CompleteAsync(Message message) /// The message to complete /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. /// The previously received message - public async Task CompleteAsync(Message message, CancellationToken cancellationToken) + public Task CompleteAsync(Message message, CancellationToken cancellationToken) { if (message == null) { throw new ArgumentNullException(nameof(message)); } - // Codes_SRS_DEVICECLIENT_28_015: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs.] + // The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs. try { - await CompleteAsync(message.LockToken, cancellationToken).ConfigureAwait(false); + return CompleteAsync(message.LockToken, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -513,17 +513,17 @@ public async Task AbandonAsync(string lockToken) /// Puts a received message back onto the device queue /// /// The previously received message - public async Task AbandonAsync(string lockToken, CancellationToken cancellationToken) + public Task AbandonAsync(string lockToken, CancellationToken cancellationToken) { if (string.IsNullOrEmpty(lockToken)) { throw new ArgumentNullException(nameof(lockToken)); } - // Codes_SRS_DEVICECLIENT_28_015: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs.] + // The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs. try { - await InnerHandler.AbandonAsync(lockToken, cancellationToken).ConfigureAwait(false); + return InnerHandler.AbandonAsync(lockToken, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -547,15 +547,16 @@ public Task AbandonAsync(Message message) /// Puts a received message back onto the device queue /// /// The lock identifier for the previously received message - public async Task AbandonAsync(Message message, CancellationToken cancellationToken) + public Task AbandonAsync(Message message, CancellationToken cancellationToken) { if (message == null) { throw new ArgumentNullException(nameof(message)); } + try { - await AbandonAsync(message.LockToken, cancellationToken).ConfigureAwait(false); + return AbandonAsync(message.LockToken, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -587,15 +588,16 @@ public async Task RejectAsync(string lockToken) /// Deletes a received message from the device queue and indicates to the server that the message could not be processed. /// /// The previously received message - public async Task RejectAsync(string lockToken, CancellationToken cancellationToken) + public Task RejectAsync(string lockToken, CancellationToken cancellationToken) { if (string.IsNullOrEmpty(lockToken)) { throw new ArgumentNullException(nameof(lockToken)); } + try { - await InnerHandler.RejectAsync(lockToken, cancellationToken).ConfigureAwait(false); + return InnerHandler.RejectAsync(lockToken, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -619,7 +621,7 @@ public Task RejectAsync(Message message) /// Deletes a received message from the device queue and indicates to the server that the message could not be processed. /// /// The lock identifier for the previously received message - public async Task RejectAsync(Message message, CancellationToken cancellationToken) + public Task RejectAsync(Message message, CancellationToken cancellationToken) { if (message == null) { @@ -628,7 +630,7 @@ public async Task RejectAsync(Message message, CancellationToken cancellationTok try { - await RejectAsync(message.LockToken, cancellationToken).ConfigureAwait(false); + return RejectAsync(message.LockToken, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -660,7 +662,7 @@ public async Task SendEventAsync(Message message) /// Sends an event to device hub /// /// The message containing the event - public async Task SendEventAsync(Message message, CancellationToken cancellationToken) + public Task SendEventAsync(Message message, CancellationToken cancellationToken) { if (message == null) { @@ -673,10 +675,10 @@ public async Task SendEventAsync(Message message, CancellationToken cancellation } IotHubClientDiagnostic.AddDiagnosticInfoIfNecessary(message, _diagnosticSamplingPercentage, ref _currentMessageCount); - // Codes_SRS_DEVICECLIENT_28_019: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication or quota exceed) occurs.] + // The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication or quota exceed) occurs. try { - await InnerHandler.SendEventAsync(message, cancellationToken).ConfigureAwait(false); + return InnerHandler.SendEventAsync(message, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -708,7 +710,7 @@ public async Task SendEventBatchAsync(IEnumerable messages) /// Sends a batch of events to device hub /// /// The task containing the event - public async Task SendEventBatchAsync(IEnumerable messages, CancellationToken cancellationToken) + public Task SendEventBatchAsync(IEnumerable messages, CancellationToken cancellationToken) { if (messages == null) { @@ -726,10 +728,10 @@ public async Task SendEventBatchAsync(IEnumerable messages, Cancellatio } } - // Codes_SRS_DEVICECLIENT_28_019: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication or quota exceed) occurs.] + // The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication or quota exceed) occurs. try { - await InnerHandler.SendEventAsync(messages, cancellationToken).ConfigureAwait(false); + return InnerHandler.SendEventAsync(messages, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -1020,11 +1022,11 @@ internal async Task OnMethodCalledAsync(MethodRequestInternal methodRequestInter } } - internal async Task SendMethodResponseAsync(MethodResponseInternal methodResponse, CancellationToken cancellationToken) + internal Task SendMethodResponseAsync(MethodResponseInternal methodResponse, CancellationToken cancellationToken) { try { - await InnerHandler.SendMethodResponseAsync(methodResponse, cancellationToken).ConfigureAwait(false); + return InnerHandler.SendMethodResponseAsync(methodResponse, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -1165,12 +1167,12 @@ public async Task GetTwinAsync() /// For the complete device twin object, use Microsoft.Azure.Devices.RegistryManager.GetTwinAsync(string deviceId). /// /// The device twin object for the current device - public async Task GetTwinAsync(CancellationToken cancellationToken) + public Task GetTwinAsync(CancellationToken cancellationToken) { - // Codes_SRS_DEVICECLIENT_18_001: `GetTwinAsync` shall call `SendTwinGetAsync` on the transport to get the twin state + // `GetTwinAsync` shall call `SendTwinGetAsync` on the transport to get the twin state. try { - return await InnerHandler.SendTwinGetAsync(cancellationToken).ConfigureAwait(false); + return InnerHandler.SendTwinGetAsync(cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -1203,18 +1205,18 @@ public async Task UpdateReportedPropertiesAsync(TwinCollection reportedPropertie /// /// Reported properties to push /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. - public async Task UpdateReportedPropertiesAsync(TwinCollection reportedProperties, CancellationToken cancellationToken) + public Task UpdateReportedPropertiesAsync(TwinCollection reportedProperties, CancellationToken cancellationToken) { - // Codes_SRS_DEVICECLIENT_18_006: `UpdateReportedPropertiesAsync` shall throw an `ArgumentNull` exception if `reportedProperties` is null + // `UpdateReportedPropertiesAsync` shall throw an `ArgumentNull` exception if `reportedProperties` is null. if (reportedProperties == null) { throw new ArgumentNullException(nameof(reportedProperties)); } - // Codes_SRS_DEVICECLIENT_18_002: `UpdateReportedPropertiesAsync` shall call `SendTwinPatchAsync` on the transport to update the reported properties + // `UpdateReportedPropertiesAsync` shall call `SendTwinPatchAsync` on the transport to update the reported properties. try { - await InnerHandler.SendTwinPatchAsync(reportedProperties, cancellationToken).ConfigureAwait(false); + return InnerHandler.SendTwinPatchAsync(reportedProperties, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -1281,12 +1283,12 @@ public async Task ReceiveAsync(TimeSpan timeout) /// /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. /// The receive message or null if there was no message until the default timeout - public async Task ReceiveAsync(CancellationToken cancellationToken) + public Task ReceiveAsync(CancellationToken cancellationToken) { - // Codes_SRS_DEVICECLIENT_28_011: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable (authentication, quota exceed) error occurs.] + // The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable (authentication, quota exceed) error occurs. try { - return await InnerHandler.ReceiveAsync(cancellationToken).ConfigureAwait(false); + return InnerHandler.ReceiveAsync(cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -1454,7 +1456,7 @@ public Task UploadToBlobAsync(string blobName, Stream source) /// A cancellation token that can be used by other objects or threads to receive notice of cancellation. /// AsncTask [Obsolete("This API has been split into three APIs: GetFileUploadSasUri, uploading to blob directly using the Azure Storage SDK, and CompleteFileUploadAsync")] - public async Task UploadToBlobAsync(string blobName, Stream source, CancellationToken cancellationToken) + public Task UploadToBlobAsync(string blobName, Stream source, CancellationToken cancellationToken) { try { @@ -1480,9 +1482,8 @@ public async Task UploadToBlobAsync(string blobName, Stream source, Cancellation throw Fx.Exception.Argument(nameof(blobName), "Path segment count cannot exceed 254"); } - await _fileUploadHttpTransportHandler - .UploadToBlobAsync(blobName, source, cancellationToken) - .ConfigureAwait(false); + return _fileUploadHttpTransportHandler + .UploadToBlobAsync(blobName, source, cancellationToken); } catch (IotHubCommunicationException ex) when (ex.InnerException is OperationCanceledException) { @@ -1530,7 +1531,7 @@ public async Task SendEventAsync(string outputName, Message message) /// The message to send /// A cancellation token /// The message containing the event - public async Task SendEventAsync(string outputName, Message message, CancellationToken cancellationToken) + public Task SendEventAsync(string outputName, Message message, CancellationToken cancellationToken) { try { @@ -1557,7 +1558,7 @@ public async Task SendEventAsync(string outputName, Message message, Cancellatio message.SystemProperties.Add(MessageSystemPropertyNames.OutputName, outputName); // Codes_SRS_DEVICECLIENT_10_011: [The `SendEventAsync` operation shall retry sending `message` until the `BaseClient::RetryStrategy` timespan expires or unrecoverable error(authentication or quota exceed) occurs.] - await InnerHandler.SendEventAsync(message, cancellationToken).ConfigureAwait(false); + return InnerHandler.SendEventAsync(message, cancellationToken); } finally { @@ -1596,7 +1597,7 @@ public async Task SendEventBatchAsync(string outputName, IEnumerable me /// A list of one or more messages to send /// /// The task containing the event - public async Task SendEventBatchAsync(string outputName, IEnumerable messages, CancellationToken cancellationToken) + public Task SendEventBatchAsync(string outputName, IEnumerable messages, CancellationToken cancellationToken) { try { @@ -1624,7 +1625,7 @@ public async Task SendEventBatchAsync(string outputName, IEnumerable me messagesList.ForEach(m => m.SystemProperties.Add(MessageSystemPropertyNames.OutputName, outputName)); // Codes_SRS_DEVICECLIENT_10_014: [The `SendEventBachAsync` operation shall retry sending `messages` until the `BaseClient::RetryStrategy` timespan expires or unrecoverable error(authentication or quota exceed) occurs.] - await InnerHandler.SendEventAsync(messagesList, cancellationToken).ConfigureAwait(false); + return InnerHandler.SendEventAsync(messagesList, cancellationToken); } finally {