Skip to content

Commit

Permalink
[proxima-direct-core] #206 add support for sequential IDs to InMemSto…
Browse files Browse the repository at this point in the history
…rage
  • Loading branch information
je-ik committed Apr 27, 2021
1 parent b317034 commit d332fee
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import java.io.Serializable;
import java.net.URI;
Expand Down Expand Up @@ -305,6 +306,16 @@ default boolean hasExternalizableOffsets() {
return false;
}

/**
* Signals the user that this {@link CommitLogReader} is able to recover sequential IDs sent to it
* in {@link StreamElement StreamElements} through its associated writer.
*
* @return {@code true} if StreamElement read from the commit log contain valid sequential IDs.
*/
default boolean restoresSequentialIds() {
return false;
}

/**
* Convert instance of this reader to {@link Factory} suitable for serialization.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.commitlog;
package cz.o2.proxima.direct.commitlog;

import static org.mockito.Mockito.*;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.CommitLogReaders;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.ThroughputLimiter;
import cz.o2.proxima.storage.commitlog.Position;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ public List<Partition> getPartitions() {
return Collections.unmodifiableList(partitions);
}

@Override
public boolean restoresSequentialIds() {
return true;
}

@Override
public ObserveHandle observe(String name, Position position, LogObserver observer) {
return observe(name, position, false, observer);
Expand Down Expand Up @@ -1170,6 +1175,16 @@ private static <T> AttributeDescriptor<T> getAttributeOfEntity(
private static StreamElement cloneAndUpdateAttribute(
EntityDescriptor entity, StreamElement elem) {

if (elem.hasSequentialId()) {
return StreamElement.upsert(
entity,
getAttributeOfEntity(entity, elem),
elem.getSequentialId(),
elem.getKey(),
elem.getAttribute(),
elem.getStamp(),
elem.getValue());
}
return StreamElement.upsert(
entity,
getAttributeOfEntity(entity, elem),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.LogObserverUtils;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.AttributeWriterBase;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;

/** Test suite for {@link InMemStorage}. */
Expand Down Expand Up @@ -661,6 +663,50 @@ public void testCachedViewWithMultiplePartitions() {
accessor.getCachedView(direct.getContext());
}

@Test(timeout = 10000)
public void testReadWriteSequentialIds() throws InterruptedException {
InMemStorage storage = new InMemStorage();
DataAccessor accessor =
storage.createAccessor(
direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
CommitLogReader reader =
accessor
.getCommitLogReader(direct.getContext())
.orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
AttributeWriterBase writer =
accessor
.getWriter(direct.getContext())
.orElseThrow(() -> new IllegalStateException("Missing writer"));

List<StreamElement> result = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
LogObserver observer =
LogObserverUtils.toList(
result,
Assert::assertTrue,
elem -> {
latch.countDown();
return false;
});
reader.observe("test", observer);
writer
.online()
.write(
StreamElement.upsert(
entity,
data,
1L,
"key",
data.getName(),
System.currentTimeMillis(),
new byte[] {1, 2, 3}),
(succ, exc) -> {});
latch.await();
assertEquals(1, result.size());
assertTrue(result.get(0).hasSequentialId());
assertEquals(1L, result.get(0).getSequentialId());
}

private AttributeFamilyDescriptor createFamilyDescriptor(URI storageUri) {
return createFamilyDescriptor(storageUri, 1);
}
Expand Down

0 comments on commit d332fee

Please sign in to comment.