线程池
- 什么是线程池,为什么要有线程池
- 线程池的基本组成
- 常见的线程池实现
- 各种线程池的工作场景
- 线程池的参数设置
- 线程池关闭
什么是线程池,为什么要有线程池
线程池:即存放线程的容器,一个线程池中会存放一个或多个线程
为什么要有线程池
当我们需要多个线程来执行任务时,那么就需要去创建线程,用完之后就销毁线程,这样会给系统带来较大的消耗,因为线程的创建是会消耗CPU资源的,为了避免这种消耗所以有了线程池,线程池有几个优点:
- 线程的重用:比如我可以默认在线程池中存放几个永久存在的线程,任务来了可以直接从线程池里拿线程,用完之后再归还给线程池,这样就能够极大的避免线程的频繁创建和销毁
- 线程管理:所有的线程创建和销毁都交给线程池进行统一管理,而不是我们在代码中随处使用
new Thread(new Runnable())
来创建线程,导致管理起来特别困难
线程池的基本组成
- 线程池管理组件(ThreadPool):
- 工作线程:执行任务的线程
- 任务:被执行的任务
- 任务队列:当线程池中线程被占用完之后新提交的任务会存放到任务队列中
常见的线程池实现
- SingleThreadPoolExecutor:单线程的线程池,整个线程池中只有一个线程,可以用于做一些后台逻辑
- ScheduledThreadPoolExecutor:定时任务线程池,用于定时调度某个任务,一般用Spring自带的schedule或者quartz等其他定时调度框架来实现
- FixedThreadPoolExecutor:固定线程数量的线程池,线程池中固定N个线程,不会创建新的,如果有超过线程数量的任务被提交则将任务扔到任务队列中
- CachedThreadPoolExecutor:默认实现为不限制数量的线程池,根据任务多少来动态创建线程,线程空闲一定时间后会自动销毁
各种线程池的工作场景
SingleThreadPoolExecutor
单线程的线程池,可以用于做后台任务,但是一般不会使用这个
ScheduledThreadPoolExecutor
定时调度线程池,用于执行定时任务,一般也不用,可以用Spring自带的schedule来代替
FixedThreadPoolExecutor
固定线程数量的线程池,适用于负载较为稳定的系统,意思就是不会有什么高峰期、低峰期,每个时段的任务数量差不多,比如一些大数据的后台系统,每隔一定时间执行一次任务,会查询和处理大量数据
CachedThreadPoolExecutor
适用于有高峰期的系统,比如电商系统,一般中午、傍晚都是高峰期,但是其他时间段都是低峰期,在高峰期需要开辟出大量线程来处理用户请求,高峰期过去之后线程能自动销毁
如何创建一个线程池
- Executor:接口,用于执行任务
- ExecutorService:线程池接口,实现了Executor,并提供了对线程池的操作相关方法定义,如
#shutdown
、#submit
等 - Executors:工具类,可以用来创建线程池,用该工具类创建出来的线程池大部分的参数是默认的
- ThreadPoolExecutor:实现了ExecutorService接口,真正的线程池实现类
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
}
上面是使用Executors的方法来创建线程池,但是阿里Java编码规范明确规定了最好不要用这种方式来创建线程池,因为通过这种方式无法根据业务来定制线程池参数,因此我们一般都会自己创建一个线程池,例如:
public static void createThreadPoolExecutor() {
// 创建一个fixedThreadPool
ThreadPoolExecutor fixedThreadPool = new ThreadPoolExecutor(
10, 10,
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy());
// 创建一个cachedThreadPool
ThreadPoolExecutor cachedThreadPool = new ThreadPoolExecutor(
10, 100,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
}
这里有一堆参数需要我们了解的:
- corePoolSize:核心线程数量,线程池中长期存活的线程数
- maximumPoolSize:最大线程数量,线程池中最大存在的线程数
- keepAliveTime:线程空闲时间,当线程空闲达到一定时间进行自我销毁
- workQueue:工作队列,当线程池内线程都处于工作状态时如果还有新的任务提交,那么就会先存放到工作队列中,等待有空闲线程之后才会执行该任务
- threadFactory:创建线程的工厂类
- rejectExecutionHandler:拒绝策略,当工作队列和线程都占用完了之后如果还有新的任务被提交应该怎么处理
线程池的工作原理
初始状态
初始状态下,线程池中没有任何线程和任务
线程池补充核心线程
创建线程池完毕后,线程池发现当前线程池中存活的线程数没有达到核心线程数量,于是开始补充核心线程,即调用threadFactory来创建线程
用户提交任务
接下来用户往线程池里提交了四个任务,但是核心线程只有两个,因此另外两个任务被扔到工作队列中了
线程池补充最大线程
如果线程池发现工作队列已满,此时会去检查是否还能创建新的线程,也就是检查最大线程数量 - 核心线程数量
是否大于0,如果大于0那么线程池就会创建额外线程来处理工作队列中的任务,此时线程池发现最大线程数量 - 核心线程数量 = 2
,因此会创建两个线程来处理工作队列中的任务
用户继续提交任务
用户继续提交任务,这一次用户提交了5个任务,线程池的线程都处于非空闲状态并且工作队列只有4个任务的容量
此时rejectExecutionHandle开始发挥作用了
拒绝策略发挥作用
- AbortPolicy:如果任务队列已满,则新提交的任务直接被拒绝,这里的实现是直接抛出一个
RejectedExecutionException
- CallerRunPolicy:直接运行新提交的任务
- DiscardOldestPolicy:丢弃掉最早进入队列的任务
- DiscardPolicy:丢弃任务,不对任务做任何处理
来看一下线程池执行任务的逻辑
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 双重检查
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
总的逻辑为三步:
- 如果当前线程池中的线程数量小于核心线程数量,那么会创建一个新的线程并执行新提交的任务
- 如果任务能够成功入队,接下来会使用
双重检查
的方式来检查是否应该创建一个新的线程来处理任务 - 如果任务不能入队,会尝试开启一个新的线程来处理任务,否则拒绝该任务
任务处理完毕,线程池回收额外线程
任务处理完毕之后,线程池将除核心线程之外的线程都回收掉,这需要根据keepAliveTime来判断线程是否可以回收,只有在线程空闲时间超过keepAliveTime指定的时间时,线程才可以被回收
线程池的参数设置
- corePoolSize:根据每个线程执行任务所需要花费的时间,比如一个任务需要消耗100ms,那么一个线程一秒钟就可以执行10个任务,如果一秒钟有200个任务需要执行,那么可以将corePoolSize设置为20,就可以满足每秒处理200个任务了,但是如果你不能特别确定每个任务执行的精准时间,那么你可以将corePoolSize设置大一点,比如设置为30
- maximumPoolSize:根据你选择的队列类型来设定,如果是fixedThreadPool,那么该参数就跟corePoolSize一样即可,如果是cachedThreadPool,那么该参数一般可以设置为你线上机器能负载的最大线程数量
fixedThreadPool
如果你选择的是fixedThreadPool,那么你需要针对你的业务来对workQueue和rejectExecutionHandler进行调整,默认情况下fixedThreadPool使用的是LinkedBlockingQueue,这是一个无界队列,意味着可以无限制的往队列里面提交任务,并且此时你的线程池的线程数量是有限的,这就很可能导致提交的任务数量过多而发生OOM
为了防止这种情况发生,你需要将无界队列替换为有界队列,然后选择合适的拒绝策略,比如你可以自定义一个拒绝策略,如果队列存放不下新的任务了,那么你就将任务写入到数据库,高峰期过去后,再重新从数据库取出并处理
cachedThreadPool
该类型的线程池主要就是可以瞬时创建大量的线程来处理任务,它默认使用的队列是SynchronousQueue,如果SynchronousQueue中的任务一直没有被拿走,那么线程池就认为队列满了,此时线程池就会创建新的线程来获取任务,因为默认情况下该线程池的maximumPoolSize是无限大的,所以会无限创建新的线程,只有当线程空闲了才会回收,因此不太适合处理太耗时的任务,否则会导致线程创建过多,CPU占用率增加导致系统运行速度变慢
我们可以限制一下maximumPoolSize的数量,这样能够降低上面所述情况的影响
线程池关闭
- shutdown:优雅关闭线程池,此时线程池不再接收新的任务,同时会等到线程池中所有线程都执行完成然后再释放线程池资源
- shutdownNow:强制关闭线程池,返回还未执行的任务列表,尝试停止正在执行的任务,暂停新的任务被执行