From 4c6bd9d19a713d3be644330dd5160fe31b47ce38 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 5 Oct 2022 19:47:57 -0700 Subject: [PATCH] [C#] FasterLog CommitAsync wait if no new commit done (#753) --- cs/src/core/FasterLog/FasterLog.cs | 60 +++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index 17dcfcbbd..d9d0d6297 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -931,14 +931,29 @@ public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool s public async ValueTask CommitAsync(CancellationToken token = default) { token.ThrowIfCancellationRequested(); + + // Take a lower-bound of the content of this commit in case our request is filtered but we need to wait + var tail = TailAddress; + var lastCommit = commitNum; + var task = CommitTask; - if (!CommitInternal(out var tailAddress, out var actualCommitNum, true, null, -1, null)) - return; + var success = CommitInternal(out var actualTail, out var actualCommitNum, true, null, -1, null); - while (CommittedUntilAddress < tailAddress || persistedCommitNum < actualCommitNum) + if (success) { - var linkedCommitInfo = await task.WithCancellationAsync(token).ConfigureAwait(false); - task = linkedCommitInfo.NextTask; + while (CommittedUntilAddress < actualTail || persistedCommitNum < actualCommitNum) + { + var linkedCommitInfo = await task.WithCancellationAsync(token).ConfigureAwait(false); + task = linkedCommitInfo.NextTask; + } + } + else + { + while (CommittedUntilAddress < tail || persistedCommitNum < lastCommit) + { + var linkedCommitInfo = await task.WithCancellationAsync(token).ConfigureAwait(false); + task = linkedCommitInfo.NextTask; + } } } @@ -951,18 +966,37 @@ public async ValueTask CommitAsync(CancellationToken token = default) public async ValueTask> CommitAsync(Task prevCommitTask, CancellationToken token = default) { token.ThrowIfCancellationRequested(); + + // Take a lower-bound of the content of this commit in case our request is filtered but we need to spin + var tail = TailAddress; + var lastCommit = commitNum; + if (prevCommitTask == null) prevCommitTask = CommitTask; - if (!CommitInternal(out var tailAddress, out var actualCommitNum, true, null, -1, null)) - return prevCommitTask; + var success = CommitInternal(out var actualTail, out var actualCommitNum, true, null, -1, null); - while (CommittedUntilAddress < tailAddress || persistedCommitNum < actualCommitNum) + + if (success) { - var linkedCommitInfo = await prevCommitTask.WithCancellationAsync(token).ConfigureAwait(false); - if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress || persistedCommitNum < actualCommitNum) - prevCommitTask = linkedCommitInfo.NextTask; - else - return linkedCommitInfo.NextTask; + while (CommittedUntilAddress < actualTail || persistedCommitNum < actualCommitNum) + { + var linkedCommitInfo = await prevCommitTask.WithCancellationAsync(token).ConfigureAwait(false); + if (linkedCommitInfo.CommitInfo.UntilAddress < actualTail || persistedCommitNum < actualCommitNum) + prevCommitTask = linkedCommitInfo.NextTask; + else + return linkedCommitInfo.NextTask; + } + } + else + { + while (CommittedUntilAddress < tail || persistedCommitNum < lastCommit) + { + var linkedCommitInfo = await prevCommitTask.WithCancellationAsync(token).ConfigureAwait(false); + if (linkedCommitInfo.CommitInfo.UntilAddress < actualTail || persistedCommitNum < actualCommitNum) + prevCommitTask = linkedCommitInfo.NextTask; + else + return linkedCommitInfo.NextTask; + } } return prevCommitTask;