Skip to content

Commit

Permalink
Fix dispatch service not being able to have multiple channels per target
Browse files Browse the repository at this point in the history
  • Loading branch information
Kaioru committed Aug 20, 2024
1 parent 90bf31a commit b2be1f7
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Edelstein.Protocol.Gameplay;
using Edelstein.Protocol.Network.Transports;
using Edelstein.Protocol.Plugin;
using Edelstein.Protocol.Services.Dispatch;
using Edelstein.Protocol.Services.Server;
using MapsterMapper;
using Microsoft.Extensions.Configuration;
Expand Down Expand Up @@ -51,6 +52,7 @@ IConfiguration config
subProvider.GetRequiredService<TServerInfo>(),
version,
p.GetRequiredService<IPluginManager<TContext>>(),
p.GetRequiredService<IDispatchService>(),
subProvider.GetRequiredService<TContext>()
);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
using System.IO;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Edelstein.Application.Server.Bindings;
using Edelstein.Common.Network.DotNetty.Transports;
using Edelstein.Protocol.Gameplay;
using Edelstein.Protocol.Network.Transports;
using Edelstein.Protocol.Plugin;
using Edelstein.Protocol.Services.Dispatch;
using Edelstein.Protocol.Services.Dispatch.Contracts;
using Edelstein.Protocol.Services.Server;
using Grpc.Core;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using R3;

namespace Edelstein.Application.Server.Services;

Expand All @@ -20,12 +25,14 @@ public class SystemHostService<TStageSystem, TStageSystemUser, TContext>(
IServerInfo info,
TransportVersion version,
IPluginManager<TContext> plugins,
IDispatchService dispatch,
TContext context
) : IHostedService
where TStageSystem : IStageSystem<TStageSystem, TStageSystemUser>
where TStageSystemUser : class, IStageSystemUser<TStageSystem, TStageSystemUser>
{
private ITransportContext? Context { get; set; }
private IDisposable? Subscription { get; set; }

public async Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -37,6 +44,22 @@ public async Task StartAsync(CancellationToken cancellationToken)
system,
system
).Accept(info.Host, info.Port);

var request = new DispatchServiceSubscribeRequest
{
ServerID = info.ID
};

Subscription = dispatch
.Subscribe(request)
.ToObservable()
.Select(i =>
{
// TODO
Console.WriteLine(i);
return Task.CompletedTask;
})
.Subscribe();

logger.LogSystemHostServiceStarted(
info.ID,
Expand All @@ -51,6 +74,7 @@ public async Task StopAsync(CancellationToken cancellationToken)

if (Context != null)
await Context.Close();
Subscription?.Dispose();

logger.LogSystemHostServiceStopped(info.ID);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Edelstein.Protocol.Services.Dispatch;
using Edelstein.Protocol.Services.Dispatch.Contracts;
Expand All @@ -11,68 +12,32 @@ public partial class DispatchService
{
public async Task<DispatchServiceResponse> Send(DispatchServiceSendRequest request, CallContext context = default)
{
switch (request.Info.TargetType)
var info = request.Info;
var targets = (await _repository.RetrieveAll()).AsEnumerable();

if (info.TargetServerID != null)
targets = targets.Where(t => t.ServerID == info.TargetServerID);

if (info.TargetWorldID.HasValue)
targets = targets.Where(t => t.WorldID == info.TargetWorldID);

if (info.TargetChannelID.HasValue)
targets = targets.Where(t => t.ChannelID == info.TargetChannelID);

if (info.TargetCharacterID.HasValue)
{
case DispatchTarget.All:
await Task.WhenAll(
(await _serverIdIndex.RetrieveAll())
.Select(async e
=> await e.Channel.WriteAsync(request.Info, context.CancellationToken)));
break;
case DispatchTarget.World:
var session = await sessions.GetByActiveCharacter(new SessionServiceGetByActiveCharacterRequest
{
var entry = await _worldIdIndex.Retrieve(request.Info.TargetID);
CharacterID = info.TargetCharacterID.Value
});

if (entry == null)
return new DispatchServiceResponse
{
Result = DispatchServiceResult.FailedUnknown
};

await entry.Channel.WriteAsync(request.Info, context.CancellationToken);
break;
}
case DispatchTarget.Channel:
{
var entry = await _channelIdIndex.Retrieve(request.Info.TargetID);

if (entry == null)
return new DispatchServiceResponse
{
Result = DispatchServiceResult.FailedUnknown
};

await entry.Channel.WriteAsync(request.Info, context.CancellationToken);
break;
}
case DispatchTarget.Character:
{
var session = await sessions.GetByActiveCharacter(new SessionServiceGetByActiveCharacterRequest
{
CharacterID = request.Info.TargetID
});

if (session.Info != null)
{
var entry = await _serverIdIndex.Retrieve(session.Info.ServerID);

if (entry == null)
return new DispatchServiceResponse
{
Result = DispatchServiceResult.FailedUnknown
};

await entry.Channel.WriteAsync(request.Info, context.CancellationToken);
}
break;
}
default:
return new DispatchServiceResponse
{
Result = DispatchServiceResult.FailedUnknown
};
if (session.Info != null)
targets = targets.Where(t => t.ServerID == session.Info.ServerID);
}

await Task.WhenAll(targets
.Select(async t
=> await t.Channel.WriteAsync(request.Info)));
return new DispatchServiceResponse
{
Result = DispatchServiceResult.Success
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using Edelstein.Protocol.Services.Dispatch.Contracts;
using ProtoBuf.Grpc;
Expand All @@ -7,21 +10,35 @@ namespace Edelstein.Common.Services.Dispatch;

public partial class DispatchService
{
public async IAsyncEnumerable<DispatchInfo> Subscribe(DispatchServiceSubscribeRequest request, CallContext context = default)
public IAsyncEnumerable<DispatchInfo> Subscribe(DispatchServiceSubscribeRequest request, CallContext context = default)
=> Subscribe(request, context.CancellationToken);

private async IAsyncEnumerable<DispatchInfo> Subscribe(DispatchServiceSubscribeRequest request, [EnumeratorCancellation] CancellationToken token)
{
if (await _repository.Retrieve(request.ServerID) != null)
yield break;

var channel = Channel.CreateUnbounded<DispatchInfo>();
var subscription = new DispatchSubscription
{
Channel = channel,
ServerID = request.ServerID,
WorldID = request.WorldID,
ChannelID = request.ChannelID
};

await _serverIdIndex.Insert(new DispatchServiceEntry<string>(request.ServerID, channel));
if (request.WorldID.HasValue) await _worldIdIndex.Insert(new DispatchServiceEntry<int>(request.WorldID.Value, channel));
if (request.ChannelID.HasValue) await _channelIdIndex.Insert(new DispatchServiceEntry<int>(request.ChannelID.Value, channel));
await _repository.Insert(subscription);

while (!context.CancellationToken.IsCancellationRequested)
yield return await channel.Reader.ReadAsync(context.CancellationToken);

await _serverIdIndex.Delete(request.ServerID);
if (request.WorldID.HasValue) await _worldIdIndex.Delete(request.WorldID.Value);
if (request.ChannelID.HasValue) await _channelIdIndex.Delete(request.ChannelID.Value);

channel.Writer.Complete();
try
{
await foreach (var info in channel.Reader.ReadAllAsync(token))
yield return info;
}
finally
{
Console.WriteLine("CLEANED");
await _repository.Delete(subscription);
channel.Writer.Complete();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Edelstein.Protocol.Services.Dispatch;
using Edelstein.Common.Utilities.Repositories;
using Edelstein.Protocol.Services.Dispatch;
using Edelstein.Protocol.Services.Session;

namespace Edelstein.Common.Services.Dispatch;
Expand All @@ -7,7 +8,5 @@ public partial class DispatchService(
ISessionService sessions
) : IDispatchService
{
private readonly DispatchServiceEntryRepository<string> _serverIdIndex = new();
private readonly DispatchServiceEntryRepository<int> _worldIdIndex = new();
private readonly DispatchServiceEntryRepository<int> _channelIdIndex = new();
private readonly Repository<string, DispatchSubscription> _repository = new();
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading.Channels;
using Edelstein.Protocol.Services.Dispatch.Contracts;
using Edelstein.Protocol.Utilities.Repositories;

namespace Edelstein.Common.Services.Dispatch;

public record DispatchSubscription : DispatchServiceSubscribeRequest, IRepositoryEntry<string>
{
public string ID => ServerID;
public required ChannelWriter<DispatchInfo> Channel { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ namespace Edelstein.Protocol.Services.Dispatch.Contracts;
[DataContract]
public record DispatchInfo : IDispatchInfo
{
[DataMember(Order = 1)]
public required DispatchTarget TargetType { get; init; }
[DataMember(Order = 1)] public string? TargetServerID { get; init; }
[DataMember(Order = 2)] public int? TargetWorldID { get; init; }
[DataMember(Order = 3)] public int? TargetChannelID { get; init; }
[DataMember(Order = 4)] public int? TargetCharacterID { get; init; }

[DataMember(Order = 2)]
public required int TargetID { get; init; }

[DataMember(Order = 3)]
public required byte[] Payload { get; init; }
[DataMember(Order = 5)] public required byte[] Payload { get; init; }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

public interface IDispatchInfo
{
DispatchTarget TargetType { get; }
int TargetID { get; }
public string? TargetServerID { get; init; }
public int? TargetWorldID { get; init; }
public int? TargetChannelID { get; init; }
public int? TargetCharacterID { get; init; }

byte[] Payload { get; }
}

0 comments on commit b2be1f7

Please sign in to comment.