From c90754e9af3599f45f6c5661c9a0890791d23405 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Fri, 2 Aug 2024 17:14:23 +0800 Subject: [PATCH 1/3] feat(s3stream):Separate persistence settings from response handling --- .../main/java/com/automq/stream/s3/S3Storage.java | 15 ++++++++++----- .../com/automq/stream/s3/WalWriteRequest.java | 10 +++++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index ff5ebeab81..9014d54576 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -612,16 +612,23 @@ private void handleAppendRequest(WalWriteRequest request) { } private void handleAppendCallback(WalWriteRequest request) { - suppress(() -> handleAppendCallback0(request), LOGGER); + suppress(() -> { + final long startTime = System.nanoTime(); + handleAppendCallback0(request); + StorageOperationStats.getInstance().appendCallbackStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS)); + }, LOGGER); } private void handleAppendCallback0(WalWriteRequest request) { - final long startTime = System.nanoTime(); + request.persisted(); List waitingAckRequests; Lock lock = getStreamCallbackLock(request.record.getStreamId()); lock.lock(); try { waitingAckRequests = callbackSequencer.after(request); + if (waitingAckRequests.isEmpty()) { + return; + } waitingAckRequests.forEach(r -> r.record.retain()); for (WalWriteRequest waitingAckRequest : waitingAckRequests) { boolean full = deltaWALCache.put(waitingAckRequest.record); @@ -637,7 +644,6 @@ private void handleAppendCallback0(WalWriteRequest request) { for (WalWriteRequest waitingAckRequest : waitingAckRequests) { waitingAckRequest.cf.complete(null); } - StorageOperationStats.getInstance().appendCallbackStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS)); } private Lock getStreamCallbackLock(long streamId) { @@ -970,7 +976,6 @@ public void before(WalWriteRequest request) { * @return popped sequence persisted request. */ public List after(WalWriteRequest request) { - request.persisted = true; // Try to pop sequential persisted requests from the queue. long streamId = request.record.getStreamId(); @@ -987,7 +992,7 @@ public List after(WalWriteRequest request) { for (; ; ) { peek = streamRequests.peek(); - if (peek == null || !peek.persisted) { + if (peek == null || !peek.isPersisted()) { break; } poll = streamRequests.poll(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java index 7a86575018..59c6f482ac 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java +++ b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java @@ -28,7 +28,7 @@ public class WalWriteRequest implements Comparable { * * @see S3Storage.WALCallbackSequencer */ - boolean persisted; + private volatile boolean persisted; /** * Whether the record has been put to the {@link LogCache} @@ -54,6 +54,14 @@ public int compareTo(WalWriteRequest o) { return record.compareTo(o.record); } + public void persisted() { + this.persisted = true; + } + + public boolean isPersisted() { + return this.persisted; + } + @Override public String toString() { return "WalWriteRequest{" + From d2029603afea30db5f3358ad08f258c895cc11d2 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Fri, 2 Aug 2024 17:21:19 +0800 Subject: [PATCH 2/3] feat(s3stream):separate persistence settings from response handling From dc9d9dda78d516de17aa066a557a29cca49a876b Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Sat, 3 Aug 2024 09:08:10 +0800 Subject: [PATCH 3/3] feat(s3stream):persistence settings encapsulation and lock scope optimization --- .../java/com/automq/stream/s3/S3Storage.java | 80 +++++++++---------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 83c65587cb..7e3091c996 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -80,6 +80,12 @@ public class S3Storage implements Storage { private static final FastReadFailFastException FAST_READ_FAIL_FAST_EXCEPTION = new FastReadFailFastException(); private static final int NUM_STREAM_CALLBACK_LOCKS = 128; + /** + * Stream callback locks. Used to ensure the stream callbacks will not be called concurrently. + * + * @see #handleAppendCallback + */ + private final static Lock[] STREAM_CALLBACK_LOCKS = IntStream.range(0, NUM_STREAM_CALLBACK_LOCKS).mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new); private final long maxDeltaWALCacheSize; private final Config config; private final WriteAheadLog deltaWAL; @@ -88,7 +94,7 @@ public class S3Storage implements Storage { */ private final LogCache deltaWALCache; /** - * WAL out of order callback sequencer. {@link #streamCallbackLocks} will ensure the memory safety. + * WAL out of order callback sequencer. {@link #STREAM_CALLBACK_LOCKS} will ensure the memory safety. */ private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer(); private final WALConfirmOffsetCalculator confirmOffsetCalculator = new WALConfirmOffsetCalculator(); @@ -111,12 +117,7 @@ public class S3Storage implements Storage { private final ObjectManager objectManager; private final ObjectStorage objectStorage; private final S3BlockCache blockCache; - /** - * Stream callback locks. Used to ensure the stream callbacks will not be called concurrently. - * - * @see #handleAppendCallback - */ - private final Lock[] streamCallbackLocks = IntStream.range(0, NUM_STREAM_CALLBACK_LOCKS).mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new); + private final HashedWheelTimer timeoutDetect = new HashedWheelTimer( ThreadUtils.createThreadFactory("storage-timeout-detect", true), 1, TimeUnit.SECONDS, 100); private long lastLogTimestamp = 0L; @@ -621,34 +622,27 @@ private void handleAppendCallback(WalWriteRequest request) { } private void handleAppendCallback0(WalWriteRequest request) { - request.persisted(); - List waitingAckRequests; - Lock lock = getStreamCallbackLock(request.record.getStreamId()); - lock.lock(); - try { - waitingAckRequests = callbackSequencer.after(request); - if (waitingAckRequests.isEmpty()) { - return; - } - waitingAckRequests.forEach(r -> r.record.retain()); - for (WalWriteRequest waitingAckRequest : waitingAckRequests) { - boolean full = deltaWALCache.put(waitingAckRequest.record); - waitingAckRequest.confirmed = true; - if (full) { - // cache block is full, trigger WAL upload. - uploadDeltaWAL(); - } + + List waitingAckRequests = callbackSequencer.after(request); + if (waitingAckRequests.isEmpty()) { + return; + } + for (WalWriteRequest waitingAckRequest : waitingAckRequests) { + waitingAckRequest.record.retain(); + boolean full = deltaWALCache.put(waitingAckRequest.record); + waitingAckRequest.confirmed = true; + if (full) { + // cache block is full, trigger WAL upload. + uploadDeltaWAL(); } - } finally { - lock.unlock(); } for (WalWriteRequest waitingAckRequest : waitingAckRequests) { waitingAckRequest.cf.complete(null); } } - private Lock getStreamCallbackLock(long streamId) { - return streamCallbackLocks[(int) ((streamId & Long.MAX_VALUE) % NUM_STREAM_CALLBACK_LOCKS)]; + private static Lock getStreamCallbackLock(long streamId) { + return STREAM_CALLBACK_LOCKS[(int) ((streamId & Long.MAX_VALUE) % NUM_STREAM_CALLBACK_LOCKS)]; } @SuppressWarnings("UnusedReturnValue") @@ -977,6 +971,7 @@ public void before(WalWriteRequest request) { * @return popped sequence persisted request. */ public List after(WalWriteRequest request) { + request.persisted(); // Try to pop sequential persisted requests from the queue. long streamId = request.record.getStreamId(); @@ -985,21 +980,26 @@ public List after(WalWriteRequest request) { if (peek == null || peek.offset != request.offset) { return Collections.emptyList(); } - LinkedList rst = new LinkedList<>(); - WalWriteRequest poll = streamRequests.poll(); - assert poll == peek; - rst.add(poll); - - for (; ; ) { - peek = streamRequests.peek(); - if (peek == null || !peek.isPersisted()) { - break; - } - poll = streamRequests.poll(); + Lock lock = getStreamCallbackLock(request.record.getStreamId()); + lock.lock(); + try { + WalWriteRequest poll = streamRequests.poll(); assert poll == peek; - assert poll.record.getBaseOffset() == rst.getLast().record.getLastOffset(); rst.add(poll); + + for (; ; ) { + peek = streamRequests.peek(); + if (peek == null || !peek.isPersisted()) { + break; + } + poll = streamRequests.poll(); + assert poll == peek; + assert poll.record.getBaseOffset() == rst.getLast().record.getLastOffset(); + rst.add(poll); + } + } finally { + lock.unlock(); } return rst;