Skip to content

Commit

Permalink
fix: Fix to Task stored in TaskWorker, which should correspond to the…
Browse files Browse the repository at this point in the history
… awaited func to trigger IsCompleted correctly in WebSocketClient::ConnectAsync(). (#57)

Add a semaphore to avoid multi-thread issues that can lead to ObjectDisposedException (and possibly other issues)
  • Loading branch information
Wennemyr authored Jan 5, 2024
1 parent 8b04f80 commit 842a7f6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/libs/H.WebSockets/Utilities/TaskWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void Start(Func<CancellationToken, Task> func)
}

OnCompleted();
}, CancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}, CancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
}

/// <summary>
Expand Down
51 changes: 32 additions & 19 deletions src/libs/H.WebSockets/WebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public sealed partial class WebSocketClient : IDisposable
///
/// </summary>
public ClientWebSocket Socket { get; private set; } = new();
private readonly SemaphoreSlim SocketSemaphore = new(1, 1);

/// <summary>
///
Expand Down Expand Up @@ -95,35 +96,45 @@ public WebSocketClient()
/// <returns></returns>
public async Task ConnectAsync(Uri? uri, CancellationToken cancellationToken = default)
{
if (IsConnected)
{
return;
}
await SocketSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

Socket = Socket ?? throw new ObjectDisposedException(nameof(Socket));
if (Socket.State != WebSocketState.None)
try
{
Socket.Dispose();
Socket = new ClientWebSocket();
}

LastConnectUri = uri ?? throw new ArgumentNullException(nameof(uri));
if (IsConnected)
{
return;
}

Socket = Socket ?? throw new ObjectDisposedException(nameof(Socket));
if (Socket.State != WebSocketState.None)
{
Socket.Dispose();
Socket = new ClientWebSocket();
}

LastConnectUri = uri ?? throw new ArgumentNullException(nameof(uri));

#if NETSTANDARD2_1 || NET5_0_OR_GREATER
if (RemoteCertificateValidationCallback != null)
{
Socket.Options.RemoteCertificateValidationCallback += RemoteCertificateValidationCallback;
}
if (RemoteCertificateValidationCallback != null)
{
Socket.Options.RemoteCertificateValidationCallback += RemoteCertificateValidationCallback;
}
#endif

await Socket.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
await Socket.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);

if (ReceiveWorker.Task.IsCompleted)
if (ReceiveWorker.Task.IsCompleted)
{
ReceiveWorker.Start(async token => await ReceiveAsync(token).ConfigureAwait(false));
}

OnConnected();
}
finally
{
ReceiveWorker.Start(async token => await ReceiveAsync(token).ConfigureAwait(false));
SocketSemaphore.Release();
}

OnConnected();
}

/// <summary>
Expand Down Expand Up @@ -287,6 +298,7 @@ public void Dispose()
{
ReceiveWorker.Dispose();
Socket.Dispose();
SocketSemaphore.Dispose();
}

#if NETSTANDARD2_1 || NET5_0_OR_GREATER
Expand All @@ -298,6 +310,7 @@ public async ValueTask DisposeAsync()
{
await ReceiveWorker.DisposeAsync().ConfigureAwait(false);
Socket.Dispose();
SocketSemaphore.Dispose();
}
#endif

Expand Down

0 comments on commit 842a7f6

Please sign in to comment.