并发编程之多线程执行篇

多线程执行任务,核心就在于如何划分任务,如何管理执行任务的多个线程,用公式化的形式表示就是多线程执行任务=清晰的任务边界+明确的任务执行策略

如何确定任务,什么样的任务才可以被放到单独的一个或者一组线程执行,这就需要明确任务的边界。我们希望任务之间尽量没有依赖关系,最好是相互独立的,这样一旦资源充分可用时,它们可以同时被执行,发挥并行的最大效果。同时这也就要求对问题要有着清晰的认识,要能从问题中挖掘出子问题;还要控制好任务的粒度,保证并行化任务节省时间大于线程开销。以大多数网站服务器程序为例,每一个客户请求是独立的,同一时刻可能存在大量的请求,那么每一个客户请求就可以被视为一个任务。

确定了任务之后,就要考虑如何利用多个线程来执行这些划分出来的任务。还是以服务器端接收请求为例,最简单的方式是为每一个请求都创建一个线程,这种方式实现起来很简单,每次都手动启动一个Thread,但是在大规模的并发程序中会有很多隐患:
    要注意线程的生命周期活动,包括创建、运行、切换和终止这都是需要开销的,这样如果处理多个并发请求时,就会带来很大的开销用在处理线程的创建上,这样就减少了可用于真正处理请求任务的CPU时间,一旦有请求来不及处理的话,用户就会得到很慢的反馈甚至超时失败,另外线程相关的资源也都是存放在内存中的,当有多个并发请求时,就会占用大量的内存资源,当线程结束时也并不会被马上回收,这就导致了一些闲置的线程白白占用了可用于创建新线程的资源,可能就会造成下一次创建线程失败从而响应请求失败。

导致无限制创建线程的这些缺点的原因就在于没有限制线程创建的最大数量,线程池的思想就解决了这个问题:线程池维护一组线程,线程的最大数量是有限制的,当有任务需要创建线程时都要从线程池中去取,如果线程池中有闲置线程直接使用,对这个任务来说就省去了重新创建线程的时间开销;当线程池内没有空闲线程可以直接使用时,此时如果线程数量还未达到最大限制,那么就创建线程处理任务,如果已经达到了最大限制,之后的请求就要等待直到有线程执行完毕又归还到线程池中。这样内存中的线程会控制在一个开发者设定的范围内,一方面保持了任务处理的快速响应,一方面减少了对内存的占用。

注意到线程池的实现是要把任务的提交和执行解耦开的,当有任务需要执行时,只需要提交一个任务给工作线程的管理者,而不用去关注这个任务究竟是哪个线程来执行,任务的执行者只需要从任务队列中取任务而不用关心这些任务是谁提交的。通用的线程池执行框架可以构建为单独的模块,业务方只负责提交任务给任务执行框架而不需关心任务框架的内部实现,通过配置线程池的不同策略就可以控制任务的执行策略,这就大大简化了多线程程序的开发。

这是java多线程执行框架的类图,希望能先让大家对这个框架的设计有一个大致的了解:

  • Executor是对任务执行者的抽象,它的工作是也仅是执行任务,所以它只有一个方法:

void execute(Runnable command);

  • ExecutorService继承于Executor,它在执行任务的基础上还负责管理任务执行者的生命周期和对任务执行状态的跟踪,通过shutdown()方法可以关闭线程池,拒绝新任务的执行,移除已提交但未执行的任务,但它会让正在运行的线程执行完毕,如果想让线程池马上关闭的话可以使用shutdownNow(),但这并一定能保证正在运行的线程真的能马上关闭,因为它的实现是基于中断机制的,即通过调用interrupt()方法中断线程,如果线程不响应中断的话线程还是会等待执行完毕后才自动关闭,(关于中断在下一篇博客会有更详细的介绍),可以通过isShutdown()查询线程池的状态

  • ScheduledExecutorService继承于ExecutorService,扩展了对任务的延迟和周期性执行的任务执行策略

  • Callable类似于Runnable,都是对可执行任务的抽象,不同于Runnable的是它可以返回任务执行后的结果,并且在不能返回结果时会抛出异常
  • Future代表的是对任务执行后状态的抽象,通过它可以控制任务执行的生命周期,比如使用cancel()方法停止任务的执行,get()方法查询任务执行的结果,作为一种结果状态,它不能直接被创建,只有在有任务被执行后才能产生
  • RunnableFuture继承于RunnableFuture,可以认为是一个具有完整生命周期状态的任务,一个可以自己掌控自己生老病死的任务
  • AbstractExecutorService实现了ExecutorService接口并提供了任务执行方法的默认实现,另外还提供了可执行任务RunnableCallableRunnableFuture的转换策略,从而统一并简化了后续任务的管理
  • ThreadPoolExecutor继承于AbstractExecutorService,实现了利用线程池执行任务的策略
    Executors则是对任务、任务执行状态、任务执行者、任务执行策略封装后方便开发者使用的工具类

线程池的不同策略体现在线程数量、线程过期时间和绑定的任务队列对任务的处理方式上,这些都可以通过ThreadExector的构造函数设定:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{

corePoolSize: 核心线程数量
maximunPoolSize: 最大线程数量 keepAliveTime: 线程结束后最后可以存活的时间
unit
: 时间单元,@see TimeUnit
workQueue:任务队列,管理任务的读取
threadFactory: 创建实际工作的线程的工厂类,可以定制Thread类
handler: 处理任务被拒绝执行的情况,一般是因为已调用了shutdown()关闭线程池

Executors中就提供了几个常用的不同类型线程池创建方法:

  • Executors#newFixedThreadPool通过设置核心线程数量等于最大线程数量和将多余线程的过期时间置为0保证了生成的线程池中线程数量是固定的,如果当前没有闲置线程,之后提交的任务会阻塞在与线程池绑定的队列中,直到有线程执行完可重新利用时才从队列中取出请求处理;一旦有线程由于一些不可控的情况被系统终止,那么线程池能保证重新创建相同数量的线程来维持可用线程的数量不变。多用于一些很稳定固定的正规并发线程,多用于服务器:

    1
    2
    3
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
  • Executors#newSingleThreadExecutor通过设置核心线程数量和最大线程数量都为1使得线程池中只存在一个线程可以使用,其他策略同Executors#newFixedThreadPool生成的线程池

1
2
3
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
ew LinkedBlockingQueue<Runnable>())
  • Executors#newCachedThreadPool通过设置核心线程数量为0、最大线程数量无限制和执行完毕的线程过期时间为60s构造了一个可缓存线程的线程池,一开始线程池的数量为0,请求到来时发现没有线程可处理就创建一个线程,下次请求到来时若之前执行完毕的线程还未因过期被回收就可以直接使用线程,线程的最大数量是没有限制的,但实际上最大线程数量可能限制于系统资源。这种线程池适合于频繁执行而且周期短的任务
1
2
3
4
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);

如果没有线程池的话每次新建任务我们就要手动创建一个线程执行任务:

1
2
3
4
5
for(int i=0;i<5;++i){
new Thread(new Runnable(){
//execute Task
}).start();
}

有了线程池后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors;

public class TestThreadPool{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
// ExecutorService executorService = Executors.newFixedThreadPool(5);
// ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++){
executorService.execute(new TestRunnable());
System.out.println("************* a" + i + " *************");
}
executorService.shutdown();
}
}

class TestRunnable implements Runnable{
public void run(){
System.out.println(Thread.currentThread().getName() + "线程");
}
}
  • 还有一个很有意思的线程池类型:
    Executors#newWorkStealingPool是使用线程池作为执行任务线程的管理者,来实现fork/join模式的任务执行策略,所谓fork/join(详解见java doc),就是一种利用一台计算机上的多个处理器进行同类型问题并行计算的模式,通过对大规模问题逐步分解,直到可以作为独立的小任务在单独的线程中执行,结合线程间的通信机制实现相当于递归迭代的并行版本,这和现在流行的Map/Reduce模式有些类似,只不过Map/Reduce是在多台计算机上执行的。

这一篇仅是对多线程执行任务做了大概的原理和使用的介绍,如果各个线程执行的任务真的能够彼此独立,那多线程并发会简单很多,因为这样就不需要线程间数据的传递和线程间的协调,但很多时候任务是不太可能被划分成完全独立的任务的,这就需要线程间的通信来实现线程间的协调策略和消息传递,这里面会涉及到很多深层次一些的概念。