Skip to content

Commit

Permalink
Problem with queue (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso authored Jan 31, 2024
1 parent 24606de commit 146b613
Showing 1 changed file with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -41,8 +42,12 @@ public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implemen

// Signals shutdown to the publishing pipeline.
private AtomicBoolean stop = new AtomicBoolean(false);
// Kafka topic this publisher writes to.
private final String topic;
// In-memory queue capacity (defaults to 10000 when not supplied).
private final Integer queueBufferSize;

// Current in-memory sink buffering events awaiting publication to Kafka.
// Held in an AtomicReference because reinitQueue() replaces it wholesale
// after a publishing failure, so readers must always go through get().
private final AtomicReference<Sinks.Many<EventEnvelope<E, Meta, Context>>> queue = new AtomicReference<>();
// Flux view over the current sink; kept in lock-step with `queue` by reinitQueue().
private final AtomicReference<Flux<EventEnvelope<E, Meta, Context>>> eventSource = new AtomicReference<>();
private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
// Initial delay between restart attempts of the Kafka pipeline.
private final Duration restartInterval;
// Upper bound for the exponential restart backoff.
private final Duration maxRestartInterval;
Expand All @@ -59,16 +64,27 @@ public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, C

/**
 * Builds a publisher that buffers events in an in-memory sink and drains them to Kafka.
 *
 * @param senderOptions      reactor-kafka sender configuration; stop-on-error is forced on
 * @param topic              destination Kafka topic
 * @param queueBufferSize    in-memory buffer capacity; {@code null} means 10000
 * @param restartInterval    initial restart backoff; {@code null} means 1 second
 * @param maxRestartInterval maximum restart backoff; {@code null} means 1 minute
 */
public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String topic, Integer queueBufferSize, Duration restartInterval, Duration maxRestartInterval) {
    this.topic = topic;
    this.queueBufferSize = queueBufferSize == null ? 10000 : queueBufferSize;
    this.restartInterval = restartInterval == null ? Duration.of(1, ChronoUnit.SECONDS) : restartInterval;
    this.maxRestartInterval = maxRestartInterval == null ? Duration.of(1, ChronoUnit.MINUTES) : maxRestartInterval;

    // Installs the initial sink/flux pair into the AtomicReferences.
    reinitQueue();
    this.senderOptions = senderOptions.stopOnError(true);
    // NOTE(review): create the sender from this.senderOptions, not the raw parameter —
    // if stopOnError(true) returns a new immutable instance, creating from the
    // parameter would silently drop the stop-on-error setting.
    this.kafkaSender = KafkaSender.create(this.senderOptions);
}

/**
 * Replaces the in-memory sink and its Flux view after a publishing failure,
 * completing the previous sink so any downstream subscriber terminates cleanly.
 */
private void reinitQueue() {
    Sinks.Many<EventEnvelope<E, Meta, Context>> previous = this.queue.get();
    if (previous != null) {
        // Best-effort completion of the old sink; the emit result is intentionally
        // ignored since the sink is being discarded either way.
        previous.tryEmitComplete();
    }
    // Build the new sink in a local so both references are installed from the SAME
    // instance — re-reading queue.get() here could observe a concurrent reinit and
    // pair eventSource with a different sink.
    Sinks.Many<EventEnvelope<E, Meta, Context>> next = Sinks.many().unicast().onBackpressureBuffer();
    this.queue.set(next);
    this.eventSource.set(next.asFlux());
}

record CountAndMaxSeqNum(Long count, Long lastSeqNum) {
static CountAndMaxSeqNum empty() {
return new CountAndMaxSeqNum(0L, 0L);
Expand Down Expand Up @@ -124,8 +140,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
.concatMap(countAndLastSeqNum -> {
// Flux.defer(() -> {
LOGGER.debug("Starting consuming in memory queue for {}. Event lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum);
//return reactorQueue.asFlux()
return eventSource
return eventSource.get()
.filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum)
.transform(publishToKafka(
eventStore,
Expand All @@ -134,7 +149,10 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
bufferTimeout(200, Duration.ofMillis(20))
));
})
.doOnError(e -> LOGGER.error("Error publishing events to kafka", e))
.doOnError(e -> {
reinitQueue();
LOGGER.error("Error publishing events to kafka", e);
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
.transientErrors(true)
.maxBackoff(maxRestartInterval)
Expand Down Expand Up @@ -179,7 +197,7 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
return Flux
.fromIterable(events)
.map(t -> {
queue.tryEmitNext(t).orThrow();
queue.get().tryEmitNext(t).orThrow();
return Tuple.empty();
})
.retryWhen(Retry.fixedDelay(50, Duration.ofMillis(1))
Expand Down

0 comments on commit 146b613

Please sign in to comment.