Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
HakanL committed Dec 12, 2024
1 parent 5ab2b91 commit 7e08740
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 87 deletions.
172 changes: 118 additions & 54 deletions example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,76 +1,140 @@
using Haukcode.sACN;
using Haukcode.sACN.Model;
using System;
using System.Buffers;
using System.Linq;
using System.Net;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Haukcode.sACN.ConsoleExample
namespace Haukcode.sACN.ConsoleExample;

public class Program
{
public class Program
private static readonly Guid acnSourceId = new Guid("{B32625A6-C280-4389-BD25-E0D13F5B50E0}");
private static readonly string acnSourceName = "DMXPlayer";

private static MemoryPool<byte> memoryPool = MemoryPool<byte>.Shared;
private static double last = 0;

public static void Main(string[] args)
{
Listen();
}

static void Listen()
{
private static readonly Guid acnSourceId = new Guid("{B32625A6-C280-4389-BD25-E0D13F5B50E0}");
private static readonly string acnSourceName = "DMXPlayer";
var recvClient = new SACNClient(
senderId: acnSourceId,
senderName: acnSourceName,
localAddress: IPAddress.Any);

var sendClient = new SACNClient(
senderId: acnSourceId,
senderName: acnSourceName,
localAddress: Haukcode.Network.Utils.GetFirstBindAddress().IPAddress);

public static void Main(string[] args)
recvClient.OnError.Subscribe(e =>
{
Listen();
}
Console.WriteLine($"Error! {e.Message}");
});

//listener.OnReceiveRaw.Subscribe(d =>
//{
// Console.WriteLine($"Received {d.Data.Length} bytes from {d.Host}");
//});

//recvClient.OnPacket.Subscribe(d =>
//{
// Listener_OnPacket(d.TimestampMS, d.TimestampMS - last, d.Packet);
// last = d.TimestampMS;
//});

static void Listen()
var channel = Channel.CreateUnbounded<ReceiveDataPacket>();

// Not sure about the transform here, the packet may use memory from
// the memory pool and it may not be safe to pass it around like this
recvClient.StartRecordPipeline(p => WritePacket(channel, p), () => channel.Writer.Complete());

var writerTask = Task.Factory.StartNew(async () =>
{
await WriteToDiskAsync(channel, CancellationToken.None);
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();

recvClient.JoinDMXUniverse(1);
recvClient.JoinDMXUniverse(2);

while (true)
{
var recvClient = new SACNClient(
senderId: acnSourceId,
senderName: acnSourceName,
localAddress: IPAddress.Any);

var sendClient = new SACNClient(
senderId: acnSourceId,
senderName: acnSourceName,
localAddress: Haukcode.Network.Utils.GetFirstBindAddress().IPAddress);

recvClient.OnError.Subscribe(e =>
{
Console.WriteLine($"Error! {e.Message}");
});

//listener.OnReceiveRaw.Subscribe(d =>
//{
// Console.WriteLine($"Received {d.Data.Length} bytes from {d.Host}");
//});

double last = 0;
recvClient.OnPacket.Subscribe(d =>
{
Listener_OnPacket(d.TimestampMS, d.TimestampMS - last, d.Packet);
last = d.TimestampMS;
});

var channel = Channel.CreateUnbounded<SACNPacket>();

recvClient.StartRecordPipeline(channel, x => x.Packet);
recvClient.JoinDMXUniverse(1);
recvClient.JoinDMXUniverse(2);

while (true)
{
sendClient.SendDmxData(null, 1, new byte[] { 1, 2, 3, 4, 5 });

Thread.Sleep(500);
}
sendClient.SendDmxData(null, 1, new byte[] { 1, 2, 3, 4, 5 });

Thread.Sleep(500);
}
}

private static async Task WritePacket(Channel<ReceiveDataPacket> channel, ReceiveDataPacket receiveData)
{
var dmxData = TransformPacket(receiveData);

if (dmxData == null)
return;

private static void Listener_OnPacket(double timestampMS, double sinceLast, SACNPacket e)
await channel.Writer.WriteAsync(dmxData, CancellationToken.None);
}

private static ReceiveDataPacket TransformPacket(ReceiveDataPacket receiveData)
{
var framingLayer = receiveData.Packet.RootLayer?.FramingLayer;
if (framingLayer == null)
return null;

switch (framingLayer)
{
var dataPacket = e as SACNDataPacket;
if (dataPacket == null)
return;
case DataFramingLayer dataFramingLayer:
var dmpLayer = dataFramingLayer.DMPLayer;

if (dmpLayer == null || dmpLayer.Length < 1)
// Unknown/unsupported
return null;

Console.Write($"+{sinceLast:N2}\t");
Console.Write($"Packet from {dataPacket.SourceName}\tu{dataPacket.UniverseId}\ts{dataPacket.SequenceId}\t");
Console.WriteLine($"Data {string.Join(",", dataPacket.DMXData.Take(16))}...");
if (dmpLayer.StartCode != 0)
// We only support start code 0
return null;

// Hack
var newBuf = new byte[dmpLayer.Data.Length];
dmpLayer.Data.CopyTo(newBuf);
dmpLayer.Data = newBuf;

return receiveData;

case SyncFramingLayer syncFramingLayer:
return receiveData;
}

return null;
}

private static async Task WriteToDiskAsync(Channel<ReceiveDataPacket> inputChannel, CancellationToken cancellationToken)
{
await foreach (var dmxData in inputChannel.Reader.ReadAllAsync(cancellationToken))
{
Listener_OnPacket(dmxData.TimestampMS, dmxData.TimestampMS - last, dmxData);
last = dmxData.TimestampMS;
}
}

private static void Listener_OnPacket(double timestampMS, double sinceLast, ReceiveDataPacket e)
{
var dataPacket = e.Packet.RootLayer.FramingLayer as DataFramingLayer;

if (dataPacket == null)
return;

Console.Write($"+{sinceLast:N2}\t");
Console.Write($"Packet from {dataPacket.SourceName}\tu{dataPacket.UniverseId}\ts{dataPacket.SequenceId}\t");
Console.WriteLine($"Data {string.Join(",", dataPacket.DMPLayer.Data.ToArray().Take(16))}...");
}
}
2 changes: 1 addition & 1 deletion src/Haukcode.sACN/Haukcode.sACN.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
</When>
<Otherwise>
<ItemGroup>
<PackageReference Include="Haukcode.HighPerfComm" Version="1.0.12" />
<PackageReference Include="Haukcode.HighPerfComm" Version="1.0.13" />
</ItemGroup>
</Otherwise>
</Choose>
Expand Down
43 changes: 11 additions & 32 deletions src/Haukcode.sACN/SACNClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ public class SACNClient : Client<SACNClient.SendData, ReceiveDataPacket>
{
public class SendData : HighPerfComm.SendData
{
public ushort UniverseId { get; set; }

public IPEndPoint Destination { get; set; }

public SendData(ushort universeId, IPEndPoint destination)
public SendData(IPEndPoint destination)
{
UniverseId = universeId;
Destination = destination;
}
}
Expand All @@ -40,7 +37,6 @@ public SendData(ushort universeId, IPEndPoint destination)
private Socket? listenSocket;
private readonly Socket sendSocket;
private readonly IPEndPoint localEndPoint;
private readonly ISubject<ReceiveDataPacket> packetSubject;
private readonly Dictionary<ushort, byte> sequenceIds = [];
private readonly Dictionary<ushort, byte> sequenceIdsSync = [];
private readonly Lock lockObject = new();
Expand All @@ -54,31 +50,24 @@ public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int
if (senderId == Guid.Empty)
throw new ArgumentException("Invalid sender Id", nameof(senderId));
SenderId = senderId;

SenderName = senderName;

if (port <= 0)
throw new ArgumentException("Invalid port", nameof(port));
this.localEndPoint = new IPEndPoint(localAddress, port);

this.packetSubject = new Subject<ReceiveDataPacket>();
this.localEndPoint = new IPEndPoint(localAddress, port);

this.sendSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
ConfigureSendSocket(this.sendSocket);
}

private void ConfigureSendSocket(Socket socket)
{
socket.SendBufferSize = SendBufferSize;
this.sendSocket.SendBufferSize = SendBufferSize;

Haukcode.Network.Utils.SetSocketOptions(socket);
Haukcode.Network.Utils.SetSocketOptions(this.sendSocket);

// Multicast socket settings
socket.DontFragment = true;
socket.MulticastLoopback = false;
this.sendSocket.DontFragment = true;
this.sendSocket.MulticastLoopback = false;

// Only local LAN group
socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 20);
this.sendSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 20);
}

public IPEndPoint LocalEndPoint => this.localEndPoint;
Expand All @@ -87,12 +76,6 @@ private void ConfigureSendSocket(Socket socket)

public string SenderName { get; }

/// <summary>
/// Observable that provides all parsed packets. This is buffered on its own thread so the processing can
/// take any time necessary (memory consumption will go up though, there is no upper limit to amount of data buffered).
/// </summary>
public IObservable<ReceiveDataPacket> OnPacket => this.packetSubject.AsObservable();

/// <summary>
/// Gets a list of dmx universes this socket has joined to
/// </summary>
Expand Down Expand Up @@ -168,7 +151,7 @@ public Task SendDmxData(
packet.DataFramingLayer.Options.ForceSynchronization = true;
}

return QueuePacket(universeId, address, packet, important);
return QueuePacketForSending(universeId, address, packet, important);
}

/// <summary>
Expand All @@ -193,7 +176,7 @@ public Task SendSync(IPAddress? address, ushort syncAddress)
}
});

return QueuePacket(syncAddress, address, packet, true);
return QueuePacketForSending(syncAddress, address, packet, true);
}

/// <summary>
Expand All @@ -203,7 +186,7 @@ public Task SendSync(IPAddress? address, ushort syncAddress)
/// <param name="destination">Destination</param>
/// <param name="packet">Packet</param>
/// <param name="important">Important</param>
private async Task QueuePacket(ushort universeId, IPAddress? destination, SACNPacket packet, bool important)
private async Task QueuePacketForSending(ushort universeId, IPAddress? destination, SACNPacket packet, bool important)
{
await base.QueuePacket(packet.Length, important, () =>
{
Expand Down Expand Up @@ -231,15 +214,11 @@ await base.QueuePacket(packet.Length, important, () =>
}
}

return new SendData(universeId, sendDataDestination);
return new SendData(sendDataDestination);
},
packet.WriteToBuffer);
}

public void WarmUpSockets(IEnumerable<ushort> universeIds)
{
}

private byte GetNewSequenceId(ushort universeId)
{
lock (this.lockObject)
Expand Down

0 comments on commit 7e08740

Please sign in to comment.