Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating stats jobs to use new Azure SDK #10323

Open
wants to merge 39 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
16f1755
Fixing gallery build error
Nov 20, 2024
9155726
Testfix
Nov 20, 2024
fabc79f
Bugfix
Oct 28, 2024
f8c5829
Added TopLevel
Oct 22, 2024
4a4b0f1
Has right .net sdk version now
Lanaparezanin Nov 25, 2024
b14c3fc
Changed toplevel error
Lanaparezanin Nov 25, 2024
97e310d
Changed toplevel error pt 2
Lanaparezanin Nov 25, 2024
3d55f83
Changed toplevel error pt 3
Lanaparezanin Nov 25, 2024
b67bd58
Merge branch 'dev' into splitting-stats-pr
Lanaparezanin Dec 13, 2024
cbf5066
Bugfix
Lanaparezanin Dec 14, 2024
bb807d0
Bugfix2
Lanaparezanin Dec 14, 2024
03e111c
Added a comment
Lanaparezanin Dec 14, 2024
7693ca6
Clarified comment
Lanaparezanin Dec 16, 2024
e9a646c
AzureStatsLogDestination is now using new SDK
Lanaparezanin Dec 27, 2024
901b38d
Temp changes
Lanaparezanin Jan 14, 2025
572ca1d
Changes
Lanaparezanin Jan 16, 2025
710e178
AzureStatsLogSource.cs has been updated
Lanaparezanin Jan 16, 2025
626d2e6
Migrated more files and tests
Lanaparezanin Jan 17, 2025
bb56055
Final fixes?
Lanaparezanin Jan 18, 2025
28b8c53
Fixed failing tests
Lanaparezanin Jan 21, 2025
f6b50de
Fixed blobleasemanager problem
Lanaparezanin Jan 23, 2025
e69fee8
Fixing
Lanaparezanin Jan 23, 2025
d613266
Test fixes
Lanaparezanin Jan 23, 2025
faca4bf
Buildfix
Lanaparezanin Jan 23, 2025
c87896b
Fixed cancellation problem
Lanaparezanin Jan 24, 2025
8d3c01d
merging dev
Lanaparezanin Jan 24, 2025
8e68ca6
Latest changes
Lanaparezanin Jan 28, 2025
76dc679
Polished BlobLeaseService.cs
Lanaparezanin Jan 29, 2025
f5c36ef
Polished AzureBlobLockResult.cs
Lanaparezanin Jan 29, 2025
a176ce0
Polished AzureBlobLeaseManager.cs
Lanaparezanin Jan 29, 2025
477bb5f
Polished Job.cs
Lanaparezanin Jan 29, 2025
629cdce
Fixed CDNLogsSanitizer scripts
Lanaparezanin Jan 29, 2025
2403857
Polished tests
Lanaparezanin Jan 29, 2025
505096a
Fixed extra semi-colon
Lanaparezanin Jan 31, 2025
6a13b42
Moved throw to new line
Lanaparezanin Jan 31, 2025
3651f31
Moved throw to new line pt.2
Lanaparezanin Jan 31, 2025
47a5092
Changed to explicit type
Lanaparezanin Jan 31, 2025
b778307
Added more explicit types
Lanaparezanin Jan 31, 2025
84c4699
Nit fixes
Lanaparezanin Jan 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/NuGet.Services.Storage/BlobLeaseService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Extensions.Logging;

namespace NuGet.Services.Storage
{
Expand All @@ -20,7 +21,6 @@ public class BlobLeaseService : IBlobLeaseService
{
private static readonly TimeSpan MinLeaseTime = TimeSpan.FromSeconds(15);
private static readonly TimeSpan MaxLeaseTime = TimeSpan.FromSeconds(60);

private readonly BlobContainerClient _containerClient;
private readonly string _basePath;

Expand All @@ -34,11 +34,10 @@ public BlobLeaseService(BlobServiceClient blobServiceClient, string containerNam
{
throw new ArgumentException("The container name must be provided.", nameof(containerName));
}
if (string.IsNullOrEmpty(basePath))
Lanaparezanin marked this conversation as resolved.
Show resolved Hide resolved
if (basePath == null)
{
throw new ArgumentException("The base path must be provided.", nameof(basePath));
}

_containerClient = blobServiceClient.GetBlobContainerClient(containerName);
_basePath = string.IsNullOrEmpty(basePath) ? string.Empty : basePath.TrimEnd('/') + '/';
}
Expand All @@ -65,7 +64,6 @@ public async Task<bool> ReleaseAsync(string resourceName, string leaseId, Cancel
var blob = GetBlob(resourceName);
var leaseClient = blob.GetBlobLeaseClient(leaseId);
await leaseClient.ReleaseAsync(conditions: null, cancellationToken: cancellationToken);

return true;
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
Expand Down Expand Up @@ -94,6 +92,7 @@ public async Task<BlobLeaseResult> RenewAsync(string resourceName, string leaseI
try
{
var lease = await leaseClient.RenewAsync(conditions: null, cancellationToken: cancellationToken);

return BlobLeaseResult.Success(lease);
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict || ex.Status == (int)HttpStatusCode.PreconditionFailed)
Expand Down Expand Up @@ -135,6 +134,7 @@ private async Task<BlobLeaseResult> TryAcquireAsync(BlobClient blob, TimeSpan le
{
var leaseClient = blob.GetBlobLeaseClient();
var blobLease = await leaseClient.AcquireAsync(leaseTime, conditions: null, cancellationToken: cancellationToken);

return BlobLeaseResult.Success(blobLease);
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
Expand Down
146 changes: 79 additions & 67 deletions src/Stats.AzureCdnLogs.Common/AzureHelpers/AzureBlobLeaseManager.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Azure.Storage.Blobs;
using Stats.AzureCdnLogs.Common.Collect;
using NuGet.Services.Storage;
using Azure.Storage.Blobs.Specialized;

namespace Stats.AzureCdnLogs.Common
{
Expand All @@ -19,98 +21,108 @@ public class AzureBlobLeaseManager
public const int MaxRenewPeriodInSeconds = 60;
// The lease will be renewed with a short interval before the the lease expires
public const int OverlapRenewPeriodInSeconds = 20;
private BlobRequestOptions _blobRequestOptions;
private readonly ILogger<AzureBlobLeaseManager> _logger;
private BlobLeaseService _blobLeaseService;

public AzureBlobLeaseManager(ILogger<AzureBlobLeaseManager> logger, BlobRequestOptions blobRequestOptions = null)
public AzureBlobLeaseManager(ILogger<AzureBlobLeaseManager> logger, BlobServiceClient blobServiceClient, string containerName, string basePath)
{
_blobRequestOptions = blobRequestOptions;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger ??
throw new ArgumentNullException(nameof(logger));
if (blobServiceClient == null) throw new ArgumentNullException(nameof(blobServiceClient));
Lanaparezanin marked this conversation as resolved.
Show resolved Hide resolved

if (string.IsNullOrEmpty(containerName))
{
if (containerName == null)
throw new ArgumentNullException(nameof(containerName));
else throw new ArgumentException(nameof(containerName));
}

_blobLeaseService = new BlobLeaseService(blobServiceClient, containerName, basePath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces a potential issue: AzureBlobLeaseManager can be constructed with a BlobServiceClient configured for a certain storage account with a certain container and base path, and then AcquireLease can be called passing a blob that might reside in a different storage account, or different container or not use the base path passed into AzureBlobLeaseManager constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All 3 added arguments (blobServiceClient, containerName, basePath) seem to exist only to be passed into a BlobLeaseService constructor. I suggest to change AzureBlobLeaseManager constructor to accept a IBlobLeaseService object instead and update the job setup accordingly.

Copy link
Contributor

@agr agr Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, we need to sort out the blob validation. One way of doing that could be to add a TryAcquireAsync overload in BlobLeaseService that would accept a blob URL instead of a resourceName, validate its values against the object configuration then infer resourceName from URL and call the original implementation. Then this new method (passing in full blob URL) can be used in AzureBlobLeaseManager.AcquireLease.

}

/// <summary>
/// Try to acquire a lease on the blob. If the acquire is successful the lease will be renewed at every 60 seconds.
/// In order to stop the renew task the <see cref="Stats.AzureCdnLogs.Common.AzureBlobLeaseManager.TryReleaseLease(CloudBlob)"/> needs to be invoked
/// In order to stop the renew task the <see cref="Stats.AzureCdnLogs.Common.AzureBlobLeaseManager.TryReleaseLockAsync(AzureBlobLockResult)"/> needs to be invoked
/// or the token to be cancelled.
/// </summary>
/// <param name="blob">The blob to acquire the lease on.</param>
/// <param name="token">A token to cancel the operation.</param>
/// <param name="renewStatusTask">The renew task.</param>
/// <returns>True if the lease was acquired. </returns>
public AzureBlobLockResult AcquireLease(CloudBlob blob, CancellationToken token)
/// <returns>An <see cref="AzureBlobLockResult"/> indicating the result of the lease acquisition.
/// If the lease is successfully acquired, the result will contain the lease ID and a cancellation token
/// source that can be used to stop the lease renewal task.</returns>
public async Task<AzureBlobLockResult> AcquireLease(BlobClient blob)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used outside of stats job world? Can we rename this to AcquireLeaseAsync without breaking anything? If not, ignore this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't, seems like this is the only reference:
image

{
blob.FetchAttributes();
if (token.IsCancellationRequested || blob.Properties.LeaseStatus == LeaseStatus.Locked)
try
{
_logger.LogInformation("AcquireLease: The operation was cancelled or the blob lease is already taken. Blob {BlobUri}, Cancellation status {IsCancellationRequested}, BlobLeaseStatus {BlobLeaseStatus}.",
blob.Uri.AbsoluteUri,
token.IsCancellationRequested,
blob.Properties.LeaseStatus);
return AzureBlobLockResult.FailedLockResult(blob);
}
var proposedLeaseId = Guid.NewGuid().ToString();
var leaseId = blob.AcquireLease(TimeSpan.FromSeconds(MaxRenewPeriodInSeconds), proposedLeaseId);
var lockResult = new AzureBlobLockResult(blob: blob, lockIsTaken: true, leaseId: leaseId, linkToken: token);

//start a task that will renew the lease until the token is cancelled or the Release methods was invoked
var renewStatusTask = new Task( (lockresult) =>
BlobLeaseResult leaseResult = await _blobLeaseService.TryAcquireAsync(blob.Name, TimeSpan.FromSeconds(MaxRenewPeriodInSeconds), CancellationToken.None);
if (!leaseResult.IsSuccess)
Copy link
Contributor

@erdembayar erdembayar Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Please invert the order. Please check the success first then it's easier to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the success before what?

{
var blobLockResult = (AzureBlobLockResult)lockresult;
_logger.LogInformation("RenewLeaseTask: Started for BlobUri {BlobUri}. ThreadId {ThreadId}. IsCancellationRequested {IsCancellationRequested}. LeaseId {LeaseId}",
blob.Uri.AbsoluteUri,
Thread.CurrentThread.ManagedThreadId,
blobLockResult.BlobOperationToken.IsCancellationRequested,
blobLockResult.LeaseId);
_logger.LogInformation("AcquireLease: The operation was cancelled or the blob lease is already taken. Blob {BlobUri}.", blob.Uri.AbsoluteUri);
return AzureBlobLockResult.FailedLockResult(blob);
}
else
{
_logger.LogInformation("AcquireLease: Lease was acquired for BlobUri {BlobUri}.", blob.Uri.AbsoluteUri);
}

int sleepBeforeRenewInSeconds = MaxRenewPeriodInSeconds - OverlapRenewPeriodInSeconds < 0 ? MaxRenewPeriodInSeconds : MaxRenewPeriodInSeconds - OverlapRenewPeriodInSeconds;
if (!blobLockResult.BlobOperationToken.IsCancellationRequested)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why we had this check in the original code. Technically, it is checking for a different thing than the while loop below, but I think the check in while loop would cover all cases this check would test. Any ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting a task might be an expensive operation, so it's beneficial to check before entering the task to improve performance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no starting tasks between line 66 and 68. They literally are run one after another.

string leaseId = leaseResult.LeaseId;
var lockResult = new AzureBlobLockResult(blob, true, leaseId, CancellationToken.None);
BlobClient leasedBlob = lockResult.Blob;
// Start a task that will renew the lease until the token is cancelled or the Release method is invoked
_ = Task.Run(async () =>
{
while (!lockResult.BlobOperationToken.Token.IsCancellationRequested)
{
while (!blobLockResult.BlobOperationToken.Token.IsCancellationRequested)
try
{
Thread.Sleep(sleepBeforeRenewInSeconds * 1000);

//it will renew the lease only if the lease was not explicitly released
try
{
if (!blobLockResult.Blob.Exists())
{
blobLockResult.BlobOperationToken.Cancel();
break;
}
AccessCondition acc = new AccessCondition { LeaseId = blobLockResult.LeaseId };
blob.RenewLease(accessCondition: acc, options: _blobRequestOptions, operationContext: null);
_logger.LogInformation("RenewLeaseTask: Lease was renewed for BlobUri {BlobUri} and LeaseId {LeaseId}.",
blob.Uri.AbsoluteUri,
blobLockResult.LeaseId);
}
catch (StorageException exception)
await Task.Delay(TimeSpan.FromSeconds(MaxRenewPeriodInSeconds - OverlapRenewPeriodInSeconds), lockResult.BlobOperationToken.Token);
Lanaparezanin marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not pass cancellation token into Task.Delay. It will throw if cancelled while waiting (unlike Thread.Sleep in the original code).

if (!await leasedBlob.ExistsAsync())
{
_logger.LogWarning(LogEvents.FailedBlobLease, exception, "RenewLeaseTask: The Lease could not be renewed for BlobUri {BlobUri}. ExpectedLeaseId {LeaseId}. CurrentLeaseId {CurrentLeaseId}.",
blob.Uri.AbsoluteUri,
leaseId,
blobLockResult.LeaseId);
blobLockResult.BlobOperationToken.Cancel();
break;
}
await _blobLeaseService.RenewAsync(blob.Name, leaseId, CancellationToken.None);
_logger.LogInformation("RenewLeaseTask: Lease was renewed for BlobUri {BlobUri} and LeaseId {LeaseId}.",
blob.Uri.AbsoluteUri,
leaseId);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "RenewLeaseTask: The Lease could not be renewed for BlobUri {BlobUri}. LeaseId {LeaseId}.",
blob.Uri.AbsoluteUri,
leaseId);
lockResult.BlobOperationToken.Cancel();
break;
}
}
}, lockResult, TaskCreationOptions.LongRunning);
renewStatusTask.Start();
return lockResult;



}, lockResult.BlobOperationToken.Token);
return lockResult;
}
catch (Exception ex)
{
_logger.LogError(ex, "AcquireLeaseAsync: Failed to acquire lease for BlobUri {BlobUri}.", blob.Uri.AbsoluteUri);
return AzureBlobLockResult.FailedLockResult(blob);
}

}

public async Task<AsyncOperationResult> TryReleaseLockAsync(AzureBlobLockResult releaseLock)
{
try
{
AccessCondition acc = new AccessCondition();
acc.LeaseId = releaseLock.LeaseId;
if(await releaseLock.Blob.ExistsAsync())
if (await releaseLock.Blob.ExistsAsync())
{
await releaseLock.Blob.ReleaseLeaseAsync(acc, options: _blobRequestOptions, operationContext: null);
releaseLock.BlobOperationToken.Cancel();
_logger.LogInformation("ReleaseLockAsync: ReleaseLeaseStatus: {LeaseReleased} on the {BlobUri}.", true, releaseLock.Blob.Uri);
return new AsyncOperationResult(true, null);
bool releaseResult = await _blobLeaseService.ReleaseAsync(releaseLock.Blob.Name, releaseLock.LeaseId, releaseLock.BlobOperationToken.Token);
if (releaseResult)
{
_logger.LogInformation("ReleaseLockAsync: ReleaseLeaseStatus: {LeaseReleased} on the {BlobUri}.", true, releaseLock.Blob.Uri);
return new AsyncOperationResult(true, null);
}
else
{
return new AsyncOperationResult(false, null);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading;
using Microsoft.WindowsAzure.Storage.Blob;
using Azure.Storage.Blobs;

namespace Stats.AzureCdnLogs.Common
{
Expand All @@ -13,15 +13,15 @@ public class AzureBlobLockResult : IDisposable

public string LeaseId { get; }

public CloudBlob Blob { get; }
public BlobClient Blob { get; }

/// <summary>
/// It will be cancelled when the renew task could not renew the lease.
/// Operations can listen to this cancellation to stop execution once the lease could not be renewed.
/// </summary>
public CancellationTokenSource BlobOperationToken { get; }

public AzureBlobLockResult(CloudBlob blob, bool lockIsTaken, string leaseId, CancellationToken linkToken)
public AzureBlobLockResult(BlobClient blob, bool lockIsTaken, string leaseId, CancellationToken linkToken)
{
Blob = blob ?? throw new ArgumentNullException(nameof(blob));
LockIsTaken = lockIsTaken;
Expand All @@ -30,7 +30,7 @@ public AzureBlobLockResult(CloudBlob blob, bool lockIsTaken, string leaseId, Can
LeaseId = leaseId;
}

public static AzureBlobLockResult FailedLockResult(CloudBlob blob)
public static AzureBlobLockResult FailedLockResult(BlobClient blob)
{
return new AzureBlobLockResult(blob: blob, lockIsTaken: false, leaseId: null, linkToken: CancellationToken.None);
}
Expand Down
Loading