study

View project on GitHub

线程池

ExecutorService

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 这个是通过CAS 来实现的

高3位 用于存储状态:
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS; (等待线程执行完)
private static final int STOP       =  1 << COUNT_BITS; (取消所有执行的任务)
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS; (取消完成后的状态)

低位储存 worker的数量

BlockingQueue workQueue 待执行的runnable ReentrantLock mainLock ,用于维护workers的添加,largestPoolSize数量等 HashSet workers 工作线程Set

execute

  1. 如果当前工作的work数量小于 corePoolSize
    1. 尝试添加一个线程到corePool里,添加成功成功直接返回
  2. 如果线程池是在running状态,且在队列里添加成功 再二次确认一遍 第二次确认状态不对的话,从列表里移除,移除成功后,尝试tryTerminate,然后reject掉 如果状态正确,但是work数量是0里,新建一个非core线程
  3. 如果添加失败,尝试添加一个新的线程 如果再失败,reject这个command
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

addWorker

添加一个线程

  1. CAS 增加count
  2. 增加成功后添加一个Work放到workers里
  3. 如果需要执行这个线程,则执行work里的thread
    private boolean addWorker(Runnable firstTask, boolean core) {
     retry:
     for (;;) {
         int c = ctl.get();
         int rs = runStateOf(c);
         if (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                 firstTask == null &&
                 ! workQueue.isEmpty()))
             return false;
    
         for (;;) {
             int wc = workerCountOf(c);
             if (wc >= CAPACITY ||
                 wc >= (core ? corePoolSize : maximumPoolSize))
                 return false;
             if (compareAndIncrementWorkerCount(c))
                 break retry;
             c = ctl.get();  // Re-read ctl
             if (runStateOf(c) != rs)
                 continue retry;
             // else CAS failed due to workerCount change; retry inner loop
         }
     }
    
     boolean workerStarted = false;
     boolean workerAdded = false;
     Worker w = null;
     try {
         w = new Worker(firstTask);
         final Thread t = w.thread;
         if (t != null) {
             final ReentrantLock mainLock = this.mainLock;
             mainLock.lock();
             try {
                 // Recheck while holding lock.
                 // Back out on ThreadFactory failure or if
                 // shut down before lock acquired.
                 int rs = runStateOf(ctl.get());
    
                 if (rs < SHUTDOWN ||
                     (rs == SHUTDOWN && firstTask == null)) {
                     if (t.isAlive()) // precheck that t is startable
                         throw new IllegalThreadStateException();
                     workers.add(w);
                     int s = workers.size();
                     if (s > largestPoolSize)
                         largestPoolSize = s;
                     workerAdded = true;
                 }
             } finally {
                 mainLock.unlock();
             }
             if (workerAdded) {
                 t.start(); // 这里会调用Work的run,然后调用runWorker
                 workerStarted = true;
             }
         }
     } finally {
         if (! workerStarted)
             addWorkerFailed(w);
     }
     return workerStarted;
    }
    

Worker 的数据结构

包含一个线程和正在执行的 基于AQS

每个线程执行的是调用runWorker

runWorker

  1. 一个while循环,每次执行getTask,获取一个Runnable执行
  2. 一个thread执行的时候会加锁,加锁使用的是worker的锁,基于AQS
     final void runWorker(Worker w) {
     Thread wt = Thread.currentThread();
     Runnable task = w.firstTask;
     w.firstTask = null;
     w.unlock(); // allow interrupts
     boolean completedAbruptly = true;
     try {
         while (task != null || (task = getTask()) != null) {
             w.lock();
             // If pool is stopping, ensure thread is interrupted;
             // if not, ensure thread is not interrupted.  This
             // requires a recheck in second case to deal with
             // shutdownNow race while clearing interrupt
             if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                     runStateAtLeast(ctl.get(), STOP))) &&
                 !wt.isInterrupted())
                 wt.interrupt();
             try {
                 beforeExecute(wt, task);
                 Throwable thrown = null;
                 try {
                     task.run();
                 } catch (RuntimeException x) {
                     thrown = x; throw x;
                 } catch (Error x) {
                     thrown = x; throw x;
                 } catch (Throwable x) {
                     thrown = x; throw new Error(x);
                 } finally {
                     afterExecute(task, thrown);
                 }
             } finally {
                 task = null;
                 w.completedTasks++;
                 w.unlock();
             }
         }
         completedAbruptly = false;
     } finally {
         processWorkerExit(w, completedAbruptly);
     }
    }
    

Future submit

这个主要是通过AbstractExecutorService,通过Runnable生成一个RunnableFuture(FutureTask)

具体看FutureTask里的逻辑:

* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

比较简单
run之后的结果set到outcome,状态设置到NORMAL
然后唤醒所有的waiting线程
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}