diff --git a/example/Program.cs b/example/Program.cs index 2c0cc7e..8b16b77 100644 --- a/example/Program.cs +++ b/example/Program.cs @@ -1,76 +1,140 @@ using Haukcode.sACN; using Haukcode.sACN.Model; using System; +using System.Buffers; using System.Linq; using System.Net; using System.Reactive.Linq; using System.Threading; using System.Threading.Channels; +using System.Threading.Tasks; -namespace Haukcode.sACN.ConsoleExample +namespace Haukcode.sACN.ConsoleExample; + +public class Program { - public class Program + private static readonly Guid acnSourceId = new Guid("{B32625A6-C280-4389-BD25-E0D13F5B50E0}"); + private static readonly string acnSourceName = "DMXPlayer"; + + private static MemoryPool memoryPool = MemoryPool.Shared; + private static double last = 0; + + public static void Main(string[] args) + { + Listen(); + } + + static void Listen() { - private static readonly Guid acnSourceId = new Guid("{B32625A6-C280-4389-BD25-E0D13F5B50E0}"); - private static readonly string acnSourceName = "DMXPlayer"; + var recvClient = new SACNClient( + senderId: acnSourceId, + senderName: acnSourceName, + localAddress: IPAddress.Any); + + var sendClient = new SACNClient( + senderId: acnSourceId, + senderName: acnSourceName, + localAddress: Haukcode.Network.Utils.GetFirstBindAddress().IPAddress); - public static void Main(string[] args) + recvClient.OnError.Subscribe(e => { - Listen(); - } + Console.WriteLine($"Error! {e.Message}"); + }); + + //listener.OnReceiveRaw.Subscribe(d => + //{ + // Console.WriteLine($"Received {d.Data.Length} bytes from {d.Host}"); + //}); + + //recvClient.OnPacket.Subscribe(d => + //{ + // Listener_OnPacket(d.TimestampMS, d.TimestampMS - last, d.Packet); + // last = d.TimestampMS; + //}); - static void Listen() + var channel = Channel.CreateUnbounded(); + + // Not sure about the transform here, the packet may use memory from + // the memory pool and it may not be safe to pass it around like this + recvClient.StartRecordPipeline(p => WritePacket(channel, p), () => channel.Writer.Complete()); + + var writerTask = Task.Factory.StartNew(async () => + { + await WriteToDiskAsync(channel, CancellationToken.None); + }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); + + recvClient.JoinDMXUniverse(1); + recvClient.JoinDMXUniverse(2); + + while (true) { - var recvClient = new SACNClient( - senderId: acnSourceId, - senderName: acnSourceName, - localAddress: IPAddress.Any); - - var sendClient = new SACNClient( - senderId: acnSourceId, - senderName: acnSourceName, - localAddress: Haukcode.Network.Utils.GetFirstBindAddress().IPAddress); - - recvClient.OnError.Subscribe(e => - { - Console.WriteLine($"Error! {e.Message}"); - }); - - //listener.OnReceiveRaw.Subscribe(d => - //{ - // Console.WriteLine($"Received {d.Data.Length} bytes from {d.Host}"); - //}); - - double last = 0; - recvClient.OnPacket.Subscribe(d => - { - Listener_OnPacket(d.TimestampMS, d.TimestampMS - last, d.Packet); - last = d.TimestampMS; - }); - - var channel = Channel.CreateUnbounded(); - - recvClient.StartRecordPipeline(channel, x => x.Packet); - recvClient.JoinDMXUniverse(1); - recvClient.JoinDMXUniverse(2); - - while (true) - { - sendClient.SendDmxData(null, 1, new byte[] { 1, 2, 3, 4, 5 }); - - Thread.Sleep(500); - } + sendClient.SendDmxData(null, 1, new byte[] { 1, 2, 3, 4, 5 }); + + Thread.Sleep(500); } + } + + private static async Task WritePacket(Channel channel, ReceiveDataPacket receiveData) + { + var dmxData = TransformPacket(receiveData); + + if (dmxData == null) + return; - private static void Listener_OnPacket(double timestampMS, double sinceLast, SACNPacket e) + await channel.Writer.WriteAsync(dmxData, CancellationToken.None); + } + + private static ReceiveDataPacket TransformPacket(ReceiveDataPacket receiveData) + { + var framingLayer = receiveData.Packet.RootLayer?.FramingLayer; + if (framingLayer == null) + return null; + + switch (framingLayer) { - var dataPacket = e as SACNDataPacket; - if (dataPacket == null) - return; + case DataFramingLayer dataFramingLayer: + var dmpLayer = dataFramingLayer.DMPLayer; + + if (dmpLayer == null || dmpLayer.Length < 1) + // Unknown/unsupported + return null; - Console.Write($"+{sinceLast:N2}\t"); - Console.Write($"Packet from {dataPacket.SourceName}\tu{dataPacket.UniverseId}\ts{dataPacket.SequenceId}\t"); - Console.WriteLine($"Data {string.Join(",", dataPacket.DMXData.Take(16))}..."); + if (dmpLayer.StartCode != 0) + // We only support start code 0 + return null; + + // Hack + var newBuf = new byte[dmpLayer.Data.Length]; + dmpLayer.Data.CopyTo(newBuf); + dmpLayer.Data = newBuf; + + return receiveData; + + case SyncFramingLayer syncFramingLayer: + return receiveData; } + + return null; + } + + private static async Task WriteToDiskAsync(Channel inputChannel, CancellationToken cancellationToken) + { + await foreach (var dmxData in inputChannel.Reader.ReadAllAsync(cancellationToken)) + { + Listener_OnPacket(dmxData.TimestampMS, dmxData.TimestampMS - last, dmxData); + last = dmxData.TimestampMS; + } + } + + private static void Listener_OnPacket(double timestampMS, double sinceLast, ReceiveDataPacket e) + { + var dataPacket = e.Packet.RootLayer.FramingLayer as DataFramingLayer; + + if (dataPacket == null) + return; + + Console.Write($"+{sinceLast:N2}\t"); + Console.Write($"Packet from {dataPacket.SourceName}\tu{dataPacket.UniverseId}\ts{dataPacket.SequenceId}\t"); + Console.WriteLine($"Data {string.Join(",", dataPacket.DMPLayer.Data.ToArray().Take(16))}..."); } } diff --git a/src/Haukcode.sACN/Haukcode.sACN.csproj b/src/Haukcode.sACN/Haukcode.sACN.csproj index c907489..c0d48e3 100644 --- a/src/Haukcode.sACN/Haukcode.sACN.csproj +++ b/src/Haukcode.sACN/Haukcode.sACN.csproj @@ -52,7 +52,7 @@ - + diff --git a/src/Haukcode.sACN/SACNClient.cs b/src/Haukcode.sACN/SACNClient.cs index 3f8b65c..7f5b21d 100644 --- a/src/Haukcode.sACN/SACNClient.cs +++ b/src/Haukcode.sACN/SACNClient.cs @@ -21,13 +21,10 @@ public class SACNClient : Client { public class SendData : HighPerfComm.SendData { - public ushort UniverseId { get; set; } - public IPEndPoint Destination { get; set; } - public SendData(ushort universeId, IPEndPoint destination) + public SendData(IPEndPoint destination) { - UniverseId = universeId; Destination = destination; } } @@ -40,7 +37,6 @@ public SendData(ushort universeId, IPEndPoint destination) private Socket? listenSocket; private readonly Socket sendSocket; private readonly IPEndPoint localEndPoint; - private readonly ISubject packetSubject; private readonly Dictionary sequenceIds = []; private readonly Dictionary sequenceIdsSync = []; private readonly Lock lockObject = new(); @@ -54,31 +50,24 @@ public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int if (senderId == Guid.Empty) throw new ArgumentException("Invalid sender Id", nameof(senderId)); SenderId = senderId; - SenderName = senderName; if (port <= 0) throw new ArgumentException("Invalid port", nameof(port)); - this.localEndPoint = new IPEndPoint(localAddress, port); - this.packetSubject = new Subject(); + this.localEndPoint = new IPEndPoint(localAddress, port); this.sendSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - ConfigureSendSocket(this.sendSocket); - } - - private void ConfigureSendSocket(Socket socket) - { - socket.SendBufferSize = SendBufferSize; + this.sendSocket.SendBufferSize = SendBufferSize; - Haukcode.Network.Utils.SetSocketOptions(socket); + Haukcode.Network.Utils.SetSocketOptions(this.sendSocket); // Multicast socket settings - socket.DontFragment = true; - socket.MulticastLoopback = false; + this.sendSocket.DontFragment = true; + this.sendSocket.MulticastLoopback = false; // Only local LAN group - socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 20); + this.sendSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 20); } public IPEndPoint LocalEndPoint => this.localEndPoint; @@ -87,12 +76,6 @@ private void ConfigureSendSocket(Socket socket) public string SenderName { get; } - /// - /// Observable that provides all parsed packets. This is buffered on its own thread so the processing can - /// take any time necessary (memory consumption will go up though, there is no upper limit to amount of data buffered). - /// - public IObservable OnPacket => this.packetSubject.AsObservable(); - /// /// Gets a list of dmx universes this socket has joined to /// @@ -168,7 +151,7 @@ public Task SendDmxData( packet.DataFramingLayer.Options.ForceSynchronization = true; } - return QueuePacket(universeId, address, packet, important); + return QueuePacketForSending(universeId, address, packet, important); } /// @@ -193,7 +176,7 @@ public Task SendSync(IPAddress? address, ushort syncAddress) } }); - return QueuePacket(syncAddress, address, packet, true); + return QueuePacketForSending(syncAddress, address, packet, true); } /// @@ -203,7 +186,7 @@ public Task SendSync(IPAddress? address, ushort syncAddress) /// Destination /// Packet /// Important - private async Task QueuePacket(ushort universeId, IPAddress? destination, SACNPacket packet, bool important) + private async Task QueuePacketForSending(ushort universeId, IPAddress? destination, SACNPacket packet, bool important) { await base.QueuePacket(packet.Length, important, () => { @@ -231,15 +214,11 @@ await base.QueuePacket(packet.Length, important, () => } } - return new SendData(universeId, sendDataDestination); + return new SendData(sendDataDestination); }, packet.WriteToBuffer); } - public void WarmUpSockets(IEnumerable universeIds) - { - } - private byte GetNewSequenceId(ushort universeId) { lock (this.lockObject)