
阿里妹导读
一、线程池简介
1.什么是线程池?
2.线程池有什么好处?
减少线程创建和销毁的开销,线程的创建和销毁需要消耗系统资源,线程池通过复用线程,避免了对资源的频繁操作,从而提高系统性能;
控制和优化系统资源利用,线程池通过控制线程的数量,可以尽可能地压榨机器性能,提高系统资源利用率;
提高响应速度,线程池可以预先创建线程且通过多线程并发处理任务,提升任务的响应速度及系统的并发性能;
二、Java线程池的实现原理
1.类继承关系
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理submit(Runnable r):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是nullsubmit(Runnable r,Object result):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是传入的第二个参数resultshutdown():关闭线程池,不接受新任务,但是等待队列中的任务处理完毕才能真正关闭shutdownNow():立即关闭线程池,不接受新任务,也不再处理等待队列中的任务,同时中断正在执行的线程setCorePoolSize(int corePoolSize):设置核心线程数setKeepAliveTime(long time, TimeUnit unit):设置线程的空闲时间setMaximumPoolSize(int maximumPoolSize):设置最大线程数setRejectedExecutionHandler(RejectedExecutionHandler rh):设置拒绝策略setThreadFactory(ThreadFactory tf):设置线程工厂beforeExecute(Thread t, Runnable r):任务执行之前的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑afterExecute(Runnable r, Throwable t):任务执行之后的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
2.线程池的状态
RUNNING:线程池一旦被创建,就处于RUNNING状态,任务数为0,能够接收新任务,对已排队的任务进行处理。
SHUTDOWN:不接收新任务,但能处理已排队的任务。当调用线程池的shutdown()方法时,线程池会由RUNNING转变为SHUTDOWN状态。
STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。当调用线程池的shutdownNow()方法时,线程池会由RUNNING或SHUTDOWN转变为STOP状态。
TIDYING:当线程池在SHUTDOWN状态下,任务队列为空且执行中任务为空,或者线程池在STOP状态下,线程池中执行中任务为空时,线程池会变为TIDYING状态,会执行terminated()方法。这个方法在线程池中是空实现,可以重写该方法进行相应的处理。
TERMINATED:线程池彻底终止。线程池在TIDYING状态执行完terminated()方法后,就会由TIDYING转变为TERMINATED状态。
3.线程池的执行流程

4.问题思考
线程池的核心线程可以回收吗?
线程池在提交任务前,可以提前创建线程吗?
三、源码分析
1.execute(Runnable command)

2.addWorker(Runnable firstTask, boolean core)


线程创建成功并添加到线程池后,会调用start()方法,启动线程,执行任务。

3.runWorker(Worker w)


4.getTask()

根据是否需要超时控制,提供两个阻塞方法获取阻塞队列中的任务。
5.processWorkerExit(w, completedAbruptly)

四、线程池在业务中的最佳实践
1.如何选择合适的线程池参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
1.corePoolSize: 核心线程数2.maximumPoolSize: 最大线程数3.keepAliveTime: 线程的空闲时间4.unit: 空闲时间的单位(秒、分、小时等等)5.workQueue: 等待队列6.threadFactory: 线程工厂7.handler: 拒绝策略ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务(可能为主线程Main),保证每个任务执行完毕
如何选择合适的线程池参数?
2.如何正确地创建线程池对象
FixedThreadPool:具有固定线程数量的线程池,无界阻塞队列;
CachedThreadPool:线程数量可以动态伸缩的线程池,最大线程数为Integer.MAX_VALUE
SingleThreadPool:单个线程的线程,核心线程数和最大线程数都是1,无界阻塞队列
public class TestThreadPool {/*** 线程池*/private static ExecutorService executor = initDefaultExecutor();/*** 统一的获取线程池对象方法*/public static ExecutorService getExecutor() {return executor;}private static final int DEFAULT_THREAD_SIZE = 16;private static final int DEFAULT_QUEUE_SIZE = 10240;private static ExecutorService initDefaultExecutor() {return new ThreadPoolExecutor(DEFAULT_THREAD_SIZE, DEFAULT_THREAD_SIZE,300, TimeUnit.SECONDS,new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),new DefaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());}}
局部变量定义的线程池对象在方法结束后可以被垃圾回收吗?
public static void main(String[] args) {test1();test2();}public static void test1(){Object obj = new Object();System.out.println("方法一执行完成");}public static void test2(){ExecutorService executorService = Executors.newFixedThreadPool(10);executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("方法二执行完成");}});}
问题分析与解答
虚拟机栈(栈帧中的本地变量表)中引用的对象;
方法区中的类静态属性引用的对象;
方法区中常量引用的对象;
本地方法栈中JNI(即一般说的Native方法)引用的对象;
正在运行的线程;
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
public class Outer {private String name;private Inner inner;public int outerMethod() {return 1;}/*** 非静态内部类*/class Inner {private void innerMethod() {//在非静态内部类中可以直接调用外部类的方法outerMethod();}private String address;}}
class Outer$Inner {private String address;Outer$Inner(Outer var1) {this.this$0 = var1;}private void innerMethod() {this.this$0.outerMethod();}}
public class Outer {private String name;private Inner inner;public int outerMethod() {return 1;}/*** 静态内部类*/static class Inner {private String address;}}
class Outer$Inner {private String address;Outer$Inner() {}}
这个问题带来两个启发:
3.相互依赖的子任务避免使用同一线程池
public class FartherAndSonTask {public static ExecutorService executor= TestThreadPool.getExecutor();public static void main(String[] args) throws Exception {FatherTask fatherTask = new FatherTask();Future<String> future = executor.submit(fatherTask);future.get();}/*** 父任务,里面异步执行子任务*/static class FatherTask implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("开始执行父任务");SonTask sonTask = new SonTask();Future<String> future = executor.submit(sonTask);String s = future.get();System.out.println("父任务已拿到子任务执行结果");return s;}}/*** 子任务*/static class SonTask implements Callable<String> {@Overridepublic String call() throws Exception {//处理一些业务逻辑System.out.println("子任务执行完成");return null;}}}
使用不同的线程池隔离有相互依赖的任务;
调用future.get()方法设置超时时间,这样做可以避免线程阻塞,但是依然会出现大量的超时异常;
4.合理选择submit()和execute()方法
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理,轻量级方法,适用于处理不需要返回结果的任务;
submit(Runnable r):返回值为Future类型,future可以用来检查任务是否已经完成,获取任务的结果等,适用于需要处理返回结果的任务;
private void asyncSupplyPriceSync(List<Long> shidList, SupplyPriceSyncMsg msg) {if (CollectionUtils.isEmpty(shidList)) {return;}PlatformLogUtil.logInfo("异步推送酒店报价信息供给总数:", shidList.size());final Map<String, Future<?>> futures = Maps.newLinkedHashMap();//分批提交线程池处理Lists.partition(shidList, SwitchConfig.HOTEL_PRICE_ASYNC_LIST_SIZE).forEach(subList -> {try {futures.put(UUID.randomUUID().toString(), executorService.submit(() -> batchSupplyPriceSync(subList, msg)));} catch (Exception e) {PlatformLogUtil.logFail("异步推送报价信息线程池子任务执行异常", LogListUtil.newArrayList(subList), e);}});//阻塞,等所有子任务都处理完,才返回结果futures.forEach((uuid, future) -> {try {future.get();} catch (InterruptedException | ExecutionException e) {PlatformLogUtil.logFail("异步推送报价信息获取子任务执行结果异常", LogListUtil.newArrayList(e));}});}
5.请捕获线程池中子任务的代码异常
public class ExceptionTest {public static ExecutorService executor = TestThreadPool.getExecutor();public static void main(String[] args) {executor.execute(() -> test("正常"));executor.execute(() -> test("正常"));executor.execute(() -> test("任务执行异常"));executor.execute(() -> test("正常"));executor.shutdown();}public static void test(String str) {String result = "当前ThreadName为" + Thread.currentThread().getName() + ":结果" + str;if (str.equals("任务执行异常")) {throw new RuntimeException(result + "****执行异常");} else {System.out.println(result);}}}
如果线程池中执行任务的线程异常,发生异常的线程会销毁吗?其他任务还能正常执行吗?
可以发现

在processWorkerExit(w, completedAbruptly)方法内,可以看到如果运行中的线程池有线程执行异常,会调用workers.remove()移除当前线程,并调用addWorker()重新创建新的线程。

所以在任务3销毁线程再重新创建线程,和任务4创建线程这两个动作会有时序问题,具体看下图:

那么控制打印的异常信息是怎么来的呢?

所以,在业务代码中,请捕获子任务中的异常,否则会导致线程池中的工作线程频繁销毁、创建,造成资源浪费,违背了线程复用的设计原则。