多线程执行任务,核心就在于如何划分任务,如何管理执行任务的多个线程,用公式化的形式表示就是多线程执行任务=清晰的任务边界+明确的任务执行策略
如何确定任务,什么样的任务才可以被放到单独的一个或者一组线程执行,这就需要明确任务的边界。我们希望任务之间尽量没有依赖关系,最好是相互独立的,这样一旦资源充分可用时,它们可以同时被执行,发挥并行的最大效果。同时这也就要求对问题要有着清晰的认识,要能从问题中挖掘出子问题;还要控制好任务的粒度,保证并行化任务节省时间大于线程开销。以大多数网站服务器程序为例,每一个客户请求是独立的,同一时刻可能存在大量的请求,那么每一个客户请求就可以被视为一个任务。
确定了任务之后,就要考虑如何利用多个线程来执行这些划分出来的任务。还是以服务器端接收请求为例,最简单的方式是为每一个请求都创建一个线程,这种方式实现起来很简单,每次都手动启动一个Thread,但是在大规模的并发程序中会有很多隐患:
要注意线程的生命周期活动,包括创建、运行、切换和终止这都是需要开销的,这样如果处理多个并发请求时,就会带来很大的开销用在处理线程的创建上,这样就减少了可用于真正处理请求任务的CPU时间,一旦有请求来不及处理的话,用户就会得到很慢的反馈甚至超时失败,另外线程相关的资源也都是存放在内存中的,当有多个并发请求时,就会占用大量的内存资源,当线程结束时也并不会被马上回收,这就导致了一些闲置的线程白白占用了可用于创建新线程的资源,可能就会造成下一次创建线程失败从而响应请求失败。
导致无限制创建线程的这些缺点的原因就在于没有限制线程创建的最大数量,线程池的思想就解决了这个问题:线程池维护一组线程,线程的最大数量是有限制的,当有任务需要创建线程时都要从线程池中去取,如果线程池中有闲置线程直接使用,对这个任务来说就省去了重新创建线程的时间开销;当线程池内没有空闲线程可以直接使用时,此时如果线程数量还未达到最大限制,那么就创建线程处理任务,如果已经达到了最大限制,之后的请求就要等待直到有线程执行完毕又归还到线程池中。这样内存中的线程会控制在一个开发者设定的范围内,一方面保持了任务处理的快速响应,一方面减少了对内存的占用。
注意到线程池的实现是要把任务的提交和执行解耦开的,当有任务需要执行时,只需要提交一个任务给工作线程的管理者,而不用去关注这个任务究竟是哪个线程来执行,任务的执行者只需要从任务队列中取任务而不用关心这些任务是谁提交的。通用的线程池执行框架可以构建为单独的模块,业务方只负责提交任务给任务执行框架而不需关心任务框架的内部实现,通过配置线程池的不同策略就可以控制任务的执行策略,这就大大简化了多线程程序的开发。
这是java多线程执行框架的类图,希望能先让大家对这个框架的设计有一个大致的了解:
Executor
是对任务执行者的抽象,它的工作是也仅是执行任务,所以它只有一个方法:
void execute(Runnable command);
ExecutorService
继承于Executor
,它在执行任务的基础上还负责管理任务执行者的生命周期和对任务执行状态的跟踪,通过shutdown()
方法可以关闭线程池,拒绝新任务的执行,移除已提交但未执行的任务,但它会让正在运行的线程执行完毕,如果想让线程池马上关闭的话可以使用shutdownNow()
,但这并一定能保证正在运行的线程真的能马上关闭,因为它的实现是基于中断机制的,即通过调用interrupt()
方法中断线程,如果线程不响应中断的话线程还是会等待执行完毕后才自动关闭,(关于中断在下一篇博客会有更详细的介绍),可以通过isShutdown()
查询线程池的状态ScheduledExecutorService
继承于ExecutorService
,扩展了对任务的延迟和周期性执行的任务执行策略Callable
类似于Runnable
,都是对可执行任务的抽象,不同于Runnable
的是它可以返回任务执行后的结果,并且在不能返回结果时会抛出异常Future
代表的是对任务执行后状态的抽象,通过它可以控制任务执行的生命周期,比如使用cancel()
方法停止任务的执行,get()
方法查询任务执行的结果,作为一种结果状态,它不能直接被创建,只有在有任务被执行后才能产生RunnableFuture
继承于Runnable
和Future
,可以认为是一个具有完整生命周期状态的任务,一个可以自己掌控自己生老病死的任务AbstractExecutorService
实现了ExecutorService
接口并提供了任务执行方法的默认实现,另外还提供了可执行任务Runnable
、Callable
到RunnableFuture
的转换策略,从而统一并简化了后续任务的管理ThreadPoolExecutor
继承于AbstractExecutorService
,实现了利用线程池执行任务的策略Executors
则是对任务、任务执行状态、任务执行者、任务执行策略封装后方便开发者使用的工具类
线程池的不同策略体现在线程数量、线程过期时间和绑定的任务队列对任务的处理方式上,这些都可以通过ThreadExector
的构造函数设定:
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize: 核心线程数量
maximunPoolSize: 最大线程数量
keepAliveTime: 线程结束后最后可以存活的时间
unit: 时间单元,@see TimeUnit
workQueue:任务队列,管理任务的读取
threadFactory: 创建实际工作的线程的工厂类,可以定制Thread类
handler: 处理任务被拒绝执行的情况,一般是因为已调用了shutdown()
关闭线程池
Executors
中就提供了几个常用的不同类型线程池创建方法:
Executors#newFixedThreadPool
通过设置核心线程数量等于最大线程数量和将多余线程的过期时间置为0保证了生成的线程池中线程数量是固定的,如果当前没有闲置线程,之后提交的任务会阻塞在与线程池绑定的队列中,直到有线程执行完可重新利用时才从队列中取出请求处理;一旦有线程由于一些不可控的情况被系统终止,那么线程池能保证重新创建相同数量的线程来维持可用线程的数量不变。多用于一些很稳定固定的正规并发线程,多用于服务器:1
2
3return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());Executors#newSingleThreadExecutor
通过设置核心线程数量和最大线程数量都为1使得线程池中只存在一个线程可以使用,其他策略同Executors#newFixedThreadPool
生成的线程池
1 | new ThreadPoolExecutor(1, 1, |
Executors#newCachedThreadPool
通过设置核心线程数量为0、最大线程数量无限制和执行完毕的线程过期时间为60s构造了一个可缓存线程的线程池,一开始线程池的数量为0,请求到来时发现没有线程可处理就创建一个线程,下次请求到来时若之前执行完毕的线程还未因过期被回收就可以直接使用线程,线程的最大数量是没有限制的,但实际上最大线程数量可能限制于系统资源。这种线程池适合于频繁执行而且周期短的任务
1 | new ThreadPoolExecutor(0, Integer.MAX_VALUE, |
如果没有线程池的话每次新建任务我们就要手动创建一个线程执行任务:
1 | for(int i=0;i<5;++i){ |
有了线程池后
1 | import java.util.concurrent.ExecutorService; |
- 还有一个很有意思的线程池类型:
Executors#newWorkStealingPool
是使用线程池作为执行任务线程的管理者,来实现fork/join模式的任务执行策略,所谓fork/join(详解见java doc),就是一种利用一台计算机上的多个处理器进行同类型问题并行计算的模式,通过对大规模问题逐步分解,直到可以作为独立的小任务在单独的线程中执行,结合线程间的通信机制实现相当于递归迭代的并行版本,这和现在流行的Map/Reduce模式有些类似,只不过Map/Reduce是在多台计算机上执行的。
这一篇仅是对多线程执行任务做了大概的原理和使用的介绍,如果各个线程执行的任务真的能够彼此独立,那多线程并发会简单很多,因为这样就不需要线程间数据的传递和线程间的协调,但很多时候任务是不太可能被划分成完全独立的任务的,这就需要线程间的通信来实现线程间的协调策略和消息传递,这里面会涉及到很多深层次一些的概念。