From 0f3e0e17763cd248c4f78d6f0170d600dd7ee42c Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Tue, 21 May 2024 18:01:03 +0800 Subject: [PATCH] Create CircularBlockingQueue.java --- .../org/rx/bean/CircularBlockingQueue.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 rxlib/src/main/java/org/rx/bean/CircularBlockingQueue.java diff --git a/rxlib/src/main/java/org/rx/bean/CircularBlockingQueue.java b/rxlib/src/main/java/org/rx/bean/CircularBlockingQueue.java new file mode 100644 index 00000000..ec041eb8 --- /dev/null +++ b/rxlib/src/main/java/org/rx/bean/CircularBlockingQueue.java @@ -0,0 +1,82 @@ +package org.rx.bean; + +import lombok.Getter; +import lombok.Setter; +import org.rx.core.*; +import org.rx.util.function.TripleFunc; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantLock; + +import static org.rx.core.Extends.ifNull; + +public class CircularBlockingQueue extends LinkedBlockingQueue implements EventPublisher> { + private static final long serialVersionUID = 4685018531330571106L; + public final Delegate, NEventArgs> onConsume = Delegate.create(); + public TripleFunc, T, Boolean> onFull; + final ReentrantLock pLock = Reflects.readField(this, "putLock"); + TimeoutFuture consumeTimer; + @Getter + long consumePeriod; + + public synchronized void setConsumePeriod(long consumePeriod) { + if ((this.consumePeriod = consumePeriod) > 0) { + if (consumeTimer != null) { + consumeTimer.cancel(); + } + consumeTimer = Tasks.timer().setTimeout(() -> { + T t; + NEventArgs e = new NEventArgs<>(); + while ((t = poll()) != null) { + e.setValue(t); + raiseEvent(onConsume, e); + } + }, d -> consumePeriod, null, Constants.TIMER_PERIOD_FLAG); + } else { + if (consumeTimer != null) { + consumeTimer.cancel(); + } + } + } + + public CircularBlockingQueue(int capacity) { + this(capacity, null); + onFull = (q, t) -> { + pLock.lock(); + try { + boolean ok; + do { + q.poll(); + ok = q.innerOffer(t); + } + while (!ok); + return true; + } finally { + pLock.unlock(); + } + }; + } + + public CircularBlockingQueue(int capacity, TripleFunc, T, Boolean> onFull) { + super(capacity); + this.onFull = onFull; + } + +// @Override +// public boolean add(T t) { +// return offer(t); +// } + + @Override + public boolean offer(T t) { + boolean r = super.offer(t); + if (!r && onFull != null) { + return ifNull(onFull.apply(this, t), false); + } + return r; + } + + protected boolean innerOffer(T t) { + return super.offer(t); + } +}