Skip to content

Commit

Permalink
ThreadPoolExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
wanwanpp committed Mar 10, 2018
1 parent 71d8a4a commit 0f7dd2c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
1 change: 1 addition & 0 deletions jdkTest/src/com/jdk/test/TestThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void testState() {
final int STOP = 1 << COUNT_BITS;
final int TIDYING = 2 << COUNT_BITS;
final int TERMINATED = 3 << COUNT_BITS;
System.out.println("-1的二进制"+Integer.toBinaryString(-1));
System.out.println("running: " + Integer.toBinaryString(RUNNING));
System.out.println("SHUTDOWN: " + Integer.toBinaryString(SHUTDOWN));
System.out.println("STOP: " + Integer.toBinaryString(STOP));
Expand Down
6 changes: 3 additions & 3 deletions src/java/lang/ThreadLocal.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ T childValue(T parentValue) {
* the table starts running out of space.
*/
//静态内部类
//就是一个Entry数组,不想HashMap使用了数组加链表
//就是一个Entry数组,不像HashMap使用了数组加链表
//这里哈希冲突时用的线性探测法,不是链地址法
static class ThreadLocalMap {

Expand All @@ -281,9 +281,9 @@ static class ThreadLocalMap {
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
//继承自虚引用,当垃圾回收时引用所指的对象会被GC
//继承自弱引用,当垃圾回收时引用所指的对象会被GC
//Entry类继承了WeakReference<ThreadLocal<?>>,即每个Entry对象都有一个ThreadLocal的弱引用(作为key),
// 这是为了防止内存泄露。一旦线程结束,key变为一个不可达的对象,这个Entry就可以被GC了
//这是为了防止内存泄露。一旦线程结束,key变为一个不可达的对象,这个Entry就可以被GC了
static class Entry extends WeakReference<ThreadLocal> {
/**
* The value associated with this ThreadLocal.
Expand Down
3 changes: 3 additions & 0 deletions src/java/util/concurrent/AbstractExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
/**
* 封装成RunnableFuture再执行execute方法。
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
Expand Down
28 changes: 16 additions & 12 deletions src/java/util/concurrent/ThreadPoolExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,12 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//running \ 0 = running。前3位表示运行状态,后29位表示工作线程的数量。
//running \ 0 = running。前3位表示运行状态,后29位表示工作线程的数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//29
// 11111111111111111111111111111 29个1
// 11111111111111111111111111111 29个1 2的29次方减一
private static final int CAPACITY = (1 << COUNT_BITS) - 1;


//线程池的五种状态

//RUNNING:可以接受新任务并且处理进入队列中的任务
Expand All @@ -394,9 +394,6 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
//TIDYING: 1000000000000000000000000000000
//TERMINATEd: 1100000000000000000000000000000

//这个类中将二进制数分为了两部分,高位代表线程池状态( runState),低位代表活动线程数( workerCount),
// CAPACITY代表最大的活动线程数,为2^29-1

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
Expand Down Expand Up @@ -507,7 +504,7 @@ private void decrementWorkerCount() {
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
//跟踪最大的泳池大小。只有在mainlock访问。
//达到的最大线程池大小。只有在mainlock访问。
private int largestPoolSize;

/**
Expand Down Expand Up @@ -646,7 +643,7 @@ private final class Worker
/**
* Per-thread task counter
*/
//每个工作线程的任务计数器
//已完成任务的计数器
volatile long completedTasks;

/**
Expand Down Expand Up @@ -958,8 +955,14 @@ private List<Runnable> drainQueue() {
* state).
* @return true if successful
*/
/**
* 1.更新ctl的值
* 2.创建Worker实例,并添加至workers中
* 3.启动Worker中线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//这个循环主要是为了使ctl的值自增。为什么不直接用ctl遍历的incrementAndGet呢?因为这里我们不一定要自增成功,还要判断当前线程的数量合不合法。
for (; ; ) {
int c = ctl.get();//大多数情况就是ctl的值
int rs = runStateOf(c);
Expand All @@ -974,8 +977,7 @@ private boolean addWorker(Runnable firstTask, boolean core) {

for (; ; ) {
int wc = workerCountOf(c);//worker的数量,即线程池中工作线程的数量。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))//数量不合法
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))//线程数不合法
return false;
if (compareAndIncrementWorkerCount(c))//工作线程数量加1. ctl的值表示当前工作线程数
break retry;
Expand All @@ -986,13 +988,14 @@ private boolean addWorker(Runnable firstTask, boolean core) {
}
}

//下面开始创建Worker
boolean workerStarted = false;//工作线程启动成功
boolean workerAdded = false;//工作线程添加成功
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
w = new Worker(firstTask); //Worker类封装了一个线程和一个Runnable的task
final Thread t = w.thread; //获取Worker中的工作线程
if (t != null) {
mainLock.lock();
try {
Expand All @@ -1017,6 +1020,7 @@ private boolean addWorker(Runnable firstTask, boolean core) {
mainLock.unlock();
}
if (workerAdded) {
//启动线程
//执行runWorker()方法。里面有beforeExecutor,afterExecutor方法。在commond.run()方法的前后执行。
//在调用Worker的构造函数时,this.thread = getThreadFactory().newThread(this);这一句会将Worker当做
// 一个任务(Worker实现了Runnable接口),所以t.start()会调用Worker的run方法执行runWorker。
Expand Down

0 comments on commit 0f7dd2c

Please sign in to comment.