Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autosnapshoting #58

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion demo/demo-in-memory/src/main/java/com/example/demo/Bank.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import akka.actor.ActorSystem;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedGenerator;
import fr.maif.eventsourcing.AutoSnapshotingStrategy;
import fr.maif.eventsourcing.EventProcessorImpl;
import fr.maif.eventsourcing.ProcessingSuccess;
import fr.maif.eventsourcing.TransactionManager;
Expand Down Expand Up @@ -33,7 +34,7 @@ public Bank(ActorSystem actorSystem,
this.eventProcessor = new EventProcessorImpl<>(
eventStore,
transactionManager,
new DefaultAggregateStore<>(eventStore, eventHandler, actorSystem, transactionManager),
new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, eventHandler, actorSystem, transactionManager),
commandHandler,
eventHandler,
List.empty()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.example.demo;

import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.AutoSnapshotingStrategy;
import fr.maif.eventsourcing.EventHandler;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.TransactionManager;
Expand All @@ -20,7 +21,7 @@
public class BankAggregateStore extends DefaultAggregateStore<Account, BankEvent, Tuple0, Tuple0, Connection> {

public BankAggregateStore(EventStore<Connection, BankEvent, Tuple0, Tuple0> eventStore, EventHandler<Account, BankEvent> eventEventHandler, TransactionManager<Connection> transactionManager) {
super(eventStore, eventEventHandler, transactionManager);
super(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, eventEventHandler, transactionManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ public class DefaultAggregateStore<S extends State<S>, E extends Event, Meta, Co

private final Materializer materializer;

public DefaultAggregateStore(EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, ActorSystem system, TransactionManager<TxCtx> transactionManager) {
this(eventStore, eventEventHandler, Materializer.createMaterializer(system), transactionManager);
public DefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, ActorSystem system, TransactionManager<TxCtx> transactionManager) {
this(autoSnapshotingStrategy, eventStore, eventEventHandler, Materializer.createMaterializer(system), transactionManager);
}

public DefaultAggregateStore(EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, Materializer materializer, TransactionManager<TxCtx> transactionManager) {
super(eventStore, eventEventHandler, transactionManager);
public DefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, Materializer materializer, TransactionManager<TxCtx> transactionManager) {
super(autoSnapshotingStrategy, eventStore, eventEventHandler, transactionManager);
this.materializer = materializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private EventProcessorImpl<String, Viking, VikingCommand, VikingEvent, Tuple0, S
return new EventProcessorImpl<>(
inMemoryEventStore,
fakeTransactionManager,
new DefaultAggregateStore<>(inMemoryEventStore, vikingEventHandler, actorSystem, fakeTransactionManager),
new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), inMemoryEventStore, vikingEventHandler, actorSystem, fakeTransactionManager),
new VikingCommandHandler(),
vikingEventHandler,
projections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import fr.maif.Helpers.VikingEvent;
import fr.maif.akka.eventsourcing.DefaultAggregateStore;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.AutoSnapshotingStrategy;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventProcessorTest;
import fr.maif.eventsourcing.EventStore;
Expand Down Expand Up @@ -65,7 +66,7 @@ class DefaultAggregateStoreTest {
void testReloadEventAndBuildAggregateWithoutSnapshots() {

EventStore<Tuple0, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class);
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager());
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager());

Query query = Query.builder().withEntityId(entityId).build();
when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Source.from(List.of(eventEnvelope1, eventEnvelope2)).runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem));
Expand All @@ -80,7 +81,7 @@ void testReloadEventAndBuildAggregateWithoutSnapshots() {
void testReloadEventAndBuildAggregateWithSnapshots() {

EventStore<Tuple0, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class);
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = spy(new DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0>(eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager()) {
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = spy(new DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager()) {
@Override
public CompletionStage<Option<Viking>> getSnapshot(Tuple0 transactionContext, String id) {
return CompletionStages.successful(Option.some(new Viking(id, "Rollo", 1L)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package fr.maif.reactor.eventsourcing;

import fr.maif.eventsourcing.AggregateStore;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventHandler;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.State;
import fr.maif.eventsourcing.TransactionManager;
import fr.maif.eventsourcing.*;
import fr.maif.eventsourcing.impl.AbstractDefaultAggregateStore;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand All @@ -16,8 +11,8 @@
public class DefaultAggregateStore<S extends State<S>, E extends Event, Meta, Context, TxCtx> extends AbstractDefaultAggregateStore<S, E, Meta, Context, TxCtx> implements AggregateStore<S, String, TxCtx> {


public DefaultAggregateStore(EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, TransactionManager<TxCtx> transactionManager) {
super(eventStore, eventEventHandler, transactionManager);
public DefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, TransactionManager<TxCtx> transactionManager) {
super(autoSnapshotingStrategy, eventStore, eventEventHandler, transactionManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import fr.maif.Helpers.Viking;
import fr.maif.Helpers.VikingEvent;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.AutoSnapshotingStrategy;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.EventStore.Query;
Expand Down Expand Up @@ -64,7 +65,7 @@ class DefaultAggregateStoreTest {
void testReloadEventAndBuildAggregateWithoutSnapshots() {

EventStore<Tuple0, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class);
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager());
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager());

Query query = Query.builder().withEntityId(entityId).build();
when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Flux.fromIterable(List.of(eventEnvelope1, eventEnvelope2)));
Expand All @@ -79,7 +80,7 @@ void testReloadEventAndBuildAggregateWithoutSnapshots() {
void testReloadEventAndBuildAggregateWithSnapshots() {

EventStore<Tuple0, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class);
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = spy(new DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) {
DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0> aggregateStore = spy(new DefaultAggregateStore<Viking, VikingEvent, Tuple0, Tuple0, Tuple0>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) {
@Override
public CompletionStage<Option<Viking>> getSnapshot(Tuple0 transactionContext, String id) {
return CompletionStages.successful(Option.some(new Viking(id, "Rollo", 30, 1L)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
package fr.maif.reactor.eventsourcing;

import fr.maif.eventsourcing.AggregateStore;
import fr.maif.eventsourcing.CommandHandler;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventProcessorImpl;
import fr.maif.eventsourcing.Events;
import fr.maif.eventsourcing.ProcessingSuccess;
import fr.maif.eventsourcing.Projection;
import fr.maif.eventsourcing.TransactionManager;
import fr.maif.eventsourcing.*;
import fr.maif.reactor.eventsourcing.DefaultAggregateStore;
import fr.maif.reactor.eventsourcing.InMemoryEventStore;
import io.vavr.API;
Expand Down Expand Up @@ -283,7 +276,7 @@ private EventProcessorImpl<String, Viking, VikingCommand, VikingEvent, Tuple0, S
return new EventProcessorImpl<>(
inMemoryEventStore,
fakeTransactionManager,
new DefaultAggregateStore<>(inMemoryEventStore, vikingEventHandler, fakeTransactionManager),
new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), inMemoryEventStore, vikingEventHandler, fakeTransactionManager),
new VikingCommandHandler(),
vikingEventHandler,
projections
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package fr.maif.eventsourcing;

public interface AutoSnapshoting {



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package fr.maif.eventsourcing;

public interface AutoSnapshotingStrategy {
boolean shouldSnapshot(Integer eventCount);


record NoOpSnapshotingStrategy() implements AutoSnapshotingStrategy {
@Override
public boolean shouldSnapshot(Integer eventCount) {
return false;
}
}

record CountSnapshotingStrategy(int count) implements AutoSnapshotingStrategy {
@Override
public boolean shouldSnapshot(Integer eventCount) {
return eventCount > count;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
package fr.maif.eventsourcing.impl;

import fr.maif.eventsourcing.AggregateStore;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventHandler;
import fr.maif.eventsourcing.EventStore;
import fr.maif.eventsourcing.State;
import fr.maif.eventsourcing.TransactionManager;
import fr.maif.eventsourcing.*;
import io.vavr.control.Option;
import org.reactivestreams.Publisher;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

public abstract class AbstractDefaultAggregateStore<S extends State<S>, E extends Event, Meta, Context, TxCtx> implements AggregateStore<S, String, TxCtx> {

private final AutoSnapshotingStrategy autoSnapshotingStrategy;
private final EventStore<TxCtx, E, Meta, Context> eventStore;
private final EventHandler<S, E> eventEventHandler;
private final TransactionManager<TxCtx> transactionManager;

public AbstractDefaultAggregateStore(EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, TransactionManager<TxCtx> transactionManager) {
public AbstractDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore<TxCtx, E, Meta, Context> eventStore, EventHandler<S, E> eventEventHandler, TransactionManager<TxCtx> transactionManager) {
this.autoSnapshotingStrategy = autoSnapshotingStrategy;
this.eventStore = eventStore;
this.eventEventHandler = eventEventHandler;
this.transactionManager = transactionManager;
Expand All @@ -42,13 +40,22 @@ public CompletionStage<Option<S>> getAggregate(TxCtx ctx, String entityId) {
// If a snapshot is defined, we read events from seq num of the snapshot :
s -> EventStore.Query.builder().withSequenceFrom(s.sequenceNum()).withEntityId(entityId).build()
);

AtomicInteger eventCount = new AtomicInteger(0);
return fold(this.eventStore.loadEventsByQuery(ctx, query),
mayBeSnapshot,
(Option<S> mayBeState, EventEnvelope<E, Meta, Context> event) ->
this.eventEventHandler.applyEvent(mayBeState, event.event)
.map((S state) -> (S) state.withSequenceNum(event.sequenceNum))
);
(Option<S> mayBeState, EventEnvelope<E, Meta, Context> event) -> {
eventCount.incrementAndGet();
return this.eventEventHandler.applyEvent(mayBeState, event.event)
.map((S state) -> (S) state.withSequenceNum(event.sequenceNum));
}
).thenCompose(mayBeAggregate -> {
if (autoSnapshotingStrategy.shouldSnapshot(eventCount.get())) {
return this.storeSnapshot(ctx, entityId, mayBeAggregate)
.thenApply(__ -> mayBeAggregate)
.exceptionally(e -> mayBeAggregate);
}
return CompletableFuture.completedStage(mayBeAggregate);
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public PostgresKafkaEventProcessorConfig(
metaFormat,
contextFormat
);
this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, system, transactionManager) : aggregateStore;
this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), this.eventStore, eventHandler, system, transactionManager) : aggregateStore;
this.commandHandler = commandHandler;
this.eventHandler = eventHandler;
this.projections = projections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ public BuilderWithAggregateStore<S, E, Meta, Context> withAggregateStore(Aggrega
}

public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore() {
return withDefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy());
}

public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore(int snapshotEveryEventCount) {
return withDefaultAggregateStore(new AutoSnapshotingStrategy.CountSnapshotingStrategy(snapshotEveryEventCount));
}

public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy) {
return new BuilderWithAggregateStore<>(
system,
pgAsyncPool,
Expand All @@ -378,7 +386,7 @@ public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore(
concurrentReplayStrategy,
eventStore,
eventHandler,
new DefaultAggregateStore<>(eventStore, eventHandler, system, transactionManager));
new DefaultAggregateStore<>(autoSnapshotingStrategy, eventStore, eventHandler, system, transactionManager));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ public BuilderWithAggregateStore<S, E, Meta, Context> withAggregateStore(Aggrega
}

public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore() {
return withDefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy());
}

public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore(int snapshotEveryEventCount) {
return withDefaultAggregateStore(new AutoSnapshotingStrategy.CountSnapshotingStrategy(snapshotEveryEventCount));
}

public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy) {
return new BuilderWithAggregateStore<>(
pgAsyncPool,
tableNames,
Expand All @@ -349,7 +357,7 @@ public BuilderWithAggregateStore<S, E, Meta, Context> withDefaultAggregateStore(
concurrentReplayStrategy,
eventStore,
eventHandler,
new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager));
new DefaultAggregateStore<>(autoSnapshotingStrategy, eventStore, eventHandler, transactionManager));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public PostgresKafkaEventProcessorConfig(
metaFormat,
contextFormat
);
this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager) : aggregateStore;
this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), this.eventStore, eventHandler, transactionManager) : aggregateStore;
this.commandHandler = commandHandler;
this.eventHandler = eventHandler;
this.projections = projections;
Expand Down Expand Up @@ -154,7 +154,7 @@ public PostgresKafkaEventProcessorConfig(
this.eventHandler = eventHandler;
this.projections = projections;
this.eventPublisher = eventPublisher;
this.aggregateStore = new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager);
this.aggregateStore = new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), this.eventStore, eventHandler, transactionManager);
}
}
}
Loading