Skip to content

Commit

Permalink
ArrayBlockingQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
wanwanpp committed Apr 9, 2018
1 parent ca5830b commit 1d82eed
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 227 deletions.
27 changes: 27 additions & 0 deletions src/java/util/concurrent/ArrayBlockingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E>
/**
* Circularly increment i.
*/
//循环自增
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
Expand Down Expand Up @@ -168,6 +169,7 @@ private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
//唤醒因空无法取出元素而阻塞的条件队列中的一个线程
notEmpty.signal();
}

Expand All @@ -181,6 +183,7 @@ private E extract() {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
//唤醒因队列满了而阻塞的线程
notFull.signal();
return x;
}
Expand Down Expand Up @@ -307,14 +310,22 @@ public boolean add(E e) {
*
* @throws NullPointerException if the specified element is null
*/
/**
* 1. 加锁
* 2. 若元素队列已经满了,返回false
* 3. 没满咋插入元素,返回true
* 4. 释放锁
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//若容量达到上限了
if (count == items.length)
return false;
else {
//插入元素
insert(e);
return true;
}
Expand All @@ -330,6 +341,7 @@ public boolean offer(E e) {
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
//阻塞直到完成元素的插入
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
Expand All @@ -351,19 +363,30 @@ public void put(E e) throws InterruptedException {
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
/**
* 1. 加上可中断锁
* 2. 判断是否队列已满,若满了,就超时等待
* 3. 若超时就返回false
* 4. 若队列有空位置了且未超时,就添加元素,返回true
* 5. 释放锁
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
//转为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中断锁 线程被中断后,会抛出InterruptedException
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//这里会更新nanos的值 进入awaitNanos后,可能被唤醒或者超时。
nanos = notFull.awaitNanos(nanos);
}
//插入元素
insert(e);
return true;
} finally {
Expand All @@ -381,6 +404,7 @@ public E poll() {
}
}

//阻塞直到取出了元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
Expand All @@ -393,6 +417,9 @@ public E take() throws InterruptedException {
}
}

/**
* 和offer(Long,TimeUtil)的逻辑类似
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
Expand Down
1 change: 0 additions & 1 deletion src/java/util/concurrent/LinkedBlockingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ static class Node<E> {
/**
* The capacity bound, or Integer.MAX_VALUE if none
*/
//����
private final int capacity;

/**
Expand Down
Loading

0 comments on commit 1d82eed

Please sign in to comment.