Skip to content

Commit

Permalink
Merge pull request #8 from dolittle/services-specifications
Browse files Browse the repository at this point in the history
Services specifications
  • Loading branch information
jakhog authored Sep 15, 2020
2 parents 5b68458 + 652671a commit a8ef04c
Show file tree
Hide file tree
Showing 36 changed files with 1,244 additions and 307 deletions.
467 changes: 241 additions & 226 deletions DotNET.SDK.sln

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions Source/Services/DidNotReceiveConnectResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Dolittle.SDK.Services
{
/// <summary>
/// Exception that gets thrown when the server does not send a connect response message as the first message while performing a reverse call.
/// </summary>
public class DidNotReceiveConnectResponse : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="DidNotReceiveConnectResponse"/> class.
/// </summary>
public DidNotReceiveConnectResponse()
: base("The server did not respond with a connect response message as the first message.")
{
}
}
}
4 changes: 1 addition & 3 deletions Source/Services/ICanCallADuplexStreamingMethod.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ namespace Dolittle.SDK.Services
/// <summary>
/// Represents a wrapper of a gRPC duplex streaming method call.
/// </summary>
/// <typeparam name="TClient">The type of generated gRPC client to use.</typeparam>
/// <typeparam name="TClientMessage">Type of the <see cref="IMessage">messages</see> that is sent from the client to the server.</typeparam>
/// <typeparam name="TServerMessage">Type of the <see cref="IMessage">messages</see> that is sent from the server to the client.</typeparam>
public interface ICanCallADuplexStreamingMethod<TClient, TClientMessage, TServerMessage>
where TClient : ClientBase<TClient>
public interface ICanCallADuplexStreamingMethod<TClientMessage, TServerMessage>
where TClientMessage : IMessage
where TServerMessage : IMessage
{
Expand Down
7 changes: 2 additions & 5 deletions Source/Services/ICreateReverseCallClients.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Google.Protobuf;
using Grpc.Core;

namespace Dolittle.SDK.Services
{
Expand All @@ -18,20 +17,18 @@ public interface ICreateReverseCallClients
/// <param name="handler">The handler that will handle requests from the server.</param>
/// <param name="method">The method that will be called on the server to initiate the reverse call.</param>
/// <param name="converter">The converter that will be used to construct and deconstruct <typeparamref name="TClientMessage"/> and <typeparamref name="TServerMessage"/>.</param>
/// <typeparam name="TClient">The type of generated gRPC client to use.</typeparam>
/// <typeparam name="TClientMessage">Type of the <see cref="IMessage">messages</see> that is sent from the client to the server.</typeparam>
/// <typeparam name="TServerMessage">Type of the <see cref="IMessage">messages</see> that is sent from the server to the client.</typeparam>
/// <typeparam name="TConnectArguments">Type of the arguments that are sent along with the initial Connect call.</typeparam>
/// <typeparam name="TConnectResponse">Type of the response that is received after the initial Connect call.</typeparam>
/// <typeparam name="TRequest">Type of the requests sent from the server to the client.</typeparam>
/// <typeparam name="TResponse">Type of the responses received from the client.</typeparam>
/// <returns>A new reverse call client.</returns>
IReverseCallClient<TConnectArguments, TConnectResponse, TRequest, TResponse> Create<TClient, TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse>(
IReverseCallClient<TConnectArguments, TConnectResponse, TRequest, TResponse> Create<TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse>(
TConnectArguments arguments,
IReverseCallHandler<TRequest, TResponse> handler,
ICanCallADuplexStreamingMethod<TClient, TClientMessage, TServerMessage> method,
ICanCallADuplexStreamingMethod<TClientMessage, TServerMessage> method,
IConvertReverseCallMessages<TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse> converter)
where TClient : ClientBase<TClient>
where TClientMessage : IMessage
where TServerMessage : IMessage
where TConnectArguments : class
Expand Down
11 changes: 3 additions & 8 deletions Source/Services/IPerformMethodCalls.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using Google.Protobuf;
using Grpc.Core;

namespace Dolittle.SDK.Services
{
Expand All @@ -16,15 +14,12 @@ public interface IPerformMethodCalls
/// <summary>
/// Performs the provided duplex streaming method call and sends the provided requests to the server.
/// </summary>
/// <param name="method">The <see cref="ICanCallADuplexStreamingMethod{TClient, TClientMessage, TServerMessage}">method</see> to call.</param>
/// <param name="method">The <see cref="ICanCallADuplexStreamingMethod{TClientMessage, TServerMessage}">method</see> to call.</param>
/// <param name="requests">An <see cref="IObservable{TClientMessage}"/> of requests to send.</param>
/// <param name="token">The <see cref="CancellationToken"/>.</param>
/// <typeparam name="TClient">The type of generated gRPC client to use.</typeparam>
/// <typeparam name="TClientMessage">Type of the <see cref="IMessage">messages</see> that is sent from the client to the server.</typeparam>
/// <typeparam name="TServerMessage">Type of the <see cref="IMessage">messages</see> that is sent from the server to the client.</typeparam>
/// <returns>An <see cref="IObservable{TServerMessage}"/> of response from the server.</returns>
IObservable<TServerMessage> Call<TClient, TClientMessage, TServerMessage>(ICanCallADuplexStreamingMethod<TClient, TClientMessage, TServerMessage> method, IObservable<TClientMessage> requests, CancellationToken token)
where TClient : ClientBase<TClient>
/// <returns>An <see cref="IObservable{TServerMessage}"/> of response from the server, that when subscribed to initiates the call.</returns>
IObservable<TServerMessage> Call<TClientMessage, TServerMessage>(ICanCallADuplexStreamingMethod<TClientMessage, TServerMessage> method, IObservable<TClientMessage> requests)
where TClientMessage : IMessage
where TServerMessage : IMessage;
}
Expand Down
60 changes: 30 additions & 30 deletions Source/Services/MethodCaller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
Expand Down Expand Up @@ -40,49 +39,50 @@ public MethodCaller(string host, int port)
}

/// <inheritdoc/>
public IObservable<TServerMessage> Call<TClient, TClientMessage, TServerMessage>(ICanCallADuplexStreamingMethod<TClient, TClientMessage, TServerMessage> method, IObservable<TClientMessage> requests, CancellationToken token)
where TClient : ClientBase<TClient>
public IObservable<TServerMessage> Call<TClientMessage, TServerMessage>(ICanCallADuplexStreamingMethod<TClientMessage, TServerMessage> method, IObservable<TClientMessage> requests)
where TClientMessage : IMessage
where TServerMessage : IMessage
{
var call = method.Call(CreateChannel(), CreateCallOptions(token));
SendMessagesToServer(requests, call.RequestStream);
return ReceiveMessagesFromServer(call.ResponseStream, token);
}
=> Observable.Create<TServerMessage>((observer, token) =>
{
var tcs = CancellationTokenSource.CreateLinkedTokenSource(token);
var call = method.Call(CreateChannel(), CreateCallOptions(tcs.Token));
SendMessagesToServer(observer, requests, call.RequestStream, tcs);
return ReceiveAllMessagesFromServer(observer, call.ResponseStream, tcs.Token);
});

Channel CreateChannel() => new Channel(_host, _port, _channelCredentials, _channelOptions);

CallOptions CreateCallOptions(CancellationToken token) => new CallOptions(cancellationToken: token);

void SendMessagesToServer<T>(IObservable<T> messages, IClientStreamWriter<T> writer)
void SendMessagesToServer<TClientMessage, TServerMessage>(IObserver<TClientMessage> observer, IObservable<TServerMessage> messages, IClientStreamWriter<TServerMessage> writer, CancellationTokenSource tcs)
=> messages
.Select(message => Observable.FromAsync(() => writer.WriteAsync(message)))
.Concat()
.Concat(Observable.FromAsync(() => writer.CompleteAsync()));
.Concat(Observable.FromAsync(() => writer.CompleteAsync()))
.Subscribe(
_ => { },
error =>
{
observer.OnError(error);
tcs.Cancel();
});

IObservable<T> ReceiveMessagesFromServer<T>(IAsyncStreamReader<T> reader, CancellationToken token)
async Task ReceiveAllMessagesFromServer<T>(IObserver<T> observer, IAsyncStreamReader<T> reader, CancellationToken token)
{
var messages = new Subject<T>();

Task.Run(
async () =>
try
{
while (await reader.MoveNext(token).ConfigureAwait(false))
{
try
{
while (await reader.MoveNext(token).ConfigureAwait(false))
{
messages.OnNext(reader.Current);
}
}
catch (Exception ex)
{
messages.OnError(ex);
}

messages.OnCompleted();
});
observer.OnNext(reader.Current);
}
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}

return messages;
observer.OnCompleted();
}
}
}
22 changes: 22 additions & 0 deletions Source/Services/PingTimedOut.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Dolittle.SDK.Services
{
/// <summary>
/// Exception that gets thrown when the server does not send a ping within the specified ping interval on an open reverse call.
/// </summary>
public class PingTimedOut : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="PingTimedOut"/> class.
/// </summary>
/// <param name="timeSpan">The interval which the client asked the server to use between ping messages.</param>
public PingTimedOut(TimeSpan timeSpan)
: base($"Timed out while waiting for a ping from the server. Expected a ping every {timeSpan.TotalSeconds} seconds.")
{
}
}
}
70 changes: 40 additions & 30 deletions Source/Services/ReverseCallClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -11,32 +12,29 @@
using Dolittle.Services.Contracts;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace Dolittle.SDK.Services
{
/// <summary>
/// An implementation of <see cref="IReverseCallClient{TConnectArguments, TConnectResponse, TRequest, TResponse}"/>.
/// </summary>
/// <typeparam name="TClient">The type of generated gRPC client to use.</typeparam>
/// <typeparam name="TClientMessage">Type of the <see cref="IMessage">messages</see> that is sent from the client to the server.</typeparam>
/// <typeparam name="TServerMessage">Type of the <see cref="IMessage">messages</see> that is sent from the server to the client.</typeparam>
/// <typeparam name="TConnectArguments">Type of the arguments that are sent along with the initial Connect call.</typeparam>
/// <typeparam name="TConnectResponse">Type of the response that is received after the initial Connect call.</typeparam>
/// <typeparam name="TRequest">Type of the requests sent from the server to the client.</typeparam>
/// <typeparam name="TResponse">Type of the responses received from the client.</typeparam>
public class ReverseCallClient<TClient, TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse>
public class ReverseCallClient<TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse>
: IReverseCallClient<TConnectArguments, TConnectResponse, TRequest, TResponse>
where TClient : ClientBase<TClient>
where TClientMessage : IMessage
where TServerMessage : IMessage
where TConnectArguments : class
where TConnectResponse : class
where TRequest : class
where TResponse : class
{
readonly ICanCallADuplexStreamingMethod<TClient, TClientMessage, TServerMessage> _method;
readonly ICanCallADuplexStreamingMethod<TClientMessage, TServerMessage> _method;
readonly IConvertReverseCallMessages<TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse> _converter;
readonly TimeSpan _pingInterval;
readonly IPerformMethodCalls _caller;
Expand All @@ -45,7 +43,7 @@ public class ReverseCallClient<TClient, TClientMessage, TServerMessage, TConnect
readonly IObservable<TConnectResponse> _observable;

/// <summary>
/// Initializes a new instance of the <see cref="ReverseCallClient{TClient, TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse}"/> class.
/// Initializes a new instance of the <see cref="ReverseCallClient{TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse}"/> class.
/// </summary>
/// <param name="arguments">The <typeparamref name="TConnectArguments"/> to send to the server to start the reverse call protocol.</param>
/// <param name="handler">The handler that will handle requests from the server.</param>
Expand All @@ -58,7 +56,7 @@ public class ReverseCallClient<TClient, TClientMessage, TServerMessage, TConnect
public ReverseCallClient(
TConnectArguments arguments,
IReverseCallHandler<TRequest, TResponse> handler,
ICanCallADuplexStreamingMethod<TClient, TClientMessage, TServerMessage> method,
ICanCallADuplexStreamingMethod<TClientMessage, TServerMessage> method,
IConvertReverseCallMessages<TClientMessage, TServerMessage, TConnectArguments, TConnectResponse, TRequest, TResponse> converter,
TimeSpan pingInterval,
IPerformMethodCalls caller,
Expand Down Expand Up @@ -87,17 +85,9 @@ public IDisposable Subscribe(IObserver<TConnectResponse> observer)
=> _observable.Subscribe(observer);

IObservable<TConnectResponse> CreateObservable()
=> Observable.Create<TConnectResponse>((observer, token) =>
=> Observable.Create<TConnectResponse>((observer) =>
{
var toServerMessages = new Subject<TClientMessage>();
var toClientMessages = _caller.Call(_method, toServerMessages, token);

var connectArguments = Arguments;
var connectContext = CreateReverseCallArgumentsContext();
_converter.SetConnectArgumentsContextIn(connectContext, connectArguments);
var connectMessage = _converter.CreateMessageFrom(connectArguments);

toServerMessages.OnNext(connectMessage);
var toClientMessages = new Subject<TServerMessage>();

var validMessages = toClientMessages.Skip(1).Where(MessageIsPingOrRequest).Timeout(_pingInterval * 3);
var pings = validMessages.Where(MessageIsPing);
Expand All @@ -108,22 +98,42 @@ IObservable<TConnectResponse> CreateObservable()
.Select(_ => new Pong())
.Select(_converter.CreateMessageFrom);

var responses = new Subject<TResponse>();
requests
var responses = requests
.Select(_converter.GetRequestFrom)
.Where(RequestIsValid)
.Subscribe((request) =>
Task.Run(() => HandleRequest(request, responses, token), token));
.Select(request => Observable.FromAsync((token) => HandleRequest(request, token)))
.Merge()
.Select(_converter.CreateMessageFrom);

var connectArguments = Arguments;
var connectContext = CreateReverseCallArgumentsContext();
_converter.SetConnectArgumentsContextIn(connectContext, connectArguments);
var connectMessage = _converter.CreateMessageFrom(connectArguments);

pongs
.Merge(responses.Select(_converter.CreateMessageFrom))
.Subscribe(toServerMessages);
var toServerMessages = pongs.Merge(responses).StartWith(connectMessage);

var connectResponse = toClientMessages.FirstAsync().Select(_converter.GetConnectResponseFrom);
var errorsAndCompletion = toClientMessages.Where(_ => false).Select(_converter.GetConnectResponseFrom);
connectResponse.Merge(errorsAndCompletion).Subscribe(observer);
var connectResponse = toClientMessages
.Take(1)
.Select(_ =>
{
var response = _converter.GetConnectResponseFrom(_);
if (response == null)
{
return Notification.CreateOnError<TConnectResponse>(new DidNotReceiveConnectResponse());
}

return Notification.CreateOnNext(response);
})
.DefaultIfEmpty(Notification.CreateOnError<TConnectResponse>(new DidNotReceiveConnectResponse()))
.Dematerialize();

return Task.CompletedTask;
var errorsAndCompletion = toClientMessages
.Where(_ => false)
.Select(_converter.GetConnectResponseFrom)
.Catch((TimeoutException _) => Observable.Throw<TConnectResponse>(new PingTimedOut(_pingInterval)));

connectResponse.Merge(errorsAndCompletion).Subscribe(observer);
return _caller.Call(_method, toServerMessages).Subscribe(toClientMessages);
});

ReverseCallArgumentsContext CreateReverseCallArgumentsContext()
Expand Down Expand Up @@ -168,7 +178,7 @@ bool RequestIsValid(TRequest request)
return true;
}

async Task HandleRequest(TRequest request, Subject<TResponse> responses, CancellationToken token)
async Task<TResponse> HandleRequest(TRequest request, CancellationToken token)
{
var requestContext = _converter.GetRequestContextFrom(request);
var executionContext = requestContext.ExecutionContext.ToExecutionContext();
Expand All @@ -183,7 +193,7 @@ async Task HandleRequest(TRequest request, Subject<TResponse> responses, Cancell
var responseContext = new ReverseCallResponseContext { CallId = requestContext.CallId };
_converter.SetResponseContextIn(responseContext, response);

responses.OnNext(response);
return response;
}
}
}
Loading

0 comments on commit a8ef04c

Please sign in to comment.