Skip to content

Commit

Permalink
Added removeN() and peekAtOffset() methods on BigQueue to allow entri…
Browse files Browse the repository at this point in the history
…es to be examined without commiting the removal.
  • Loading branch information
Denis Dowling authored and Denis Dowling committed Dec 21, 2016
1 parent 7bf0027 commit d704111
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 13 deletions.
42 changes: 32 additions & 10 deletions src/main/java/com/leansoft/bigqueue/BigQueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ public byte[] dequeue() throws IOException {

// Set queueFrontIndex to current queue front and THEN increment
long queueFrontIndex = this.incrementQueueFrontIndex();
byte[] data = this.innerArray.get(queueFrontIndex);

return data;
return this.innerArray.get(queueFrontIndex);
} finally {
queueFrontWriteLock.unlock();
}
Expand All @@ -127,13 +126,42 @@ public void removeAll() throws IOException {
}
}

@Override
public void removeN(long n) throws IOException {
try {
queueFrontWriteLock.lock();

if (n < 0)
return;

long p = getQueueFrontIndex() + n;
long head = this.innerArray.getHeadIndex();
if (p > head)
p = head;

this.setQueueFrontIndex(p);

} finally {
queueFrontWriteLock.unlock();
}
}

@Override
public byte[] peek() throws IOException {
if (this.isEmpty()) {
return null;
}
byte[] data = this.innerArray.get(getQueueFrontIndex());
return data;

return this.innerArray.get(getQueueFrontIndex());
}

@Override
public byte[] peekAtOffset(long offset) throws IOException {
long p = getQueueFrontIndex() + offset;
if (offset < 0 || p >= this.innerArray.getHeadIndex())
return null;
else
return this.innerArray.get(p);
}

private final static int QUEUE_FRONT_POSITION = 0;
Expand All @@ -160,12 +188,6 @@ public ListenableFuture<byte[]> peekAsync() {
return peekFuture;
}

/**
* apply an implementation of a ItemIterator interface for each queue item
*
* @param iterator
* @throws IOException
*/
@Override
public void applyForEach(ItemIterator iterator) throws IOException {
try {
Expand Down
24 changes: 22 additions & 2 deletions src/main/java/com/leansoft/bigqueue/IBigQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,19 @@ public interface IBigQueue extends Closeable {
/**
* Removes all items of a queue, this will empty the queue and delete all back data files.
*
* @throws IOException exception throws if there is any IO error during dequeue operation.
* @throws IOException exception throws if there is any IO error during remove operation.
*/
public void removeAll() throws IOException;


/**
* Remove n items from a queue in one operation. This operation is useful if the queue is being examined using the
* peek() or peekAtOffset() methods and a batch of previously examined entries needs to be removed.
*
* @param n items to remove
* @throws IOException if there is any IO error during the remove operation.
*/
public void removeN(long n) throws IOException;

/**
* Retrieves the item at the front of a queue
*
Expand All @@ -62,6 +71,17 @@ public interface IBigQueue extends Closeable {
*/
public byte[] peek() throws IOException;

/**
* Retrieves the item at an offset along a queue. This method is used together with removeN to implement
* a simple transaction model on a queue where a number of entries can be sequentially examined on the queue
* using peekAtOffset() and then the actual dequeue can be committed by calling removeN() or rolled back by
* resetting the offset back to 0.
*
* @param offset along the queue to examine
* @return data from the queue at the given offset or null if the offset is outside the queue bounds
* @throws IOException exception throws if there is any IO error during peek operation.
*/
public byte[] peekAtOffset(long offset) throws IOException;

/**
* Retrieves the item at the front of a queue asynchronously.
Expand Down
38 changes: 37 additions & 1 deletion src/test/java/com/leansoft/bigqueue/BigQueueUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,44 @@ public void run() {
}
*/

@Test
public void peekAtOffset() throws IOException {
bigQueue = new BigQueueImpl(testDir, "peek_test");
assertNotNull(bigQueue);

int loop = 1000;
for(int i = 0; i < loop; i++)
bigQueue.enqueue(("" + i).getBytes());

assertTrue(bigQueue.size() == loop);

byte[] data = bigQueue.peek();
assertEquals("0", new String(data));

for(int i = 0; i < loop; i++) {
data = bigQueue.peekAtOffset(i);
assertEquals("" + i, new String(data));
}

int remove_size = 100;
for (int i = 0; i < 10; i++) {
bigQueue.removeN(remove_size);
assertTrue(bigQueue.size() == loop - remove_size * (i+1));

for (int j = 0; j < bigQueue.size(); j++) {
data = bigQueue.peekAtOffset(j);
assertEquals("" + (j + (i+1) * remove_size), new String(data));
}
}
assertTrue(bigQueue.isEmpty());
assertTrue(bigQueue.peek() == null);
assertTrue(bigQueue.peekAtOffset(100) == null);

bigQueue.gc();
bigQueue.close();
}

@After
@After
public void clean() throws IOException {
if (bigQueue != null) {
bigQueue.removeAll();
Expand Down

0 comments on commit d704111

Please sign in to comment.