Skip to content

Commit

Permalink
Improve more precise speed limits
Browse files Browse the repository at this point in the history
Co-authored-by: DismissedLight <[email protected]>
  • Loading branch information
Eric-Joker and Lightczx committed Aug 7, 2024
1 parent ed5821c commit af1414f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 72 deletions.
24 changes: 2 additions & 22 deletions src/Starward/Services/Download/InstallGameManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Threading.RateLimiting;
using System.Threading;
using System.Threading.Tasks;

namespace Starward.Services.Download;

Expand Down Expand Up @@ -38,13 +36,6 @@ private InstallGameManager()
public static TokenBucketRateLimiter RateLimiter { get; private set; }


public static bool IsEnableSpeedLimit => SpeedLimitBytesPerSecond != int.MaxValue;


// BUFFER_SIZE越大限速时保留速度也会越大,可以用来抵消迷之原因造成的超速¿
// speedLimit<=2MB/s → 16Bytes else 1KB
public static int BUFFER_SIZE => (SpeedLimitBytesPerSecond <= (1 << 21)) ? (1 << 4) : (1 << 10);


public event EventHandler<InstallGameStateModel> InstallTaskAdded;

Expand All @@ -54,27 +45,20 @@ private InstallGameManager()



public static event EventHandler LimitStateChanged;




public static void SetRateLimit()
{
// 小于speedLimitBytesPerSecond的最大能被BUFFER_SIZE整除的值
var speedLimitBytesPerPeriod = Math.Max(SpeedLimitBytesPerSecond / 25 / BUFFER_SIZE * BUFFER_SIZE, BUFFER_SIZE);
var speedLimitBytesPerPeriod = SpeedLimitBytesPerSecond / 25;
RateLimiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
{
TokenLimit = speedLimitBytesPerPeriod,
// 0.04: 将每秒切割为上面的25份,间隔越小速度越精准。
// 因补充令牌逻辑运行耗时远大于期望,若间隔极小,将无法达到最高限速。
ReplenishmentPeriod = TimeSpan.FromSeconds(Math.Max(BUFFER_SIZE / (double)SpeedLimitBytesPerSecond, 0.04)),
ReplenishmentPeriod = TimeSpan.FromSeconds(0.04),
TokensPerPeriod = speedLimitBytesPerPeriod,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
AutoReplenishment = true
});
if (LimitStateChanged != null && LimitStateChanged.GetInvocationList().Length > 0)
Task.Run(() => LimitStateChanged.Invoke(null, EventArgs.Empty));
}


Expand Down Expand Up @@ -105,8 +89,6 @@ public void AddInstallService(InstallGameService service)
model.InstallFailed += Model_InstallFailed;
model.InstallCanceled -= Model_InstallCanceled;
model.InstallCanceled += Model_InstallCanceled;
LimitStateChanged -= model._manager_LimitStateChanged;
LimitStateChanged += model._manager_LimitStateChanged;
InstallTaskAdded?.Invoke(this, model);
}

Expand All @@ -121,7 +103,6 @@ private void Model_InstallFinished(object? sender, EventArgs e)
model.InstallFinished -= Model_InstallFinished;
model.InstallFailed -= Model_InstallFailed;
model.InstallCanceled -= Model_InstallCanceled;
LimitStateChanged -= model._manager_LimitStateChanged;
InstallTaskRemoved?.Invoke(this, model);
WeakReferenceMessenger.Default.Send(new InstallGameFinishedMessage(model.GameBiz));
NotificationBehavior.Instance.Success(Lang.InstallGameManager_DownloadTaskCompleted, $"{InstallTaskToString(model.Service.InstallTask)} - {model.GameBiz.ToGameName()} - {model.GameBiz.ToGameServer()}", 0);
Expand Down Expand Up @@ -150,7 +131,6 @@ private void Model_InstallCanceled(object? sender, EventArgs e)
model.InstallFinished -= Model_InstallFinished;
model.InstallFailed -= Model_InstallFailed;
model.InstallCanceled -= Model_InstallCanceled;
LimitStateChanged -= model._manager_LimitStateChanged;
InstallTaskRemoved?.Invoke(this, model);
}
}
Expand Down
48 changes: 12 additions & 36 deletions src/Starward/Services/Download/InstallGameService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
using System.Text;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using Vanara.PInvoke;

Expand Down Expand Up @@ -1073,14 +1072,6 @@ protected void Finish()



public long HTTP_BUFFER_SIZE = InstallGameManager.BUFFER_SIZE;



public bool IsEnableSpeedLimit = InstallGameManager.IsEnableSpeedLimit;



protected int _totalCount;
public int TotalCount => _totalCount;

Expand Down Expand Up @@ -1203,9 +1194,7 @@ protected async Task DownloadItemAsync(InstallGameItem item, CancellationToken c
{
file_target = item.WriteAsTempFile ? file_tmp : file;
}
var httpBuffer = ArrayPool<byte>.Shared.Rent((int)Interlocked.Read(ref HTTP_BUFFER_SIZE));
var buffer = ArrayPool<byte>.Shared.Rent(BUFFER_SIZE);
int bufferOffset = 0;
try
{
using var fs = File.Open(file_target, FileMode.OpenOrCreate);
Expand All @@ -1218,43 +1207,30 @@ protected async Task DownloadItemAsync(InstallGameItem item, CancellationToken c
response.EnsureSuccessStatusCode();
using var hs = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
int length;
while ((length = await hs.ReadAsync(IsEnableSpeedLimit ? httpBuffer : buffer, cancellationToken).ConfigureAwait(false)) != 0)
while ((length = await hs.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)) != 0)
{
if (IsEnableSpeedLimit)
{
RateLimitLease lease;
do
int totalWritten = 0;
if (InstallGameManager.SpeedLimitBytesPerSecond != int.MaxValue)
while (totalWritten < length)
{
lease = await InstallGameManager.RateLimiter.AcquireAsync(length, cancellationToken).ConfigureAwait(false);
if (!lease.IsAcquired && lease.TryGetMetadata(MetadataName.RetryAfter, out TimeSpan retryAfter))
int remaining = length - totalWritten;
int tokensAcquired = TokenBucketRateLimiterExtension.Acquire(InstallGameManager.RateLimiter, remaining, out TimeSpan retryAfter);
if (tokensAcquired == 0)
await Task.Delay((int)Math.Max(Math.Sqrt(retryAfter.TotalMilliseconds), 1), cancellationToken).ConfigureAwait(false);
} while (!lease.IsAcquired);
int remainingSpace = buffer.Length - bufferOffset;
if (length > remainingSpace)
{
Buffer.BlockCopy(httpBuffer, 0, buffer, bufferOffset, remainingSpace);
await fs.WriteAsync(buffer.AsMemory(0, buffer.Length), cancellationToken).ConfigureAwait(false);
Buffer.BlockCopy(httpBuffer, remainingSpace, buffer, 0, length - remainingSpace);
bufferOffset = length - remainingSpace;
}
else
{
Buffer.BlockCopy(httpBuffer, 0, buffer, bufferOffset, length);
bufferOffset += length;
else
{
await fs.WriteAsync(buffer.AsMemory(totalWritten, tokensAcquired), cancellationToken).ConfigureAwait(false);
totalWritten += tokensAcquired;
}
}
}
else
await fs.WriteAsync(buffer.AsMemory(0, length), cancellationToken).ConfigureAwait(false);
Interlocked.Add(ref _finishBytes, length);
}
// Write any remaining data in buffer
if (bufferOffset > 0)
await fs.WriteAsync(buffer.AsMemory(0, bufferOffset), cancellationToken).ConfigureAwait(false);
}
}
finally
{
ArrayPool<byte>.Shared.Return(httpBuffer);
ArrayPool<byte>.Shared.Return(buffer);
}
}
Expand Down
14 changes: 0 additions & 14 deletions src/Starward/Services/Download/InstallGameStateModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ private void ComputeSpeed(InstallGameState state)
private void _service_StateChanged(object? sender, InstallGameState e)
{
uiContext.Post(_ => UpdateState(), null);

}


Expand All @@ -281,17 +280,4 @@ private void Service_InstallFailed(object? sender, Exception e)
}



public void _manager_LimitStateChanged(object? sender, EventArgs e)
{
if (Service.State is InstallGameState.Download && Service.HTTP_BUFFER_SIZE != InstallGameManager.BUFFER_SIZE || Service.IsEnableSpeedLimit != InstallGameManager.IsEnableSpeedLimit)
{
Service.Pause();
Task.WhenAll(Service.TaskItems).Wait();
Service.HTTP_BUFFER_SIZE = InstallGameManager.BUFFER_SIZE;
Service.IsEnableSpeedLimit = InstallGameManager.IsEnableSpeedLimit;
Service.Continue();
InstallStarted?.Invoke(this, EventArgs.Empty);
}
}
}
20 changes: 20 additions & 0 deletions src/Starward/Services/Download/TokenBucketRateLimiterExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Threading;
using System.Threading.RateLimiting;
using System.Runtime.CompilerServices;

namespace Starward.Services.Download;

internal static class TokenBucketRateLimiterExtension
{
public static int Acquire(this TokenBucketRateLimiter rateLimiter, int permits, out TimeSpan retryAfter)
{
var a = (int)Volatile.Read(ref PrivateGetTokenCount(rateLimiter));
int count = Math.Min(permits, a);
return rateLimiter.AttemptAcquire(count).TryGetMetadata(MetadataName.RetryAfter, out retryAfter) ? 0 : count;
}

// private double _tokenCount;
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "_tokenCount")]
private static extern ref double PrivateGetTokenCount(TokenBucketRateLimiter rateLimiter);
}

0 comments on commit af1414f

Please sign in to comment.