diff --git a/src/EventStore.Plugins/Authorization/Operations.cs b/src/EventStore.Plugins/Authorization/Operations.cs index a8edfca..f1a6056 100644 --- a/src/EventStore.Plugins/Authorization/Operations.cs +++ b/src/EventStore.Plugins/Authorization/Operations.cs @@ -74,6 +74,11 @@ public static class Gossip { public static readonly OperationDefinition Update = new(Resource, "update"); public static readonly OperationDefinition ClientRead = new($"{Resource}/client", "read"); } + + public static class Transform { + const string Resource = "transform"; + public static readonly OperationDefinition Set = new(Resource, "set"); + } } public static class Streams { diff --git a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs new file mode 100644 index 0000000..2a880b8 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs @@ -0,0 +1,43 @@ +using System; +using System.IO; + +namespace EventStore.Plugins.Transforms; + +public class ChunkDataReadStream(Stream chunkFileStream) : Stream { + public Stream ChunkFileStream => chunkFileStream; + + public sealed override bool CanRead => true; + public sealed override bool CanSeek => true; + public sealed override bool CanWrite => false; + public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + public sealed override void Flush() => throw new InvalidOperationException(); + public sealed override void SetLength(long value) => throw new InvalidOperationException(); + public override long Length => throw new NotSupportedException(); + + // reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint + public override int Read(byte[] buffer, int offset, int count) => ChunkFileStream.Read(buffer, offset, count); + + // seeks need to support only `SeekOrigin.Begin` + public override long Seek(long offset, SeekOrigin origin) { + if (origin != SeekOrigin.Begin) + throw new NotSupportedException(); + + return ChunkFileStream.Seek(offset, origin); + } + + public override long Position { + get => ChunkFileStream.Position; + set => ChunkFileStream.Position = value; + } + + protected override void Dispose(bool disposing) { + try { + if (!disposing) + return; + + chunkFileStream.Dispose(); + } finally { + base.Dispose(disposing); + } + } +} diff --git a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs new file mode 100644 index 0000000..1f82195 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs @@ -0,0 +1,61 @@ +using System; +using System.IO; +using System.Security.Cryptography; + +namespace EventStore.Plugins.Transforms; + +public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksumAlgorithm) : Stream { + public Stream ChunkFileStream => chunkFileStream; + public HashAlgorithm ChecksumAlgorithm => checksumAlgorithm; + + public sealed override bool CanRead => false; + public sealed override bool CanSeek => false; + public sealed override bool CanWrite => true; + public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); + + public override void Write(byte[] buffer, int offset, int count) { + ChunkFileStream.Write(buffer, offset, count); + ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0); + } + + public override void Flush() => ChunkFileStream.Flush(); + public override void SetLength(long value) => ChunkFileStream.SetLength(value); + public override long Length => ChunkFileStream.Length; + public override long Position { + get => ChunkFileStream.Position; + set { + if (ChunkFileStream.Position != 0) + throw new InvalidOperationException("Writer's position can only be moved from 0 to a higher value."); + + ReadAndChecksum(value); + + if (ChunkFileStream.Position != value) + throw new Exception($"Writer's position ({ChunkFileStream.Position:N0}) is not at the expected position ({value:N0})"); + } + } + + private void ReadAndChecksum(long count) { + var buffer = new byte[4096]; + long toRead = count; + while (toRead > 0) { + int read = ChunkFileStream.Read(buffer, 0, (int)Math.Min(toRead, buffer.Length)); + if (read == 0) + break; + + ChecksumAlgorithm.TransformBlock(buffer, 0, read, null, 0); + toRead -= read; + } + } + + protected override void Dispose(bool disposing) { + try { + if (!disposing) + return; + + chunkFileStream.Dispose(); + } finally { + base.Dispose(disposing); + } + } +} diff --git a/src/EventStore.Plugins/Transforms/IChunkReadTransform.cs b/src/EventStore.Plugins/Transforms/IChunkReadTransform.cs new file mode 100644 index 0000000..8456883 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/IChunkReadTransform.cs @@ -0,0 +1,5 @@ +namespace EventStore.Plugins.Transforms; + +public interface IChunkReadTransform { + ChunkDataReadStream TransformData(ChunkDataReadStream stream); +} diff --git a/src/EventStore.Plugins/Transforms/IChunkTransform.cs b/src/EventStore.Plugins/Transforms/IChunkTransform.cs new file mode 100644 index 0000000..b792908 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/IChunkTransform.cs @@ -0,0 +1,6 @@ +namespace EventStore.Plugins.Transforms; + +public interface IChunkTransform { + IChunkReadTransform Read { get; } + IChunkWriteTransform Write { get; } +} diff --git a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs new file mode 100644 index 0000000..882b6c7 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs @@ -0,0 +1,12 @@ +using System; +using System.IO; + +namespace EventStore.Plugins.Transforms; + +public interface IChunkTransformFactory { + TransformType Type { get; } + int TransformDataPosition(int dataPosition); + ReadOnlyMemory CreateTransformHeader(); + ReadOnlyMemory ReadTransformHeader(Stream stream); + IChunkTransform CreateTransform(ReadOnlyMemory transformHeader); +} diff --git a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs new file mode 100644 index 0000000..e45c770 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs @@ -0,0 +1,9 @@ +using System; + +namespace EventStore.Plugins.Transforms; + +public interface IChunkWriteTransform { + ChunkDataWriteStream TransformData(ChunkDataWriteStream stream); + void CompleteData(int footerSize, int alignmentSize); + void WriteFooter(ReadOnlySpan footer, out int fileSize); +} diff --git a/src/EventStore.Plugins/Transforms/IDbTransform.cs b/src/EventStore.Plugins/Transforms/IDbTransform.cs new file mode 100644 index 0000000..ea836eb --- /dev/null +++ b/src/EventStore.Plugins/Transforms/IDbTransform.cs @@ -0,0 +1,7 @@ +namespace EventStore.Plugins.Transforms; + +public interface IDbTransform { + string Name { get; } + TransformType Type { get; } + IChunkTransformFactory ChunkFactory { get; } +} diff --git a/src/EventStore.Plugins/Transforms/TransformType.cs b/src/EventStore.Plugins/Transforms/TransformType.cs new file mode 100644 index 0000000..5440759 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/TransformType.cs @@ -0,0 +1,6 @@ +namespace EventStore.Plugins.Transforms; + +public enum TransformType { + Identity = 0, + Encryption_AesGcm = 1, +}