Skip to content

Commit

Permalink
fix #61 async context with nng_ctx more accessible
Browse files Browse the repository at this point in the history
- Previously needed downcast to ICtx that could fail to access `Ctx` property
- Now IAsyncContext implementations with nng_ctx implement IHasCtx interface that provides at compile-time rather than requiring runtime cast
- Mark ICtx obsolete so it can be deprecated and removed
- Surveyor needed new AsyncContext interface because it is like ISendReceiveX but also IHasCtx (has nng_ctx)
  • Loading branch information
jeikabu committed Jul 21, 2020
1 parent 6d9e70a commit 2d24038
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 42 deletions.
7 changes: 5 additions & 2 deletions nng.NETCore/Factory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public NngResult<ISendReceiveAsyncContext<IMessage>> CreateSendReceiveAsyncConte
case SendReceiveContextSubtype.Bus:
case SendReceiveContextSubtype.Pair:
return SendReceiveAsyncContext<IMessage>.Create(this, socket);
case SendReceiveContextSubtype.Survey:
return SurveyAsyncContext<IMessage>.Create(this, socket);
default:
return NngResult<ISendReceiveAsyncContext<IMessage>>.Err(NngErrno.EINVAL);
}
Expand All @@ -80,6 +78,11 @@ public NngResult<IRepReqAsyncContext<IMessage>> CreateRepReqAsyncContext(ISocket
{
return RepAsyncCtx<IMessage>.Create(this, socket);
}

public NngResult<ISurveyorAsyncContext<IMessage>> CreateSurveyorAsyncContext(ISocket socket)
{
return SurveyAsyncContext<IMessage>.Create(this, socket);
}
#endregion

#region IMiscFactory
Expand Down
6 changes: 6 additions & 0 deletions nng.NETCore/ReqRepAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public static NngResult<IReqRepAsyncContext<T>> Create(IMessageFactory<T> factor
return NngResult<IReqRepAsyncContext<T>>.Fail(res);
}

public NngResult<Unit> SetResendTime(int msTimeout)
{
var res = nng_ctx_setopt_ms(Ctx.NngCtx, nng.Native.Defines.NNG_OPT_REQ_RESENDTIME, new nng_duration { TimeMs = msTimeout });
return Unit.OkIfZero(res);
}

/// <summary>
/// Send the specified message.
/// </summary>
Expand Down
10 changes: 5 additions & 5 deletions nng.NETCore/SurveyAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ namespace nng
/// There can only be one survey at a time. Responses received when there is no outstanding survey are discarded.
/// </summary>
/// <typeparam name="T"></typeparam>
public class SurveyAsyncContext<T> : AsyncBase<T>, ISendReceiveAsyncContext<T>, ICtx
public class SurveyAsyncContext<T> : AsyncBase<T>, ISurveyorAsyncContext<T>
{
public INngCtx Ctx { get; protected set; }

public static NngResult<ISendReceiveAsyncContext<T>> Create(IMessageFactory<T> factory, ISocket socket)
public static NngResult<ISurveyorAsyncContext<T>> Create(IMessageFactory<T> factory, ISocket socket)
{
var context = new SurveyAsyncContext<T> { Factory = factory, Socket = socket };
var res = context.InitAio();
Expand All @@ -30,11 +30,11 @@ public static NngResult<ISendReceiveAsyncContext<T>> Create(IMessageFactory<T> f
if (ctx.IsOk())
{
context.Ctx = ctx.Ok();
return NngResult<ISendReceiveAsyncContext<T>>.Ok(context);
return NngResult<ISurveyorAsyncContext<T>>.Ok(context);
}
return NngResult<ISendReceiveAsyncContext<T>>.Err(ctx.Err());
return NngResult<ISurveyorAsyncContext<T>>.Err(ctx.Err());
}
return NngResult<ISendReceiveAsyncContext<T>>.Fail(res);
return NngResult<ISurveyorAsyncContext<T>>.Fail(res);
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion nng.NETCore/nng.NETCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
Assemblies for targetted frameworks are in runtimes/ instead of lib/ or ref/
See: https://docs.microsoft.com/en-us/nuget/reference/errors-and-warnings/nu5128#scenario-2
-->
<NoWarn>$(NoWarn);NU5128</NoWarn>
<!-- TODO: renable CS0618 after removing ICtx -->
<NoWarn>$(NoWarn);NU5128;CS0618</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down
36 changes: 13 additions & 23 deletions nng.Shared/IAsyncContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,6 @@ namespace nng
// }
// }

// public interface IFactory
// {
// IReplySocket CreateRep();
// IRequestSocket CreateReq();
// }

// public class AsyncFactory : IFactory
// {
// public IReplySocket CreateRep()
// {

// }

// public IRequestSocket CreateReq()
// {

// }
// }

/// <summary>
/// Context for asynchronous nng operations. Most likely involves nng_aio, only involves nng_ctx if supported by protocol.
/// </summary>
Expand Down Expand Up @@ -79,16 +60,17 @@ public interface ISendReceiveAsyncContext<T> : ISendAsyncContext<T>, IReceiveAsy
/// Context with asynchronous request half of request/reply protocol
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IReqRepAsyncContext<T> : IAsyncContext
public interface IReqRepAsyncContext<T> : IAsyncContext, IHasCtx
{
Task<NngResult<T>> Send(T message);
NngResult<Unit> SetResendTime(int msTimeout);
}

/// <summary>
/// Context with asynchronous reply half of request/reply protocol
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IRepReqAsyncContext<T> : IAsyncContext
public interface IRepReqAsyncContext<T> : IAsyncContext, IHasCtx
{
Task<NngResult<T>> Receive();
Task<NngResult<Unit>> Reply(T message);
Expand All @@ -105,6 +87,14 @@ public interface ISubAsyncContext<T> : IReceiveAsyncContext<T>, ISubscriber
{
}

/// <summary>
/// Context with surveyor half of survey pattern
/// </summary>
/// <typeparam name="T"></typeparam>
public interface ISurveyorAsyncContext<T> : ISendReceiveAsyncContext<T>, IHasCtx
{
}

public static class AsyncContextExt
{
#region ISocket.CreateAsyncContext
Expand All @@ -116,8 +106,8 @@ public static class AsyncContextExt
public static NngResult<IReqRepAsyncContext<T>> CreateAsyncContext<T>(this IReqSocket socket, IAPIFactory<T> factory) => factory.CreateReqRepAsyncContext(socket);
public static NngResult<IRepReqAsyncContext<T>> CreateAsyncContext<T>(this IRepSocket socket, IAPIFactory<T> factory) => factory.CreateRepReqAsyncContext(socket);
public static NngResult<ISendReceiveAsyncContext<T>> CreateAsyncContext<T>(this IPairSocket socket, IAPIFactory<T> factory) => factory.CreateSendReceiveAsyncContext(socket, SendReceiveContextSubtype.Pair);
public static NngResult<ISendReceiveAsyncContext<T>> CreateAsyncContext<T>(this IRespondentSocket socket, IAPIFactory<T> factory) => factory.CreateSendReceiveAsyncContext(socket, SendReceiveContextSubtype.Survey);
public static NngResult<ISendReceiveAsyncContext<T>> CreateAsyncContext<T>(this ISurveyorSocket socket, IAPIFactory<T> factory) => factory.CreateSendReceiveAsyncContext(socket, SendReceiveContextSubtype.Survey);
public static NngResult<ISurveyorAsyncContext<T>> CreateAsyncContext<T>(this IRespondentSocket socket, IAPIFactory<T> factory) => factory.CreateSurveyorAsyncContext(socket);
public static NngResult<ISurveyorAsyncContext<T>> CreateAsyncContext<T>(this ISurveyorSocket socket, IAPIFactory<T> factory) => factory.CreateSurveyorAsyncContext(socket);
#endregion
}
}
9 changes: 7 additions & 2 deletions nng.Shared/ICtx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
namespace nng
{
/// <summary>
/// Context supporting nng_ctx
/// Represents object that has, is, or is like a `nng_ctx`.
/// </summary>
public interface ICtx
public interface IHasCtx
{
INngCtx Ctx { get; }
}

[Obsolete("Downcast to ICtx no longer needed, use IHasCtx")]
public interface ICtx : IHasCtx
{
}

/// <summary>
/// Context supporting nng_ctx
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion nng.Shared/IFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public enum SendReceiveContextSubtype
{
Bus,
Pair,
Survey,
}

/// <summary>
Expand All @@ -53,6 +52,7 @@ public interface IAsyncContextFactory<T>
NngResult<ISubAsyncContext<T>> CreateSubAsyncContext(ISocket socket);
NngResult<IReqRepAsyncContext<T>> CreateReqRepAsyncContext(ISocket socket);
NngResult<IRepReqAsyncContext<T>> CreateRepReqAsyncContext(ISocket socket);
NngResult<ISurveyorAsyncContext<T>> CreateSurveyorAsyncContext(ISocket socket);
}

public interface IMiscFactory
Expand Down
36 changes: 32 additions & 4 deletions tests/CtxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,43 @@ public void GetSetOpt()
using (var socket = Factory.ReplierOpen().ThenListen(url).Unwrap())
using (var rep = socket.CreateAsyncContext(Factory).Unwrap())
{
var ctx = (rep as ICtx).Ctx;
// TODO: remove this after deprecating ICtx
var _remove = (rep as ICtx).Ctx;
// Get a value, set a new value, get back the new value
Assert.Equal(0, ctx.GetOpt(NNG_OPT_RECVTIMEO, out nng_duration recvTimeout));
Assert.Equal(0, rep.Ctx.GetOpt(NNG_OPT_RECVTIMEO, out nng_duration recvTimeout));
var newResvTimeout = new nng_duration(recvTimeout);
++newResvTimeout.TimeMs;
Assert.Equal(0, ctx.SetOpt(NNG_OPT_RECVTIMEO, newResvTimeout));
ctx.GetOpt(NNG_OPT_RECVTIMEO, out nng_duration nextRecvTimeout);
Assert.Equal(0, rep.Ctx.SetOpt(NNG_OPT_RECVTIMEO, newResvTimeout));
rep.Ctx.GetOpt(NNG_OPT_RECVTIMEO, out nng_duration nextRecvTimeout);
Assert.Equal(newResvTimeout.TimeMs, nextRecvTimeout.TimeMs);
}
}

[Fact]
public async Task MustUseCtxGetSetOpt()
{
var url = UrlIpc();

// Setting receive timeout on nng_ctx works
using (var socket = Factory.ReplierOpen().ThenListen(url).Unwrap())
using (var rep = socket.CreateAsyncContext(Factory).Unwrap())
{
Assert.Equal(0, rep.Ctx.SetOpt(NNG_OPT_RECVTIMEO, nng_duration.Zero));
Assert.True((await rep.Receive()).Err() == NngErrno.ECLOSED);
}

// Setting receive timeout on socket doesn't timeout read from nng_ctx
var task = Task.Run(async () => {
using (var socket = Factory.ReplierOpen().ThenListen(url).Unwrap())
using (var rep = socket.CreateAsyncContext(Factory).Unwrap())
{
Assert.Equal(0, rep.Socket.SetOpt(NNG_OPT_RECVTIMEO, nng_duration.Zero));
(await rep.Receive()).Unwrap();
}
});
var timeout = Task.Delay(Util.ShortTestMs);
var first = await Task.WhenAny(Task.WhenAll(task), timeout);
Assert.Equal(timeout, first);
}
}
}
9 changes: 5 additions & 4 deletions tests/SurveyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ async Task DoSurveyorFail(string url)
{
// Receive with no survey fails
await Util.AssertThrowsNng(() => ctx.Receive(cts.Token), Defines.NngErrno.ESTATE);
// TODO: remove this after deprecating ICtx
var _asyncctx = (ctx as ICtx).Ctx;
// Survey with no responses times out
var asyncctx = (ctx as ICtx).Ctx;
// NB: when using nng_ctx must call ctx_setopt instead of (socket) setopt
asyncctx.SetOpt(Native.Defines.NNG_OPT_SURVEYOR_SURVEYTIME, new nng_duration { TimeMs = 10 });
ctx.Ctx.SetOpt(Native.Defines.NNG_OPT_SURVEYOR_SURVEYTIME, new nng_duration { TimeMs = 10 });
//ctx.Socket.SetOpt(Native.Defines.NNG_OPT_SURVEYOR_SURVEYTIME, new nng_duration { TimeMs = 10 });
await ctx.Send(Factory.CreateMessage());
await Util.AssertThrowsNng(() => ctx.Receive(cts.Token), Defines.NngErrno.ETIMEDOUT);
Expand Down Expand Up @@ -198,8 +199,8 @@ async Task DoContexts(string url)
{
using (var ctx = surveySocket.CreateAsyncContext(Factory).Unwrap())
{
(ctx as ICtx).Ctx.SetOpt(Native.Defines.NNG_OPT_RECVTIMEO, nng_duration.Infinite);
(ctx as ICtx).Ctx.SetOpt(Native.Defines.NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration.Infinite);
ctx.Ctx.SetOpt(Native.Defines.NNG_OPT_RECVTIMEO, nng_duration.Infinite);
ctx.Ctx.SetOpt(Native.Defines.NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration.Infinite);
await readyToDial.SignalAndWait();
await readyToSend.SignalAndWait();
Expand Down

0 comments on commit 2d24038

Please sign in to comment.