Skip to content

Commit

Permalink
Fix ContinuePendingRead to account for new records for the same key t…
Browse files Browse the repository at this point in the history
…hat were added and then went to dusk during the pending Read operation. (#911)
  • Loading branch information
TedHartMS authored Apr 16, 2024
1 parent dc4bc1d commit e9e56ed
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 21 deletions.
12 changes: 9 additions & 3 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -156,6 +155,7 @@ internal struct PendingContext<Input, Output, Context>
internal const ushort kNoOpFlags = 0;
internal const ushort kNoKey = 0x0001;
internal const ushort kIsAsync = 0x0002;
internal const ushort kHadStartAddress = 0x0004;

internal ReadCopyOptions readCopyOptions;

Expand Down Expand Up @@ -216,6 +216,12 @@ internal bool IsAsync
set => operationFlags = value ? (ushort)(operationFlags | kIsAsync) : (ushort)(operationFlags & ~kIsAsync);
}

internal bool HadStartAddress
{
get => (operationFlags & kHadStartAddress) != 0;
set => operationFlags = value ? (ushort)(operationFlags | kHadStartAddress) : (ushort)(operationFlags & ~kHadStartAddress);
}

internal long InitialEntryAddress
{
get => recordInfo.PreviousAddress;
Expand Down Expand Up @@ -555,7 +561,7 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager, DeltaLog
using StreamReader s = new(new MemoryStream(metadata));
Initialize(s);
}

/// <summary>
/// Recover info from token
/// </summary>
Expand All @@ -582,7 +588,7 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager, out byte
deltaTailAddress = deltaLog.NextAddress;
}
var cookie = s.ReadToEnd();
commitCookie = cookie.Length == 0 ? null : Convert.FromBase64String(cookie);
commitCookie = cookie.Length == 0 ? null : Convert.FromBase64String(cookie);
}

/// <summary>
Expand Down
45 changes: 35 additions & 10 deletions cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System.Diagnostics;
using System.Runtime.CompilerServices;

namespace FASTER.core
{
Expand Down Expand Up @@ -48,16 +47,42 @@ internal OperationStatus ContinuePendingRead<Input, Output, Context, FasterSessi
Debug.Fail("Expected to FindTag in InternalContinuePendingRead");
stackCtx.SetRecordSourceToHashEntry(hlog);

// During the pending operation, a record for the key may have been added to the log or readcache.
ref var value = ref hlog.GetContextRecordValue(ref request);
if (TryFindRecordInMemory(ref key, ref stackCtx, ref pendingContext))

// During the pending operation, a record for the key may have been added to the log or readcache. If we had a StartAddress we ignore this.
if (!pendingContext.HadStartAddress)
{
srcRecordInfo = ref stackCtx.recSrc.GetInfo();
if (TryFindRecordInMemory(ref key, ref stackCtx, ref pendingContext))
{
srcRecordInfo = ref stackCtx.recSrc.GetInfo();

// V threads cannot access V+1 records. Use the latest logical address rather than the traced address (logicalAddress) per comments in AcquireCPRLatchRMW.
if (fasterSession.Ctx.phase == Phase.PREPARE && IsEntryVersionNew(ref stackCtx.hei.entry))
return OperationStatus.CPR_SHIFT_DETECTED; // Pivot thread; retry
value = ref stackCtx.recSrc.GetValue();
// V threads cannot access V+1 records. Use the latest logical address rather than the traced address (logicalAddress) per comments in AcquireCPRLatchRMW.
if (fasterSession.Ctx.phase == Phase.PREPARE && IsEntryVersionNew(ref stackCtx.hei.entry))
return OperationStatus.CPR_SHIFT_DETECTED; // Pivot thread; retry
value = ref stackCtx.recSrc.GetValue();
}
else
{
// We didn't find a record for the key in memory, but if recSrc.LogicalAddress (which is the .PreviousAddress of the lowest record
// above InitialLatestLogicalAddress we could reach) is > InitialLatestLogicalAddress, then it means InitialLatestLogicalAddress is
// now below HeadAddress and there is at least one record below HeadAddress but above InitialLatestLogicalAddress. Reissue the Read(),
// using the LogicalAddress we just found as minAddress. We will either find an in-memory version of the key that was added after the
// TryFindRecordInMemory we just did, or do IO and find the record we just found or one above it. Read() updates InitialLatestLogicalAddress,
// so if we do IO, the next time we come to CompletePendingRead we will only search for a newer version of the key in any records added
// after our just-completed TryFindRecordInMemory.
if (stackCtx.recSrc.LogicalAddress > pendingContext.InitialLatestLogicalAddress
&& (!pendingContext.HasMinAddress || stackCtx.recSrc.LogicalAddress >= pendingContext.minAddress))
{
OperationStatus internalStatus;
do
{
internalStatus = InternalRead(ref key, pendingContext.keyHash, ref pendingContext.input.Get(), ref pendingContext.output,
startAddress: Constants.kInvalidAddress, ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
}
while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pendingContext));
return internalStatus;
}
}
}

if (!TryTransientSLock<Input, Output, Context, FasterSession>(fasterSession, ref key, ref stackCtx, out var status))
Expand Down Expand Up @@ -193,7 +218,7 @@ internal OperationStatus ContinuePendingRMW<Input, Output, Context, FasterSessio
// above InitialLatestLogicalAddress we could reach) is > InitialLatestLogicalAddress, then it means InitialLatestLogicalAddress is
// now below HeadAddress and there is at least one record below HeadAddress but above InitialLatestLogicalAddress. We must do InternalRMW.
if (stackCtx.recSrc.LogicalAddress > pendingContext.InitialLatestLogicalAddress)
{
{
Debug.Assert(pendingContext.InitialLatestLogicalAddress < hlog.HeadAddress, "Failed to search all in-memory records");
break;
}
Expand All @@ -217,7 +242,7 @@ internal OperationStatus ContinuePendingRMW<Input, Output, Context, FasterSessio
TransientXUnlock<Input, Output, Context, FasterSession>(fasterSession, ref key, ref stackCtx);
}

// Must do this *after* Unlocking. Retries should drop down to InternalRMW
// Must do this *after* Unlocking. Retries should drop down to InternalRMW
CheckRetry:
if (!HandleImmediateRetryStatus(status, fasterSession, ref pendingContext))
return status;
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/FASTER/Implementation/InternalRead.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ internal OperationStatus InternalRead<Input, Output, Context, FasterSession>(ref
{
if (startAddress < hlog.BeginAddress)
return OperationStatus.NOTFOUND;
pendingContext.HadStartAddress = true;
stackCtx.hei.entry.Address = startAddress;
}

Expand Down
173 changes: 165 additions & 8 deletions cs/test/CompletePendingTests.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Collections.Generic;
using System.Threading.Tasks;
using FASTER.core;
using NUnit.Framework;
using System.Collections.Generic;
using System.Threading.Tasks;
using static FASTER.test.TestUtils;

namespace FASTER.test
{
public struct LocalKeyStructComparer : IFasterEqualityComparer<KeyStruct>
{
internal long? forceCollisionHash;

public long GetHashCode64(ref KeyStruct key)
{
return forceCollisionHash.HasValue ? forceCollisionHash.Value : Utility.GetHashCode(key.kfield1);
}
public bool Equals(ref KeyStruct k1, ref KeyStruct k2)
{
return k1.kfield1 == k2.kfield1 && k1.kfield2 == k2.kfield2;
}

public override string ToString() => $"forceHashCollision: {forceCollisionHash}";
}

[TestFixture]
class CompletePendingTests
{
private FasterKV<KeyStruct, ValueStruct> fht;
private IDevice log;
LocalKeyStructComparer comparer = new();

[SetUp]
public void Setup()
{
// Clean up log files from previous test runs in case they weren't cleaned up
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait:true);
DeleteDirectory(MethodTestDir, wait: true);

log = Devices.CreateLogDevice($"{TestUtils.MethodTestDir}/CompletePendingTests.log", preallocateFile: true, deleteOnClose: true);
fht = new FasterKV<KeyStruct, ValueStruct>(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 });
log = Devices.CreateLogDevice($"{MethodTestDir}/CompletePendingTests.log", preallocateFile: true, deleteOnClose: true);
fht = new FasterKV<KeyStruct, ValueStruct>(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, comparer: comparer);
}

[TearDown]
Expand All @@ -31,15 +49,15 @@ public void TearDown()
fht = null;
log?.Dispose();
log = null;
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
DeleteDirectory(MethodTestDir, wait: true);
}

const int numRecords = 1000;

static KeyStruct NewKeyStruct(int key) => new() { kfield1 = key, kfield2 = key + numRecords * 10 };
static ValueStruct NewValueStruct(int key) => new() { vfield1 = key, vfield2 = key + numRecords * 10 };

static InputStruct NewInputStruct(int key) => new(){ ifield1 = key + numRecords * 30, ifield2 = key + numRecords * 40 };
static InputStruct NewInputStruct(int key) => new() { ifield1 = key + numRecords * 30, ifield2 = key + numRecords * 40 };
static ContextStruct NewContextStruct(int key) => new() { cfield1 = key + numRecords * 50, cfield2 = key + numRecords * 60 };

static void VerifyStructs(int key, ref KeyStruct keyStruct, ref InputStruct inputStruct, ref OutputStruct outputStruct, ref ContextStruct contextStruct, bool useRMW)
Expand Down Expand Up @@ -126,7 +144,7 @@ internal static void VerifyOneNotFound(CompletedOutputIterator<KeyStruct, ValueS

[Test]
[Category("FasterKV")]
public async ValueTask ReadAndCompleteWithPendingOutput([Values]bool useRMW, [Values]bool isAsync)
public async ValueTask ReadAndCompleteWithPendingOutput([Values] bool useRMW, [Values] bool isAsync)
{
using var session = fht.For(new FunctionsWithContext<ContextStruct>()).NewSession<FunctionsWithContext<ContextStruct>>();
Assert.IsNull(session.completedOutputs); // Do not instantiate until we need it
Expand Down Expand Up @@ -214,5 +232,144 @@ public async ValueTask ReadAndCompleteWithPendingOutput([Values]bool useRMW, [Va
Assert.AreEqual(address, recordMetadata.Address);
}
}

public enum StartAddressMode
{
UseStartAddress,
NoStartAddress
}

public class PendingReadFunctions<TContext> : FunctionsBase<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty>
{
public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, Empty ctx, Status status, RecordMetadata recordMetadata)
{
Assert.IsTrue(status.Found);
Assert.AreEqual(key.kfield1, output.value.vfield1);
// Do not compare field2; that's our updated value, and the key won't be found if we change kfield2
}

// Read functions
public override bool SingleReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref ReadInfo readInfo)
{
Assert.IsFalse(readInfo.RecordInfo.IsNull());
dst.value = value;
return true;
}

public override bool ConcurrentReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref ReadInfo readInfo)
=> SingleReader(ref key, ref input, ref value, ref dst, ref readInfo);
}

[Test]
[Category("FasterKV")]
public void ReadPendingWithNewSameKey([Values] StartAddressMode startAddressMode, [Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode secondRecordFlushMode)
{
const int valueMult = 1000;

using var session = fht.For(new PendingReadFunctions<ContextStruct>()).NewSession<PendingReadFunctions<ContextStruct>>();

// Store off startAddress before initial upsert
var startAddress = startAddressMode == StartAddressMode.UseStartAddress ? fht.Log.TailAddress : Constants.kInvalidAddress;

// Insert first record
var firstValue = 0; // same as key
var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
session.Upsert(ref keyStruct, ref valueStruct);

// Flush to make the Read() go pending.
fht.Log.FlushAndEvict(wait: true);

ReadOptions readOptions = new() { StartAddress = startAddress };
var (status, outputStruct) = session.Read(keyStruct, ref readOptions);
Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");

// Insert next record with the same key and flush this too if requested.
var secondValue = firstValue + 1;
valueStruct.vfield2 = secondValue * valueMult;
session.Upsert(ref keyStruct, ref valueStruct);
if (secondRecordFlushMode == FlushMode.OnDisk)
fht.Log.FlushAndEvict(wait: true);

session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
(status, outputStruct) = GetSinglePendingResult(completedOutputs);

if (startAddressMode == StartAddressMode.UseStartAddress)
Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "UseStartAddress should have returned first value");
else
Assert.AreEqual(secondValue * valueMult, outputStruct.value.vfield2, "NoStartAddress should have returned second value");
}

[Test]
[Category("FasterKV")]
public void ReadPendingWithNewDifferentKeyInChain([Values] StartAddressMode startAddressMode, [Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode secondRecordFlushMode)
{
const int valueMult = 1000;

using var session = fht.For(new PendingReadFunctions<ContextStruct>()).NewSession<PendingReadFunctions<ContextStruct>>();

// Store off startAddress before initial upsert
var startAddress = startAddressMode == StartAddressMode.UseStartAddress ? fht.Log.TailAddress : Constants.kInvalidAddress;

// Insert first record
var firstValue = 0; // same as key
var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
session.Upsert(ref keyStruct, ref valueStruct);

// Force collisions to test having another key in the chain
comparer.forceCollisionHash = keyStruct.GetHashCode64(ref keyStruct);

// Flush to make the Read() go pending.
fht.Log.FlushAndEvict(wait: true);

ReadOptions readOptions = new() { StartAddress = startAddress };
var (status, outputStruct) = session.Read(keyStruct, ref readOptions);
Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");

// Insert next record with a different key and flush this too if requested.
var secondValue = firstValue + 1;
keyStruct = new() { kfield1 = secondValue, kfield2 = secondValue * valueMult };
valueStruct = new() { vfield1 = secondValue, vfield2 = secondValue * valueMult };
session.Upsert(ref keyStruct, ref valueStruct);
if (secondRecordFlushMode == FlushMode.OnDisk)
fht.Log.FlushAndEvict(wait: true);

session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
(status, outputStruct) = GetSinglePendingResult(completedOutputs);

Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "Should have returned first value");
}

[Test]
[Category("FasterKV")]
public void ReadPendingWithNoNewKey([Values] StartAddressMode startAddressMode)
{
// Basic test of pending read
const int valueMult = 1000;

using var session = fht.For(new PendingReadFunctions<ContextStruct>()).NewSession<PendingReadFunctions<ContextStruct>>();

// Store off startAddress before initial upsert
var startAddress = startAddressMode == StartAddressMode.UseStartAddress ? fht.Log.TailAddress : Constants.kInvalidAddress;

// Insert first record
var firstValue = 0; // same as key
var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
session.Upsert(ref keyStruct, ref valueStruct);

// Flush to make the Read() go pending.
fht.Log.FlushAndEvict(wait: true);

ReadOptions readOptions = new() { StartAddress = startAddress };
var (status, outputStruct) = session.Read(keyStruct, ref readOptions);
Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");

session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
(status, outputStruct) = GetSinglePendingResult(completedOutputs);

Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "Should have returned first value");
}
}
}

0 comments on commit e9e56ed

Please sign in to comment.