Skip to content

Commit

Permalink
IGNITE-23597 Cache latest term values in log manager (#5005)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibessonov authored Jan 9, 2025
1 parent 3739f46 commit 4f7f341
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raft.storage;

import static org.apache.ignite.internal.util.IgniteUtils.isPow2;

import org.apache.ignite.raft.jraft.entity.LogId;

/**
* Cyclic buffer to cache several last term values for log storage.
*/
public class TermCache {
private final int mask;
private final long[] indexes;
private final long[] terms;

// Head position. -1 means the cache is empty.
private int head = -1;

// Tail position. Might be equal to head if the cache only has a single term.
private int tail;

/**
* Constructor.
*
* @param capacity Cache capacity. Must be a power of 2. Should be a small value, term update is a rare operation.
*/
public TermCache(int capacity) {
assert isPow2(capacity) : "Capacity must be a power of 2";

this.mask = capacity - 1;
this.indexes = new long[capacity];
this.terms = new long[capacity];
}

/**
* Should be called when appending a new log entry.
*/
public void append(LogId id) {
if (isEmpty()) {
head = 0;
indexes[tail] = id.getIndex();
terms[tail] = id.getTerm();

return;
}

// Term has not changed, nothing to update.
if (terms[tail] == id.getTerm()) {
return;
}

tail = next(tail);
indexes[tail] = id.getIndex();
terms[tail] = id.getTerm();

// Handle buffer overflow by moving head to the next position.
if (tail == head) {
head = next(head);
}
}

private int prev(int i) {
return (i - 1) & mask;
}

private int next(int i) {
return (i + 1) & mask;
}

private boolean isEmpty() {
return head == -1;
}

private int findIndex(long idx) {
// Could be replaced with a binary search, but why bother for such a small cache.
for (int i = tail; i != head; i = prev(i)) {
if (idx >= indexes[i]) {
return i;
}
}

return head;
}

/**
* Lookup term for the given index. Returns {@code -1} if the index is not found in the cache.
*/
public long lookup(long idx) {
if (isEmpty() || idx < indexes[head]) {
return -1;
}

return terms[findIndex(idx)];
}

/**
* Resets the cache to the initial state.
*/
public void reset() {
head = -1;
tail = 0;
}

/**
* Truncates the cache to the given index, deleting all information for indexes greater than or equal to the given one.
*/
public void truncateTail(long idx) {
if (isEmpty() || idx < indexes[head]) {
reset();

return;
}

tail = findIndex(idx);

if (indexes[tail] == idx) {
if (head == tail) {
reset();
} else {
tail = prev(tail);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.ignite.raft.jraft.storage.impl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -30,13 +29,13 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.storage.TermCache;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
Expand All @@ -59,7 +58,6 @@
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.SegmentList;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.Utils;

/**
Expand All @@ -83,6 +81,7 @@ public class LogManagerImpl implements LogManager {
private LogId diskId = new LogId(0, 0); // Last log entry written to disk.
private LogId appliedId = new LogId(0, 0);
private final SegmentList<LogEntry> logsInMemory = new SegmentList<>(true);
private final TermCache termCache = new TermCache(8);
private volatile long firstLogIndex;
private volatile long lastLogIndex;
private volatile LogId lastSnapshotId = new LogId(0, 0);
Expand Down Expand Up @@ -321,6 +320,10 @@ public void appendEntries(final List<LogEntry> entries, final StableClosure done
if (!entries.isEmpty()) {
done.setFirstLogIndex(entries.get(0).getId().getIndex());
this.logsInMemory.addAll(entries);

for (LogEntry entry : entries) {
this.termCache.append(entry.getId());
}
}
done.setEntries(entries);

Expand Down Expand Up @@ -821,9 +824,10 @@ public long getTerm(final long index) {
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return 0;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry.getId().getTerm();

long term = termCache.lookup(index);
if (term != -1) {
return term;
}
}
finally {
Expand Down Expand Up @@ -913,10 +917,12 @@ private long unsafeGetTerm(final long index) {
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return 0;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry.getId().getTerm();

long term = termCache.lookup(index);
if (term != -1) {
return term;
}

return getTermFromLogStorage(index);
}

Expand Down Expand Up @@ -1029,6 +1035,7 @@ private boolean reset(final long nextLogIndex) {
this.writeLock.lock();
try {
this.logsInMemory.clear();
this.termCache.reset();
this.firstLogIndex = nextLogIndex;
this.lastLogIndex = nextLogIndex - 1;
this.configManager.truncatePrefix(this.firstLogIndex);
Expand All @@ -1050,6 +1057,7 @@ private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
}

this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);
termCache.truncateTail(lastIndexKept + 1);

this.lastLogIndex = lastIndexKept;
final long lastTermKept = unsafeGetTerm(lastIndexKept);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raft.storage;


import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.ignite.raft.jraft.entity.LogId;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class TermCacheTest {
private TermCache termCache;

@BeforeEach
void setUp() {
termCache = new TermCache(4);
}

@Test
void testAppendAndLookup() {
termCache.append(new LogId(1, 1));
termCache.append(new LogId(2, 2));
termCache.append(new LogId(3, 3));

assertEquals(-1, termCache.lookup(0));
assertEquals(1, termCache.lookup(1));
assertEquals(2, termCache.lookup(2));
assertEquals(3, termCache.lookup(3));
assertEquals(3, termCache.lookup(4));
}

@Test
void testAppendSameTerm() {
termCache.append(new LogId(1, 1));
termCache.append(new LogId(2, 1));

assertEquals(1, termCache.lookup(1));
assertEquals(1, termCache.lookup(2));
}

@Test
void testReset() {
termCache.append(new LogId(1, 1));
termCache.reset();

assertEquals(-1, termCache.lookup(1));
}

@Test
void testTruncateTail() {
termCache.append(new LogId(1, 1));
termCache.append(new LogId(2, 1));
termCache.append(new LogId(3, 2));

termCache.truncateTail(2);

assertEquals(1, termCache.lookup(1));
assertEquals(1, termCache.lookup(2));
assertEquals(1, termCache.lookup(3));
}

@Test
void testTruncateTailExactMatch() {
termCache.append(new LogId(1, 1));
termCache.append(new LogId(2, 2));
termCache.append(new LogId(3, 2));

termCache.truncateTail(2);

assertEquals(1, termCache.lookup(1));
assertEquals(1, termCache.lookup(2));
assertEquals(1, termCache.lookup(3));
}
}

0 comments on commit 4f7f341

Please sign in to comment.