Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow accessing send queue count from WebSocketConnection #1156

Merged
merged 2 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/Transports.AspNetCore/WebSockets/AsyncMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ internal class AsyncMessagePump<T>
private readonly Func<T, Task> _callback;
private readonly Queue<ValueTask<T>> _queue = new();

/// <summary>
/// Returns the number of messages in the queue.
/// This count includes any message currently being processed.
/// </summary>
public int Count
{
get
{
lock (_queue)
{
return _queue.Count;
}
}
}

/// <summary>
/// Initializes a new instance with the specified asynchronous callback delegate.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/Transports.AspNetCore/WebSockets/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public class WebSocketConnection : IWebSocketConnection
/// <inheritdoc/>
public HttpContext HttpContext { get; }

/// <summary>
/// Returns the number of packets waiting in the send queue, including
/// messages, keep-alive packets, and the close message.
/// This count includes any packet currently being processed.
/// </summary>
protected int SendQueueCount => _pump.Count;

/// <summary>
/// Initializes an instance with the specified parameters.
/// </summary>
Expand Down Expand Up @@ -218,7 +225,7 @@ public Task CloseAsync(int eventId, string? description)
/// <remarks>
/// The message is posted to a queue and execution returns immediately.
/// </remarks>
public Task SendMessageAsync(OperationMessage message)
public virtual Task SendMessageAsync(OperationMessage message)
{
_pump.Post(new Message { OperationMessage = message });
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets
public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; }
public System.DateTime LastMessageSentAt { get; }
public System.Threading.CancellationToken RequestAborted { get; }
protected int SendQueueCount { get; }
public System.Threading.Tasks.Task CloseAsync() { }
public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { }
public virtual void Dispose() { }
Expand All @@ -348,7 +349,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets
protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.Server.Transports.AspNetCore.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { }
protected virtual System.Threading.Tasks.Task OnNonGracefulShutdownAsync(bool receivedCloseMessage, bool sentCloseMessage) { }
protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
}
}
namespace GraphQL.Server.Transports.AspNetCore.WebSockets.GraphQLWs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets
public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; }
public System.DateTime LastMessageSentAt { get; }
public System.Threading.CancellationToken RequestAborted { get; }
protected int SendQueueCount { get; }
public System.Threading.Tasks.Task CloseAsync() { }
public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { }
public virtual void Dispose() { }
Expand All @@ -366,7 +367,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets
protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.Server.Transports.AspNetCore.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { }
protected virtual System.Threading.Tasks.Task OnNonGracefulShutdownAsync(bool receivedCloseMessage, bool sentCloseMessage) { }
protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
}
}
namespace GraphQL.Server.Transports.AspNetCore.WebSockets.GraphQLWs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets
public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; }
public System.DateTime LastMessageSentAt { get; }
public System.Threading.CancellationToken RequestAborted { get; }
protected int SendQueueCount { get; }
public System.Threading.Tasks.Task CloseAsync() { }
public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { }
public virtual void Dispose() { }
Expand All @@ -348,7 +349,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets
protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.Server.Transports.AspNetCore.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { }
protected virtual System.Threading.Tasks.Task OnNonGracefulShutdownAsync(bool receivedCloseMessage, bool sentCloseMessage) { }
protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { }
}
}
namespace GraphQL.Server.Transports.AspNetCore.WebSockets.GraphQLWs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public Task Do_OnCloseOutputAsync(WebSocketCloseStatus closeStatus, string? clos

public TimeSpan Get_DefaultDisconnectionTimeout
=> DefaultDisconnectionTimeout;

public int Get_SendQueueCount => base.SendQueueCount;
}
Original file line number Diff line number Diff line change
Expand Up @@ -597,18 +597,44 @@ public async Task CloseConnectionAsync_Specific()
public async Task SendMessageAsync()
{
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(Task.CompletedTask).Verifiable();
await _connection.SendMessageAsync(message);
_mockConnection.Verify();
}

[Fact]
public async Task MessageCountAsync()
{
var tc = new TaskCompletionSource<bool>();
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(tc.Task).Verifiable();
await _connection.SendMessageAsync(message);
_connection.Get_SendQueueCount.ShouldBe(1);
await _connection.SendMessageAsync(message);
_connection.Get_SendQueueCount.ShouldBe(2);
tc.SetResult(true);
for (int i = 0; i < 100; i++)
{
if (_connection.Get_SendQueueCount != 0)
await Task.Delay(100);
else
break;
}
_connection.Get_SendQueueCount.ShouldBe(0);
_mockConnection.Verify();
}

[Fact]
public async Task LastMessageSentAt()
{
var oldTime = _connection.LastMessageSentAt;
await Task.Delay(100);
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(Task.CompletedTask).Verifiable();
await _connection.SendMessageAsync(message);
Expand All @@ -623,6 +649,7 @@ public async Task DoNotSendMessagesAfterOutputIsClosed()
{
// send a message
var message = new OperationMessage();
_mockConnection.Setup(x => x.SendMessageAsync(It.IsAny<OperationMessage>())).CallBase().Verifiable();
_mockConnection.Protected().SetupGet<TimeSpan>("DefaultDisconnectionTimeout").CallBase().Verifiable();
_mockConnection.Protected().Setup<Task>("OnSendMessageAsync", message)
.Returns(Task.CompletedTask).Verifiable();
Expand Down
Loading