jdk源码系列-线程池

jdk源码系列-线程池

Executors 是 Executor、ExecutorService、ThreadFactory、Callable 类的工厂和工具方法。

创建一个固定大小的线程池

通过重用共享无界队列里的线程来减少线程创建的开销。当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲。由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行。线程池中的线程会一直存在,除非明确地 shutdown 掉。

1
2
3
public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());  
}

返回的是 ThreadPoolExecutor 类的对象,这个构造方法使用传入的参数和默认的线程工厂与拒绝执行的处理。 corePoolSize:线程池中的线程数量,除非设置了 allowCoreThreadTimeOut, 否则就算线程空闲还是在保存在线程池中 maximumPoolSize:线程池中允许存放最大的线程数量 keepAliveTime:当线程数大于 corePoolSize,如果 keepAliveTime 内空闲的线程未执行,线程将被终结 unit:keepAliveTime 的时间单位 workQueue:保存 execute() 提交的任务

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
         Executors.defaultThreadFactory(), defaultHandler);  
}  

可以看到固定大小线程池,corePoolSize 和 maximumPoolSize 传入的参数是一样的

创建一个单个线程的线程池

任务会被保证顺序执行,因为只有一个工作线程。不像 newFixedThreadPool(1),这个不保证任务顺序执行。corePoolSize 和 maximumPoolSize 都是 1。

1
2
3
4
public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));  
}

创建一个可按需自动扩容的线程池,但是会优先重用线程池中空闲可用的线程

这个类型的线程池将会大大提升执行许多短暂的异步任务的程序。如果线程池中线程都在使用,又有新任务到来,则新增一个线程到线程池。如果线程 60 秒内空闲,则将被终止移除线程池。corePoolSize 为 0,可知一旦线程 60s 空闲就会被移出线程池

1
2
3
public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());  
}  

创建一个在一定延迟时间后调度命令的线程池,或者周期性执行的线程池。

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}  

创建完线程池,然后就该执行任务了

看下内部类 DelegatedExecutorService 里的 execute 方法:可以看到任务执行策略(单线程串行、多线程并行)和任务的具体执行分离,是一个典型的命令模式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static class DelegatedExecutorService extends AbstractExecutorService {  
    private final ExecutorService e;  
    DelegatedExecutorService(ExecutorService executor) { e = executor; }  
    public void execute(Runnable command) { e.execute(command); }  
    public void shutdown() { e.shutdown(); }  
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }  
    public boolean isShutdown() { return e.isShutdown(); }  
    public boolean isTerminated() { return e.isTerminated(); }  
    public boolean awaitTermination(long timeout, TimeUnit unit)  
        throws InterruptedException {  
        return e.awaitTermination(timeout, unit);  
    }  
    public Future<?> submit(Runnable task) {  
        return e.submit(task);  
    }  
    public <T> Future<T> submit(Callable<T> task) {  
        return e.submit(task);  
    }  
    public <T> Future<T> submit(Runnable task, T result) {  
        return e.submit(task, result);  
    }  
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
        throws InterruptedException {  
        return e.invokeAll(tasks);  
    }  
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  
                                         long timeout, TimeUnit unit)  
        throws InterruptedException {  
        return e.invokeAll(tasks, timeout, unit);  
    }  
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)  
        throws InterruptedException, ExecutionException {  
        return e.invokeAny(tasks);  
    }  
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
                           long timeout, TimeUnit unit)  
        throws InterruptedException, ExecutionException, TimeoutException {  
        return e.invokeAny(tasks, timeout, unit);  
    }  
}  

创建线程池的时候,已经实例化了 ThreadPoolExecutor,所以上面 execute() 方法实际是调用 ThreadPoolExecutor 的 execute:给定的任务可能在未来的某个时刻执行。可能是新建一个线程执行,也可能是线程中原有的线程执行。如果任务不能执行,可能是这个 executor 已经被 shutdown 了,也可能是到达了线程池的执行阈值,任务被拒绝执行处理器处理中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {  
    if (command == null)  
        throw new NullPointerException();  
    int c = ctl.get();  
    if (workerCountOf(c) < corePoolSize) {  
        if (addWorker(command, true))  
            return;  
        c = ctl.get();  
    }  
    if (isRunning(c) && workQueue.offer(command)) {  
        int recheck = ctl.get();  
        if (! isRunning(recheck) && remove(command))  
            reject(command);  
        else if (workerCountOf(recheck) == 0)  
            addWorker(null, false);  
    }  
    else if (!addWorker(command, false))  
        reject(command);  
}  

看到这,其实发现线程池的核心实现就是在 ThreadPoolExecutor 里面,所以先介绍下 ThreadPoolExecutor 类的作用:

上图可以看出 ThreadPoolExecutor 类的层次结构中的位置,是对抽象方法和接口的完整实现,即核心代码在这个类里。

一个 ExecutorService 执行每个任务可能用到线程池中的一个或多个线程,线程池由 Executors 工厂创建。 线程池解决了两个不同的问题:

  1. 执行大量异步的任务时,线程池减少线程的创建来减少开销,提升性能
  2. 提供了对资源的管理,包括当执行一系列任务时,线程的消耗。每个 ThreadPoolExecutor 也存储有些基本数据,诸如完成的任务数量

想要在更广阔的背景下使用的话,这个类提供了许多可调整的参数和扩展的 hook。不管怎么样,还是推荐使用 Executors 来创建线程池比较方便。

这个主要的线程池控制状态 ctl,使用 AtomicInteger 存储两个概念上的字段 workerCount(线程池有效的线程数) 和 runState(线程池是 running、shuting down 等等状态)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

上述 execute() 方法有三个处理步骤:

  1. 如果正在 running 的线程 (即 worker 线程) 小于 corePoolSize ( workerCountOf(c) < corePoolSize ),尝试创建一个新线程并把传入的 command(任务) 作为它的第一个任务。调用 addWorker(command, true) 自动检查 runState 和 workCount,以便在不能增加线程的时候返回 false。添加成功直接 return,不能添加就要看下面的步骤
  2. 如果被成功放进队列(if (isRunning(c) && workQueue.offer(command)) workQueue 是一个任务排队的阻塞队列),然后还需要二次检查线程池是否 shut down(上次检查后到这次检查前死亡)。所以我们重新检查状态,如果线程池停止的话,回滚进队操作,或者如果没有工作的线程开启一个新线程(addWorker(null, false);)
  3. 如果不能让任务进入阻塞队列,然后尝试新增一个线程。如果新增线程失败,可能是线程池 shut down 或者线程池饱和(达到 maxPoolSize),所以接下来抛弃这个任务。

任务(Task)被包装在一个叫做 Worker 的内部类,Worker 继承 AQS 来实现任务增加/删除的同步控制,使用 HashSet 来保存 Worker 线程。如果 worker 线程大于 corePoolSize,则不创建 worker 线程,而是放入一个 BlockingQueue 排队。如果有界队列的 BlockingQueue 满了,则尝试增加线程到线程池,但是线程总数要小于 maxPollSize。

addWorker() 方法:检查线程池当前状态和线程数量的边界条件看是否可以增加 worker 线程。如果可以,工作线程计数响应调整,并且,如果可能的话,新的 worker 线程被创建,启动,跑它的第一个 task。如果线程池 stop 或者要被 shut down,此方法返回 false。如果线程工厂(thread factory)创建线程失败,此方法也会返回 false。如果线程创建失败,不管是 thread factory 返回 null,或者 exception(通常是 OOM),都会被回滚。

参数 firstTask: 新线程应该运行的第一个任务。如果 worker 线程小于 corePoolSize,worker 和初始化的第一次任务一起创建绕过排队这一过程,或者队列已满。初始化空闲线程通常是通过 prestartCoreThread 或者替换已经死亡的 worker 线程。 core:如果 true,则使用 corePoolSize 作为边界,否则使用 maximumPoolSize 作为边界。(这里使用 Boolean 而不是传入实际值,是因为传入值可能会在传入过程被改变,在方法中直接读取值更精确)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) {  
    retry:  
    for (;;) {  
        int c = ctl.get();  
        int rs = runStateOf(c);  
  
        // Check if queue empty only if necessary.  
        if (rs >= SHUTDOWN &&  
            ! (rs == SHUTDOWN &&  
               firstTask == null &&  
               ! workQueue.isEmpty()))  
            return false;  
  
        for (;;) {  
            int wc = workerCountOf(c);  
            if (wc >= CAPACITY ||  
                wc >= (core ? corePoolSize : maximumPoolSize))  
                return false;  
            if (compareAndIncrementWorkerCount(c))  
                break retry;  
            c = ctl.get();  // Re-read ctl  
            if (runStateOf(c) != rs)  
                continue retry;  
            // else CAS failed due to workerCount change; retry inner loop  
        }  
    }  
  
    boolean workerStarted = false;  
    boolean workerAdded = false;  
    Worker w = null;  
    try {  
        final ReentrantLock mainLock = this.mainLock;  
        w = new Worker(firstTask);  
        final Thread t = w.thread;  
        if (t != null) {  
            mainLock.lock();  
            try {  
                // Recheck while holding lock.  
                // Back out on ThreadFactory failure or if  
                // shut down before lock acquired.  
                int c = ctl.get();  
                int rs = runStateOf(c);  
  
                if (rs < SHUTDOWN ||  
                    (rs == SHUTDOWN && firstTask == null)) {  
                    if (t.isAlive()) // precheck that t is startable  
                        throw new IllegalThreadStateException();  
                    workers.add(w);  
                    int s = workers.size();  
                    if (s > largestPoolSize)  
                        largestPoolSize = s;  
                    workerAdded = true;  
                }  
            } finally {  
                mainLock.unlock();  
            }  
            if (workerAdded) {  
                t.start();  
                workerStarted = true;  
            }  
        }  
    } finally {  
        if (! workerStarted)  
            addWorkerFailed(w);  
    }  
    return workerStarted;  
}  

鸣谢

wenniuwuren

署名 - 非商业性使用 - 禁止演绎 4.0