Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CM-1079 How to Synchronize Concurrent Operations With a Barrier in C# #2005

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Net;
using System.Text;

namespace SimpleHTTPServer;

public class ChunkUploadServer
{
readonly int chunkSize;
readonly int threadCount;

public ChunkUploadServer(int chunkSize, int threadCount)
{
this.chunkSize = chunkSize;
this.threadCount = threadCount;
}

public async Task StartServer(int port)
{
var uri = $"http://localhost:{port}/upload/";

var listener = new HttpListener();
listener.Prefixes.Add(uri);
listener.Start();

Console.WriteLine($"Server started listening at {uri}");

// Wait for a client request
var context = await listener.GetContextAsync();
if (context.Request.HttpMethod == HttpMethod.Post.Method)
{
await ProcessUpload(context);
}
else
{
context.Response.StatusCode = (int)HttpStatusCode.MethodNotAllowed;
context.Response.Close();
}
}

public async Task ProcessUpload(HttpListenerContext context)
{
var fileName = Path.GetFileName(context.Request.Headers["X-Filename"]);

if (context.Request.ContentLength64 > chunkSize * threadCount)
{
Console.WriteLine("File size exceeds chunk capacity.");
context.Response.Abort();
return;
}

var barrier = new Barrier(threadCount + 1);

for (int i = 0; i < threadCount; i++)
{
int chunkNumber = i + 1;
var threadTask = Task.Run(() =>
{
Console.WriteLine($"Thread {chunkNumber} processed stream chunk.");

barrier.SignalAndWait();
});
}

barrier.SignalAndWait();

var message = $"File '{fileName}' uploaded successfully...";

context.Response.ContentLength64 = Encoding.UTF8.GetByteCount(message);
context.Response.StatusCode = (int)HttpStatusCode.Created;

using (Stream s = context.Response.OutputStream)
using (StreamWriter writer = new StreamWriter(s))
await writer.WriteAsync(message);

context.Response.Close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using SimpleHTTPServer;

class Program
{
public static void Main(string[] args)
{
var port = 8090;
var chunkSize = 1024 * 1024;
var threadCount = 4;

var server = new ChunkUploadServer(chunkSize, threadCount);
server.StartServer(port).Wait();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SynchronizeWithBarrier", "SynchronizeWithBarrier.csproj", "{7C32891D-EBC5-4C45-B844-D6EE099CD459}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{7C32891D-EBC5-4C45-B844-D6EE099CD459}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7C32891D-EBC5-4C45-B844-D6EE099CD459}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7C32891D-EBC5-4C45-B844-D6EE099CD459}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7C32891D-EBC5-4C45-B844-D6EE099CD459}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C587068E-4DEC-419F-9213-3B21A0DA8046}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SynchronizeWithBarrier", "SynchronizeWithBarrier\SynchronizeWithBarrier.csproj", "{68F96422-6D64-41BD-8F49-1EE10CFE34E1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests", "Tests\Tests.csproj", "{AC241346-1198-4A49-93BB-A2D62D01326C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{68F96422-6D64-41BD-8F49-1EE10CFE34E1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{68F96422-6D64-41BD-8F49-1EE10CFE34E1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{68F96422-6D64-41BD-8F49-1EE10CFE34E1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{68F96422-6D64-41BD-8F49-1EE10CFE34E1}.Release|Any CPU.Build.0 = Release|Any CPU
{AC241346-1198-4A49-93BB-A2D62D01326C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AC241346-1198-4A49-93BB-A2D62D01326C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AC241346-1198-4A49-93BB-A2D62D01326C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AC241346-1198-4A49-93BB-A2D62D01326C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Net;
using SimpleHTTPServer;

[TestClass]
public class ChunkUploadServerTests
{
private HttpClientHandler clientHandler;

[TestInitialize]
public void TestInitialize()
{
clientHandler = new HttpClientHandler
{
ServerCertificateCustomValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; }
};
}

[TestMethod]
public async Task WhenCreatingAServer_ThenItStartsSuccessfully()
{
// Arrange
int chunkSize = 1024 * 1024;
int threadCount = 4;
int port = 8090;

var server = new ChunkUploadServer(chunkSize, threadCount);
server.StartServer(port);

// Act
using var client = new HttpClient(clientHandler);
var response = await client.GetAsync($"http://localhost:{port}/upload/");
Assert.AreEqual(HttpStatusCode.MethodNotAllowed, response.StatusCode);
}

[TestMethod]
[ExpectedException(typeof(HttpRequestException))]
public async Task WhenUploadingLargeFiles_ThenShouldRejectLargeFilesExceedingChunkCapacityByThrowingException()
{
// Arrange
int chunkSize = 1024 * 1024;
int threadCount = 4;
int port = 8091;
long fileSize = (chunkSize * threadCount) + 1; // Exceeds total chunk size

var server = new ChunkUploadServer(chunkSize, threadCount);
server.StartServer(port);

// Act
using var client = new HttpClient(clientHandler);
using var content = new ByteArrayContent(new byte[fileSize]);
content.Headers.Add("X-Filename", "test.txt");
var response = await client.PostAsync($"http://localhost:{port}/upload/", content);
}

[TestMethod]
public async Task WhenUploadingAFile_ThenShouldProcessChunksWithMultipleThreads()
{
// Arrange
int chunkSize = 1024 * 1024;
int threadCount = 4;
int port = 8092;
long fileSize = chunkSize * threadCount;

var server = new ChunkUploadServer(chunkSize, threadCount);
server.StartServer(port);

// Act
using var client = new HttpClient(clientHandler);
using var content = new ByteArrayContent(new byte[fileSize]);
content.Headers.Add("X-Filename", "test.txt");
var response = await client.PostAsync($"http://localhost:{port}/upload/", content);

// Assert
Assert.AreEqual(HttpStatusCode.Created, response.StatusCode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="MSTest.TestAdapter" Version="3.1.1" />
<PackageReference Include="MSTest.TestFramework" Version="3.1.1" />
</ItemGroup>

<ItemGroup>
<Using Include="Microsoft.VisualStudio.TestTools.UnitTesting" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\SynchronizeWithBarrier\SynchronizeWithBarrier.csproj" />
</ItemGroup>

</Project>