Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LOGBACK-1436] Add counter for tracking dropped and discarded events #905

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* This appender and derived classes, log events asynchronously. In order to
Expand All @@ -45,6 +46,9 @@
*/
public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {

private final AtomicInteger droppedEvents = new AtomicInteger(0);
private final AtomicInteger discardedEvents = new AtomicInteger(0);

AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
BlockingQueue<E> blockingQueue;

Expand Down Expand Up @@ -160,6 +164,7 @@ public void stop() {
@Override
protected void append(E eventObject) {
if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
discardedEvents.incrementAndGet();
return;
}
preprocess(eventObject);
Expand All @@ -172,7 +177,10 @@ public boolean isQueueBelowDiscardingThreshold() {

private void put(E eventObject) {
if (neverBlock) {
blockingQueue.offer(eventObject);
boolean inserted = blockingQueue.offer(eventObject);
if (!inserted) {
droppedEvents.incrementAndGet();
}
} else {
putUninterruptibly(eventObject);
}
Expand All @@ -196,6 +204,27 @@ private void putUninterruptibly(E eventObject) {
}
}

/**
* Returns the total number of discarded events due to reaching the discardingThreshold,
* since the creation of this appender.
*
* @return number of discarded events.
*/
public int getDiscardedEvents() {
return discardedEvents.get();
}

/**
* Returns the total number of dropped events since the creation of this appender.
* This can only return a non-zero value if the appender has been configured with
* {@link #setNeverBlock(boolean)} set to true and exceeding the queue size.
*
* @return number of dropped events.
*/
public int getDroppedEvents() {
return droppedEvents.get();
}

public int getQueueSize() {
return queueSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class AsyncAppenderBaseTest {
AsyncAppenderBase<Integer> asyncAppenderBase = new AsyncAppenderBase<Integer>();
LossyAsyncAppender lossyAsyncAppender = new LossyAsyncAppender();
DelayingListAppender<Integer> delayingListAppender = new DelayingListAppender<Integer>();
BlockingListAppender<Integer> blockingListAppender = new BlockingListAppender<Integer>();
DiscardingAsyncAppender<Integer> discardingAsyncAppender = new DiscardingAsyncAppender<Integer>();
ListAppender<Integer> listAppender = new ListAppender<Integer>();
OnConsoleStatusListener onConsoleStatusListener = new OnConsoleStatusListener();
StatusChecker statusChecker = new StatusChecker(context);
Expand All @@ -51,6 +53,7 @@ public void setUp() {

asyncAppenderBase.setContext(context);
lossyAsyncAppender.setContext(context);
discardingAsyncAppender.setContext(context);

listAppender.setContext(context);
listAppender.setName("list");
Expand All @@ -59,6 +62,10 @@ public void setUp() {
delayingListAppender.setContext(context);
delayingListAppender.setName("list");
delayingListAppender.start();

blockingListAppender.setContext(context);
blockingListAppender.setName("blocking list");
blockingListAppender.start();
}

@Test
Expand Down Expand Up @@ -301,6 +308,51 @@ public void verifyInterruptionOfWorkerIsSwallowed() {
Assertions.assertFalse(asyncAppenderBase.worker.isInterrupted());
}

@Test
public void verifyDiscardedEventsCounter() throws InterruptedException {
int bufferSize = 5;
int discardingThreshold = 3;
// One event will have been removed by the consuming thread.
int expectedDiscardedEvents = bufferSize - 1 - discardingThreshold;
discardingAsyncAppender.addAppender(blockingListAppender);
discardingAsyncAppender.setQueueSize(bufferSize);
discardingAsyncAppender.setDiscardingThreshold(discardingThreshold);
discardingAsyncAppender.start();

for (int i = 0; i < bufferSize; i++) {
discardingAsyncAppender.doAppend(i);
// Make sure the consuming thread is in a blocked state.
blockingListAppender.waitForAppend();
}
blockingListAppender.unblock();
discardingAsyncAppender.stop();

Assertions.assertEquals(expectedDiscardedEvents, discardingAsyncAppender.getDiscardedEvents());
}

@Test
public void verifyDroppedEventsCounter() throws InterruptedException {
int bufferSize = 5;
int loopLen = bufferSize * 2;
// One event will have been removed by the consuming thread.
int expectedDroppedEvents = bufferSize - 1;
asyncAppenderBase.setMaxFlushTime(1);
asyncAppenderBase.addAppender(blockingListAppender);
asyncAppenderBase.setQueueSize(bufferSize);
asyncAppenderBase.setNeverBlock(true);
asyncAppenderBase.start();

for (int i = 0; i < loopLen; i++) {
asyncAppenderBase.doAppend(i);
// Make sure the consuming thread is in a blocked state.
blockingListAppender.waitForAppend();
}
blockingListAppender.unblock();
asyncAppenderBase.stop();

Assertions.assertEquals(expectedDroppedEvents, asyncAppenderBase.getDroppedEvents());
}

private void verify(ListAppender<Integer> la, int atLeast) {
// ListAppender passes as parameter should be stopped at this stage
Assertions.assertFalse(la.isStarted());
Expand All @@ -309,6 +361,38 @@ private void verify(ListAppender<Integer> la, int atLeast) {
statusChecker.assertContainsMatch("Worker thread will flush remaining events before exiting.");
}

static class BlockingListAppender<E> extends ListAppender<E> {

private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch firstAppend = new CountDownLatch(1);

public void unblock() {
latch.countDown();
}

public void waitForAppend() throws InterruptedException {
firstAppend.await();
}

@Override
protected void append(E e) {
try {
firstAppend.countDown();
latch.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
super.append(e);
}
}

static class DiscardingAsyncAppender<I extends Number> extends AsyncAppenderBase<Integer> {
@Override
protected boolean isDiscardable(Integer i) {
return true;
}
}

static class LossyAsyncAppender extends AsyncAppenderBase<Integer> {
@Override
protected boolean isDiscardable(Integer i) {
Expand Down