Skip to content

Commit

Permalink
Move the RenewedTime and thus the start of workItemTimeout to after a…
Browse files Browse the repository at this point in the history
…n item has been dequeued. (#12)
  • Loading branch information
waffles authored and ejsmith committed Aug 1, 2019
1 parent b3a6304 commit 837a27e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
6 changes: 3 additions & 3 deletions src/Foundatio.Redis/Queues/RedisQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private async Task EnsureMaintenanceRunningAsync() {
if (IsMaintenanceRunning)
return;

using (await _lock.LockAsync().AnyContext()) {
using (await _lock.LockAsync().AnyContext()) {
if (_maintenanceTask != null)
return;

Expand Down Expand Up @@ -245,7 +245,6 @@ protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken,
protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken linkedCancellationToken) {
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
if (isTraceLogLevelEnabled) _logger.LogTrace("Queue {Name} dequeuing item...", _options.Name);
long now = SystemClock.UtcNow.Ticks;

if (!IsMaintenanceRunning)
await EnsureMaintenanceRunningAsync().AnyContext();
Expand Down Expand Up @@ -277,6 +276,7 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken
return null;

var wiTimeoutTtl = GetWorkItemTimeoutTimeTtl();
long now = SystemClock.UtcNow.Ticks;
await Run.WithRetriesAsync(() => Task.WhenAll(
_cache.SetAsync(GetDequeuedTimeKey(value), now, wiTimeoutTtl),
_cache.SetAsync(GetRenewedTimeKey(value), now, wiTimeoutTtl)
Expand Down Expand Up @@ -482,7 +482,7 @@ private async Task TrimDeadletterItemsAsync(int maxItems) {
var itemIds = (await Database.ListRangeAsync(_deadListName).AnyContext()).Skip(maxItems);
var tasks = new List<Task>();
foreach (var id in itemIds) {
tasks.AddRange(new Task[] {
tasks.AddRange(new Task[] {
Database.KeyDeleteAsync(GetPayloadKey(id)),
Database.KeyDeleteAsync(GetAttemptsKey(id)),
Database.KeyDeleteAsync(GetEnqueuedTimeKey(id)),
Expand Down
41 changes: 40 additions & 1 deletion tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public async Task CanTrimDeadletterItems() {

var workItemIds = new List<string>();
for (int i = 0; i < 10; i++) {
string id = await queue.EnqueueAsync(new SimpleWorkItem {Data = "blah", Id = i});
string id = await queue.EnqueueAsync(new SimpleWorkItem { Data = "blah", Id = i });
_logger.LogTrace(id);
workItemIds.Add(id);
}
Expand All @@ -370,6 +370,45 @@ public async Task CanTrimDeadletterItems() {
}
}

[Fact]
public async Task VerifyFirstDequeueTimeout() {

var workItemTimeout = TimeSpan.FromMilliseconds(100);
var itemData = "blah";
var itemId = 1;

var queue = GetQueue(retries: 0, workItemTimeout: workItemTimeout, retryDelay: TimeSpan.Zero, runQueueMaintenance: false) as RedisQueue<SimpleWorkItem>;
if (queue == null)
return;

using (queue) {
// Start DequeueAsync but allow it to yield.
var itemTask = queue.DequeueAsync();

// Wait longer than the workItemTimeout.
// This is the period between a queue having DequeueAsync called on it and the first item being enqueued.
await SystemClock.SleepAsync(workItemTimeout.Add(TimeSpan.FromMilliseconds(1)));

// Add an item. DequeueAsync can now return.
string id = await queue.EnqueueAsync(new SimpleWorkItem {
Data = itemData,
Id = itemId
});

// Run DoMaintenanceWorkAsync to verify that our item will not be auto-abandoned.
await queue.DoMaintenanceWorkAsync();

// Completing the item will throw if the item is abandoned.
var item = await itemTask;
await item.CompleteAsync();

var value = item.Value;
Assert.NotNull(value);
Assert.Equal(itemData, value.Data);
Assert.Equal(itemId, value.Id);
}
}

// TODO: Need to write tests that verify the cache data is correct after each operation.

[Fact(Skip = "Performance Test")]
Expand Down

0 comments on commit 837a27e

Please sign in to comment.