diff --git a/demo/demo-in-memory/src/main/java/com/example/demo/Bank.java b/demo/demo-in-memory/src/main/java/com/example/demo/Bank.java index 4a8fa6a7..c2f8a3cf 100644 --- a/demo/demo-in-memory/src/main/java/com/example/demo/Bank.java +++ b/demo/demo-in-memory/src/main/java/com/example/demo/Bank.java @@ -3,6 +3,7 @@ import akka.actor.ActorSystem; import com.fasterxml.uuid.Generators; import com.fasterxml.uuid.impl.TimeBasedGenerator; +import fr.maif.eventsourcing.AutoSnapshotingStrategy; import fr.maif.eventsourcing.EventProcessorImpl; import fr.maif.eventsourcing.ProcessingSuccess; import fr.maif.eventsourcing.TransactionManager; @@ -33,7 +34,7 @@ public Bank(ActorSystem actorSystem, this.eventProcessor = new EventProcessorImpl<>( eventStore, transactionManager, - new DefaultAggregateStore<>(eventStore, eventHandler, actorSystem, transactionManager), + new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, eventHandler, actorSystem, transactionManager), commandHandler, eventHandler, List.empty() diff --git a/demo/demo-postgres-kafka/src/main/java/com/example/demo/BankAggregateStore.java b/demo/demo-postgres-kafka/src/main/java/com/example/demo/BankAggregateStore.java index 08596616..ff3236e4 100644 --- a/demo/demo-postgres-kafka/src/main/java/com/example/demo/BankAggregateStore.java +++ b/demo/demo-postgres-kafka/src/main/java/com/example/demo/BankAggregateStore.java @@ -1,6 +1,7 @@ package com.example.demo; import fr.maif.concurrent.CompletionStages; +import fr.maif.eventsourcing.AutoSnapshotingStrategy; import fr.maif.eventsourcing.EventHandler; import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.TransactionManager; @@ -20,7 +21,7 @@ public class BankAggregateStore extends DefaultAggregateStore { public BankAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { - super(eventStore, eventEventHandler, transactionManager); + super(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, eventEventHandler, transactionManager); } @Override diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java index 15b00325..e249408b 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/DefaultAggregateStore.java @@ -14,12 +14,12 @@ public class DefaultAggregateStore, E extends Event, Meta, Co private final Materializer materializer; - public DefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, ActorSystem system, TransactionManager transactionManager) { - this(eventStore, eventEventHandler, Materializer.createMaterializer(system), transactionManager); + public DefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore eventStore, EventHandler eventEventHandler, ActorSystem system, TransactionManager transactionManager) { + this(autoSnapshotingStrategy, eventStore, eventEventHandler, Materializer.createMaterializer(system), transactionManager); } - public DefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, Materializer materializer, TransactionManager transactionManager) { - super(eventStore, eventEventHandler, transactionManager); + public DefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore eventStore, EventHandler eventEventHandler, Materializer materializer, TransactionManager transactionManager) { + super(autoSnapshotingStrategy, eventStore, eventEventHandler, transactionManager); this.materializer = materializer; } diff --git a/thoth-core-akka/src/test/java/fr/maif/eventsourcing/EventProcessorTest.java b/thoth-core-akka/src/test/java/fr/maif/eventsourcing/EventProcessorTest.java index 732192c3..7f7a0603 100644 --- a/thoth-core-akka/src/test/java/fr/maif/eventsourcing/EventProcessorTest.java +++ b/thoth-core-akka/src/test/java/fr/maif/eventsourcing/EventProcessorTest.java @@ -249,7 +249,7 @@ private EventProcessorImpl( inMemoryEventStore, fakeTransactionManager, - new DefaultAggregateStore<>(inMemoryEventStore, vikingEventHandler, actorSystem, fakeTransactionManager), + new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), inMemoryEventStore, vikingEventHandler, actorSystem, fakeTransactionManager), new VikingCommandHandler(), vikingEventHandler, projections diff --git a/thoth-core-akka/src/test/java/fr/maif/eventsourcing/impl/DefaultAggregateStoreTest.java b/thoth-core-akka/src/test/java/fr/maif/eventsourcing/impl/DefaultAggregateStoreTest.java index 38580712..2e04a057 100644 --- a/thoth-core-akka/src/test/java/fr/maif/eventsourcing/impl/DefaultAggregateStoreTest.java +++ b/thoth-core-akka/src/test/java/fr/maif/eventsourcing/impl/DefaultAggregateStoreTest.java @@ -9,6 +9,7 @@ import fr.maif.Helpers.VikingEvent; import fr.maif.akka.eventsourcing.DefaultAggregateStore; import fr.maif.concurrent.CompletionStages; +import fr.maif.eventsourcing.AutoSnapshotingStrategy; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessorTest; import fr.maif.eventsourcing.EventStore; @@ -65,7 +66,7 @@ class DefaultAggregateStoreTest { void testReloadEventAndBuildAggregateWithoutSnapshots() { EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager()); + DefaultAggregateStore aggregateStore = new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager()); Query query = Query.builder().withEntityId(entityId).build(); when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Source.from(List.of(eventEnvelope1, eventEnvelope2)).runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem)); @@ -80,7 +81,7 @@ void testReloadEventAndBuildAggregateWithoutSnapshots() { void testReloadEventAndBuildAggregateWithSnapshots() { EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = spy(new DefaultAggregateStore(eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager()) { + DefaultAggregateStore aggregateStore = spy(new DefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), actorSystem, new EventProcessorTest.FakeTransactionManager()) { @Override public CompletionStage> getSnapshot(Tuple0 transactionContext, String id) { return CompletionStages.successful(Option.some(new Viking(id, "Rollo", 1L))); diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java index 4f95829a..d28112a8 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/DefaultAggregateStore.java @@ -1,11 +1,6 @@ package fr.maif.reactor.eventsourcing; -import fr.maif.eventsourcing.AggregateStore; -import fr.maif.eventsourcing.Event; -import fr.maif.eventsourcing.EventHandler; -import fr.maif.eventsourcing.EventStore; -import fr.maif.eventsourcing.State; -import fr.maif.eventsourcing.TransactionManager; +import fr.maif.eventsourcing.*; import fr.maif.eventsourcing.impl.AbstractDefaultAggregateStore; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -16,8 +11,8 @@ public class DefaultAggregateStore, E extends Event, Meta, Context, TxCtx> extends AbstractDefaultAggregateStore implements AggregateStore { - public DefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { - super(eventStore, eventEventHandler, transactionManager); + public DefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { + super(autoSnapshotingStrategy, eventStore, eventEventHandler, transactionManager); } @Override diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java index 80ce73f0..40f55e0e 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java @@ -4,6 +4,7 @@ import fr.maif.Helpers.Viking; import fr.maif.Helpers.VikingEvent; import fr.maif.concurrent.CompletionStages; +import fr.maif.eventsourcing.AutoSnapshotingStrategy; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.EventStore.Query; @@ -64,7 +65,7 @@ class DefaultAggregateStoreTest { void testReloadEventAndBuildAggregateWithoutSnapshots() { EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()); + DefaultAggregateStore aggregateStore = new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()); Query query = Query.builder().withEntityId(entityId).build(); when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Flux.fromIterable(List.of(eventEnvelope1, eventEnvelope2))); @@ -79,7 +80,7 @@ void testReloadEventAndBuildAggregateWithoutSnapshots() { void testReloadEventAndBuildAggregateWithSnapshots() { EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = spy(new DefaultAggregateStore(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) { + DefaultAggregateStore aggregateStore = spy(new DefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) { @Override public CompletionStage> getSnapshot(Tuple0 transactionContext, String id) { return CompletionStages.successful(Option.some(new Viking(id, "Rollo", 30, 1L))); diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java index 38c4baaa..6e0c6e89 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java @@ -1,13 +1,6 @@ package fr.maif.reactor.eventsourcing; -import fr.maif.eventsourcing.AggregateStore; -import fr.maif.eventsourcing.CommandHandler; -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventProcessorImpl; -import fr.maif.eventsourcing.Events; -import fr.maif.eventsourcing.ProcessingSuccess; -import fr.maif.eventsourcing.Projection; -import fr.maif.eventsourcing.TransactionManager; +import fr.maif.eventsourcing.*; import fr.maif.reactor.eventsourcing.DefaultAggregateStore; import fr.maif.reactor.eventsourcing.InMemoryEventStore; import io.vavr.API; @@ -283,7 +276,7 @@ private EventProcessorImpl( inMemoryEventStore, fakeTransactionManager, - new DefaultAggregateStore<>(inMemoryEventStore, vikingEventHandler, fakeTransactionManager), + new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), inMemoryEventStore, vikingEventHandler, fakeTransactionManager), new VikingCommandHandler(), vikingEventHandler, projections diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/AutoSnapshoting.java b/thoth-core/src/main/java/fr/maif/eventsourcing/AutoSnapshoting.java new file mode 100644 index 00000000..3fb49cac --- /dev/null +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/AutoSnapshoting.java @@ -0,0 +1,7 @@ +package fr.maif.eventsourcing; + +public interface AutoSnapshoting { + + + +} diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/AutoSnapshotingStrategy.java b/thoth-core/src/main/java/fr/maif/eventsourcing/AutoSnapshotingStrategy.java new file mode 100644 index 00000000..9bc8574f --- /dev/null +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/AutoSnapshotingStrategy.java @@ -0,0 +1,20 @@ +package fr.maif.eventsourcing; + +public interface AutoSnapshotingStrategy { + boolean shouldSnapshot(Integer eventCount); + + + record NoOpSnapshotingStrategy() implements AutoSnapshotingStrategy { + @Override + public boolean shouldSnapshot(Integer eventCount) { + return false; + } + } + + record CountSnapshotingStrategy(int count) implements AutoSnapshotingStrategy { + @Override + public boolean shouldSnapshot(Integer eventCount) { + return eventCount > count; + } + } +} diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java index 3f019f1a..59aa0267 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java @@ -1,25 +1,23 @@ package fr.maif.eventsourcing.impl; -import fr.maif.eventsourcing.AggregateStore; -import fr.maif.eventsourcing.Event; -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventHandler; -import fr.maif.eventsourcing.EventStore; -import fr.maif.eventsourcing.State; -import fr.maif.eventsourcing.TransactionManager; +import fr.maif.eventsourcing.*; import io.vavr.control.Option; import org.reactivestreams.Publisher; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; public abstract class AbstractDefaultAggregateStore, E extends Event, Meta, Context, TxCtx> implements AggregateStore { + private final AutoSnapshotingStrategy autoSnapshotingStrategy; private final EventStore eventStore; private final EventHandler eventEventHandler; private final TransactionManager transactionManager; - public AbstractDefaultAggregateStore(EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { + public AbstractDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy, EventStore eventStore, EventHandler eventEventHandler, TransactionManager transactionManager) { + this.autoSnapshotingStrategy = autoSnapshotingStrategy; this.eventStore = eventStore; this.eventEventHandler = eventEventHandler; this.transactionManager = transactionManager; @@ -42,13 +40,22 @@ public CompletionStage> getAggregate(TxCtx ctx, String entityId) { // If a snapshot is defined, we read events from seq num of the snapshot : s -> EventStore.Query.builder().withSequenceFrom(s.sequenceNum()).withEntityId(entityId).build() ); - + AtomicInteger eventCount = new AtomicInteger(0); return fold(this.eventStore.loadEventsByQuery(ctx, query), mayBeSnapshot, - (Option mayBeState, EventEnvelope event) -> - this.eventEventHandler.applyEvent(mayBeState, event.event) - .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)) - ); + (Option mayBeState, EventEnvelope event) -> { + eventCount.incrementAndGet(); + return this.eventEventHandler.applyEvent(mayBeState, event.event) + .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)); + } + ).thenCompose(mayBeAggregate -> { + if (autoSnapshotingStrategy.shouldSnapshot(eventCount.get())) { + return this.storeSnapshot(ctx, entityId, mayBeAggregate) + .thenApply(__ -> mayBeAggregate) + .exceptionally(e -> mayBeAggregate); + } + return CompletableFuture.completedStage(mayBeAggregate); + }); }); } diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessor.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessor.java index f19130c3..0f1ccc63 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessor.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessor.java @@ -80,7 +80,7 @@ public PostgresKafkaEventProcessorConfig( metaFormat, contextFormat ); - this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, system, transactionManager) : aggregateStore; + this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), this.eventStore, eventHandler, system, transactionManager) : aggregateStore; this.commandHandler = commandHandler; this.eventHandler = eventHandler; this.projections = projections; diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java index e8e0dd9b..2ade2f98 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java @@ -366,6 +366,14 @@ public BuilderWithAggregateStore withAggregateStore(Aggrega } public BuilderWithAggregateStore withDefaultAggregateStore() { + return withDefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy()); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(int snapshotEveryEventCount) { + return withDefaultAggregateStore(new AutoSnapshotingStrategy.CountSnapshotingStrategy(snapshotEveryEventCount)); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy) { return new BuilderWithAggregateStore<>( system, pgAsyncPool, @@ -378,7 +386,7 @@ public BuilderWithAggregateStore withDefaultAggregateStore( concurrentReplayStrategy, eventStore, eventHandler, - new DefaultAggregateStore<>(eventStore, eventHandler, system, transactionManager)); + new DefaultAggregateStore<>(autoSnapshotingStrategy, eventStore, eventHandler, system, transactionManager)); } } diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java index 6de716cb..31df4fc2 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java @@ -338,6 +338,14 @@ public BuilderWithAggregateStore withAggregateStore(Aggrega } public BuilderWithAggregateStore withDefaultAggregateStore() { + return withDefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy()); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(int snapshotEveryEventCount) { + return withDefaultAggregateStore(new AutoSnapshotingStrategy.CountSnapshotingStrategy(snapshotEveryEventCount)); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy) { return new BuilderWithAggregateStore<>( pgAsyncPool, tableNames, @@ -349,7 +357,7 @@ public BuilderWithAggregateStore withDefaultAggregateStore( concurrentReplayStrategy, eventStore, eventHandler, - new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager)); + new DefaultAggregateStore<>(autoSnapshotingStrategy, eventStore, eventHandler, transactionManager)); } } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java index 1ea7496c..77af5b45 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessor.java @@ -81,7 +81,7 @@ public PostgresKafkaEventProcessorConfig( metaFormat, contextFormat ); - this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager) : aggregateStore; + this.aggregateStore = aggregateStore == null ? new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), this.eventStore, eventHandler, transactionManager) : aggregateStore; this.commandHandler = commandHandler; this.eventHandler = eventHandler; this.projections = projections; @@ -154,7 +154,7 @@ public PostgresKafkaEventProcessorConfig( this.eventHandler = eventHandler; this.projections = projections; this.eventPublisher = eventPublisher; - this.aggregateStore = new DefaultAggregateStore<>(this.eventStore, eventHandler, transactionManager); + this.aggregateStore = new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), this.eventStore, eventHandler, transactionManager); } } } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java index 5a216503..106d92e5 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java @@ -370,6 +370,14 @@ public BuilderWithAggregateStore withAggregateStore(Aggrega } public BuilderWithAggregateStore withDefaultAggregateStore() { + return withDefaultAggregateStore(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy()); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(int snapshotEveryEventCount) { + return withDefaultAggregateStore(new AutoSnapshotingStrategy.CountSnapshotingStrategy(snapshotEveryEventCount)); + } + + public BuilderWithAggregateStore withDefaultAggregateStore(AutoSnapshotingStrategy autoSnapshotingStrategy) { return new BuilderWithAggregateStore<>( dataSource, @@ -382,7 +390,7 @@ public BuilderWithAggregateStore withDefaultAggregateStore( concurrentReplayStrategy, eventStore, eventHandler, - new DefaultAggregateStore<>(eventStore, eventHandler, transactionManager)); + new DefaultAggregateStore<>(autoSnapshotingStrategy, eventStore, eventHandler, transactionManager)); } } diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java index e8bdce95..ea0b6948 100644 --- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -7,15 +7,12 @@ import akka.stream.javadsl.Source; import fr.maif.akka.eventsourcing.DefaultAggregateStore; +import fr.maif.eventsourcing.*; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; import akka.actor.ActorSystem; import akka.stream.javadsl.Sink; -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventProcessorImpl; -import fr.maif.eventsourcing.EventStore; -import fr.maif.eventsourcing.TransactionManager; import fr.maif.akka.eventsourcing.InMemoryEventStore; import io.vavr.Tuple; import io.vavr.Tuple0; @@ -33,7 +30,7 @@ public void init() { this.eventProcessor = new EventProcessorImpl<>( eventStore, transactionManager, - new DefaultAggregateStore<>(eventStore, eventHandler, actorSystem, transactionManager), + new DefaultAggregateStore<>(new AutoSnapshotingStrategy.NoOpSnapshotingStrategy(), eventStore, eventHandler, actorSystem, transactionManager), new TestCommandHandler<>(), eventHandler, io.vavr.collection.List.empty()