简介

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

一种使用线程的模式,存放了很多可以复用的线程,对线程统一管理。我们可以使用new的方式去创建线程,但若是并发线程太高,每个线程执行时间不长,这样频繁的创建销毁线程是比较耗费资源的,线程池就是用来解决此问题的。

线程池的优点

  1. 降低资源的消耗:线程可以重复使用,不需要在创建线程和消耗线程上浪费资源;
  2. 提高响应速度:任务到达时,线程可以复用已有的线程,及时响应;
  3. 可管理性:无限制的创建线程会降低系统效率,线程池可以对线程进行管理、监控、调优。

线程池的继承关系:

ThreadPoolExecutor

ThreadPoolExecutor是线程池最核心的一个类,构造类代码如下:

package java.util.concurrent;

public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

}

参数说明:

  • corePoolSize:线程池核心线程数。默认情况下,线程池中是没有线程的,当还没有一次任务到达过时,初始化的线程数为0,当有任务初次来临,直接创建corePoolSize个线程;核心线程生命周期无限,即使空闲也不会死亡。
  • maximumPoolSize:线程池能创建的最大线程数。当核心线程数已满,并且工作队列也已经存放满,才会去判断当前线程数是否小于maximumPoolSize,小于则继续创建线程处理任务,等于则执行拒绝策略。
  • keepAliveTime:闲置超时时间。当线程池中的线程数大于corePoolSize时,此值才生效,即大于corePoolSize的线程在经过keepAliveTime的时间依然没有任务执行,则销毁线程。
  • unit:超时时间单位。参数keepAliveTime的单位。NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS
  • workQueue:工作队列。当核心线程数已满时,新提交的任务放到任务队列中(前提是任务队列没满)。
  • threadFactory:线程池创建新线程的工厂。创建线程,一般默认即可。
  • handler: 线程池达到饱和之后的拒绝策略。当线程数达到最大线程maximumPoolSize后(此时队列已经存满),再有新任务提交,执行的处理策略。

workQueue工作队列

workQueque决定了缓存任务的排队策略,对于不同的业务场景,我们可以选择不同的工作队列。类型为BlockingQueue

package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {

}

SynchronousQueue

没有容量,直接提交队列,是无缓存等待队列,当任务提交进来,它总是马上将任务提交给线程去执行,如果线程已经达到最大,则执行拒绝策略;所以使用SynchronousQueue阻塞队列一般要求maximumPoolSize为无界(无限大),避免线程拒绝执行操作。从源码中可以看到容量为0:

package java.util.concurrent;
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//是否为空,直接返回的true
public boolean isEmpty() {
return true;
}
//队列大小为0
public int size() {
return 0;
}
}

LinkedBlockingQueue

默认情况下,LinkedBlockingQueue是个无界的任务队列,默认值是Integer.MAX_VALUE,当然我们也可以指定队列的大小。从构造LinkedBlockingQueue源码中可以看出它的大小指定方式,为了避免队列过大造成机器负载,或者内存泄漏,我们在使用的时候建议手动传一个队列的大小。内部分别使用了takeLock和putLock对并发进行控制,添加和删除操作不是互斥操作,可以同时进行,这样大大提供了吞吐量。源码中有定义这两个锁

package java.util.concurrent;
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//默认构造函数,大小为Integer最大
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//也可以指定大小
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//获取元素使用的锁
private final ReentrantLock takeLock = new ReentrantLock();

//加入元素使用的锁
private final ReentrantLock putLock = new ReentrantLock();

//获取元素时使用到takeLock锁
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
//加锁操作
takeLock.lock();
try {
//获取元素
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
//解锁
takeLock.unlock();
}
}

//添加元素到队列中使用putLock锁
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//加锁操作
putLock.lock();
try {
//队列中存放的数据小于队列设置的值
if (count.get() < capacity) {
//添加元素
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//解锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
}

ArrayBlockingQueue

可以理解为有界的队列,创建的时候必须要指定队列的大小,从源码可以看出构造的时候要传递值

package java.util.concurrent;
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
}

DelayQueue

是一个延迟队列,无界、队列中每个元素都有过期时间,当从队列获取元素时,只有过期的元素才会出队,而队列头部是最早过期的元素,若是没有过期,则进行等待。利用这个特性,我们可以用来处理定时任务调用的场景,例如订单过期未支付自动取消,设置一个在队列中过期的时间,过期了后,再去查询订单的状态,若是没支付,则调用取消订单的方法。

package java.util.concurrent;
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

//获取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//获取元素
E first = q.peek();
if (first == null)
//进入等待
available.await();
else {
//获取过期时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//小于等于0则过期,返回此元素
return q.poll();
first = null;
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//设置还需要等待的时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}

handler四种拒绝策略

package java.util.concurrent;
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

RejectedExecutionHandler接口的实现类有

DiscardPolicy

DiscardPolicy:当任务添加到线程池中被拒绝时,直接丢弃任务,不抛出异常

public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

AbortPolicy

AbortPolicy:当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常

public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}
}

DiscardOldestPolicy

DiscardOldestPolicy:当任务添加到线程池中被拒绝时,判断线程池是否还在运行,然后获取队列,让队首(最久)的元素出队,直接抛弃,把当前任务添加执行,不出意外还是添加到队列中,除非当前这会好几个线程执行完,线程数小于了corePoolSize。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//线程池还没有销毁停止
if (!e.isShutdown()) {
//获取队列,并让队列头(最久)的任务出队,丢弃队头
e.getQueue().poll();
//执行新任务,新任务再添加到队列中
e.execute(r);
}
}
}

CallerRunsPolicy

CallerRunsPolicy:当任务添加到线程池中被拒绝时,判断线程池是否还在运行,直接在主线程中运行此任务,即在调用execute或者submit的方法中执行,不再使用线程池来处理此任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

为了说明这一点,来看一个demo:

public static void main(String[] args) {
//最大线程数设置为2,队列最大能存2,使用主线程执行的拒绝策略
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.CallerRunsPolicy());
//此时有6个任务,最大线程+队列能处理4个,主线程需要处理6-4=2个
for(int i = 0; i < 6;i ++) {
Runnable run = new Runnable(){
@Override
public void run() {
try {
Thread.sleep(10);
System.out.println("执行当前任务的线程:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
threadPoolExecutor.execute(run);
}
}

输出结果:
执行当前任务的线程:main
执行当前任务的线程:pool-1-thread-1
执行当前任务的线程:pool-1-thread-2
执行当前任务的线程:main
执行当前任务的线程:pool-1-thread-1
执行当前任务的线程:pool-1-thread-2

线程池处理任务策略说明

  1. 如果当前线程池中的线程数量小于corePoolSize,则会创建一个线程执行此任务;
  2. 如果当前线程池中的线程数量大于corePoolSize,则会尝试将其添加到队列中,若添加成功,则该任务会排队等待线程将其取出进行执行;若队列中已达最大值,则添加失败,则会尝试创建新的线程执行这个任务;
  3. 如果当前线程池中的线程数量已经达到maximumPoolSize,则尝试创建新的线程结果为false,会采取任务拒绝策略;
  4. 如果线程池中线程数量大于corePoolSize,则当空闲时间超过keepAliveTime的线程,将会被终止,直到线程池数量不大于corePoolSize为止。

当提交一个新任务后,线程池的处理流程图:

来看当添加一个任务到线程池的源码:

public void execute(Runnable command) {
//执行的任务为空,直接抛出异常
if (command == null)
throw new NullPointerException();

//ctl:AtomicInteger类型,获取当前线程池中的线程数
int c = ctl.get();
//当前线程数小于核心线程数,直接创建线程执行任务
if (workerCountOf(c) < corePoolSize) {
//创建线程执行任务,从wc >= (core ? corePoolSize : maximumPoolSize)可以看出,true代表创建核心线程,false代表创建非核心线程
if (addWorker(command, true))
//创建线程成功,直接返回
return;
//没成功,重新获取当前线程数
c = ctl.get();
}
//线程池还是运行状态、并且把任务添加到队列中成功
if (isRunning(c) && workQueue.offer(command)) {
//获取下当前线程数
int recheck = ctl.get();
//若是线程池不运行了,则把当前添加的任务移出
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//当前运行的线程数为0,
else if (workerCountOf(recheck) == 0)
//传递空参数,不进行创建
addWorker(null, false);
}
//尝试创建线程,此时传递false,wc >= (core ? corePoolSize : maximumPoolSize),则看线程的上限匹配maximumPoolSize
else if (!addWorker(command, false))
//创建线程失败,执行拒绝策略
reject(command);
}

Executors四种方式创建线程池

Executors类(并发包)提供了4种创建线程池的方法,这些方法最终都是通过配置ThreadPoolExecutor的不同参数,来达到不同的线程管理效果。

newFixedThreadPool

创建一个定长的线程池,可控制最大并发数,超出的线程进行排队等待。此线程池的核心线程数、最大线程数都是nThreads,线程空闲回收时间配置也没有意义了,所以闲置时间给0,队列使用LinkedBlockingQueue无界的方式,当线程数达到nThreads后,新任务放到队列中。

缺点:因为LinkedBlockingQueue是一个无界的队列,当线程数达到核心线程数时,新提交的任务会一直放到队列中,当任务很多的时候,会造成OOM(内存不足)。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

测试demo:

public static void main(String[] args)  {
// 创建定长线程池
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);

for (int i = 0; i < 10; i++) {
//创建任务
Runnable runnable = new Runnable(){
@Override
public void run() {
try {
Thread.sleep(3);
System.out.println("当前执行的线程为:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//任务添加到线程池
newFixedThreadPool.execute(runnable);
}
}

newSingleThreadExecutor

创建一个单线程池,它只会用唯一的工作线程来执行任务,超出的线程进行排队等待。此线程池的核心线程数、最大线程数都是1,线程空闲回收时间配置也没有意义了,所以闲置时间给0,队列使用LinkedBlockingQueue无界的方式,当线程数达到1后,新任务放到队列中。

缺点:因为LinkedBlockingQueue是一个无界的队列,当线程数达到核心线程数时,新提交的任务会一直放到队列中,当任务很多的时候,会造成OOM(内存不足)。

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

测试demo:

public static void main(String[] args)  {
// 创建单线程-线程池,任务依次执行
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 6; i++) {
//创建任务
Runnable runnable = new Runnable(){
@Override
public void run() {
System.out.println("当前执行的线程为:"+Thread.currentThread().getName());
}
};
//任务添加到线程池
newSingleThreadExecutor.execute(runnable);
}
}

newCachedThreadPool

创建一个可缓存的线程池,如果线程池长度大于处理需要,则根据线程空闲时间大于60s的会进行销毁;新任务添加进来,若是没有空闲的线程复用,则会立马创建一个线程来处理,因为使用的是无缓存队列。此线程池的核心线程数为0、最大线程数为无界Integer.MAX_VALUE,线程空闲回收时间60S,队列使用SynchronousQueue无缓存的方式,当有任务添加,能复用之前线程则复用,没有空闲线程则创建新线程。

缺点:因为最大线程数为无界,当任务很多的时候,会创建大量线程,造成OOM(内存不足)。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

测试demo:

public static void main(String[] args)  {
// 创建可缓存线程池
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

for (int i = 0; i < 6; i++) {
//创建任务
Runnable runnable = new Runnable(){
@Override
public void run() {
try {
Thread.sleep(6);
System.out.println("当前执行的线程为:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

}
};
//任务添加到线程池
newCachedThreadPool.execute(runnable);
}
}

newScheduledThreadPool

创建支持定时、周期任务的线程池。此线程池的核心线程数为corePoolSize、最大线程数为无界Integer.MAX_VALUE,线程空闲回收时间0S,当线程数大于corePoolSize时,有线程处理完任务后,接下来就进行销毁。队列使用DelayedWorkQueue延迟队列,可以设置延时时间,当元素达到延时时间,才从队列出队。

缺点:因为最大线程数为无界,当任务很多的时候,会创建大量线程,造成OOM(内存不足)。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

测试demo:

public static void main(String[] args)  {
// 创建支持定时线程池
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);

for (int i = 0; i < 5; i++) {
//创建任务
Runnable runnable = new Runnable(){
@Override
public void run() {
System.out.println("当前执行的线程为:"+Thread.currentThread().getName());
}
};
//任务添加到线程池,延迟2秒后才能从队列中出队
newScheduledThreadPool.schedule(runnable, 2, TimeUnit.SECONDS);
}
}