首页 » 编写高质量代码:改善Java程序的151个建议 » 编写高质量代码:改善Java程序的151个建议全文在线阅读

《编写高质量代码:改善Java程序的151个建议》建议126:适时选择不同的线程池来实现

关灯直达底部

Java的线程池实现从最根本上来说只有两个:ThreadPoolExecutor类和Scheduled-ThreadPoolExecutor类,这两个类还是父子关系,但是Java为了简化并行计算,还提供了一个Executors的静态类,它可以直接生成多种不同的线程池执行器,比如单线程执行器、带缓冲功能的执行器等,但归根结底还是使ThreadPoolExecutor类或ScheduledThreadPoolExecutor类的封装类。

为了理解这些执行器,我们首先来ThreadPoolExecutor类,其中它复杂的构造函数可以很好解释该线程池的作用,代码如下:


public class ThreadPoolExecutor extends AbstractExecutorService{

//最完整的构造函数

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long

keep Alive Time, Time Unitunit, Block in gQueue<Runnable>

workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){

//检验输入条件

if(corePoolSize<0||maximumPoolSize<=0||

maximumPoolSize<corePoolSize||keepAliveTime<0)

throw new IllegalArgumentException();

//检验运行环境

if(workQueue==null||threadFactory==null||handler==null)

throw new NullPointerException();

this.corePoolSize=corePoolSize;

this.maximumPoolSize=maximumPoolSize;this.workQueue=workQueue;

this.keepAliveTime=unit.toNanos(keepAliveTime);this.threadFactory=threadFactory;

this.handler=handler;

}

}


这是ThreadPoolExecutor最完整的构造函数,其他的构造函数都是引用该构造函数实现的,我们逐步来解释这些参数的含义。

corePoolSize:最小线程数。

线程池启动后,在池中保持线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的,例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多会启动5个线程,而不是一次性地启动10个线程。

maximumPoolSize:最大线程数量。

这是池中能够容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理。

keepAliveTime:线程最大生命期。

这里的生命期有两个约束条件:一是该参数针对的是超过corePoolSize数量的线程;二是处于非运行状态的线程。这么说吧,如果corePoolSize为10,maximumPoolSize为20,此时线程池中有15个线程在运行,一段时间后,其中有3个线程处于等待状态的时间超过了keepAliveTime指定的时间,则结束这3个线程,此时线程池中则还有12个线程正在运行。

unit:时间单位。

这是keepAliveTime的时间单位,可以是纳秒、毫秒、秒、分钟等选项。

workQueue:任务队列。

当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要有一个容器来容纳这些任务,这就是任务队列。

threadFactory:线程工厂。

定义如何启动一个线程,可以设置线程名称,并且可以确认是否是后台线程等。

handler:拒绝任务处理器。

由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。

线程池的管理是这样一个过程:首先创建线程池,然后根据任务的数量逐步将线程增大到corePoolSize数量,如果此时仍有任务增加,则放置到workQueue中,直到workQueue爆满为止,然后继续增加池中的线程数量(增强处理能力),最终达到maximumPoolSize,那如果此时还有任务要增加进来呢?这就需要handler来处理了,或者丢弃新任务,或者拒绝新任务,或者挤占已有任务等。

在任务队列和线程池都饱和的情况下,一旦有线程处于等待(任务处理完毕,没有新任务增加)状态的时间超过keepAliveTime,则该线程终止,也就是说池中的线程数量会逐渐降低,直至为corePoolSize数量为止。

我们可以把线程池想象成这样一个场景:在一条生产线上,车间规定是可以有corePoolSize数量的工人,但是生产线刚建立时,工作不多,不需要那么多的人。随着工作数量的增加,工人数量也逐渐增加,直至增加到corePoolSize数量为止,此时任务还在增加,那怎么办呢?

好办,任务排队,corePoolSize数量的工人不停歇地处理任务,新增加的任务按照一定的规则存放在仓库中(也就是我们的workQueue中),一旦任务增加的速度超过了工人处理的能力,也就是说仓库爆满时,车间就会继续招聘工人(也就是扩大线程数),直至工人数量达到maximumPoolSize为止,那如果所有的maximumPoolSize工人都在处理任务,而且仓库也是饱和状态,新增任务的该怎么处理呢?这就会扔给一个叫handler的专门机构去处理了,它要么丢弃这些新增的任务,要么无视,要么替换掉别的任务。

过了一段时间后,任务的数量逐渐减少了,导致有一部分工人处以待工状态,为了减少开支(Java是为了减少系统资源消耗),于是开始辞退工人,直至保持为corePoolSize数量的工人为止,此时即使没有工作,也不再辞退工人(池中线程数量不再减少),这也是为了保证以后再有任务时能够快速的处理。

明白了线程池的概念,我们再来看Executors提供的几个创建线程池的便捷方法:

newSingleThreadExecutor:单线程池。

顾名思义就是一个池中只有一个线程在运行,该线程永不超时。而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理,它的实现代码如下:


public static ExecutorService newSingleThreadExecutor(){

return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,0L,

TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));

}


它的使用方法也非常简单,下面是简单的示例:


public static void main(Stringargs)throws Exception{

//创建单线程执行器

ExecutorService es=Executors.newSingleThreadExecutor();

//执行一个任务

Future<String>future=es.submit(new Callable<String>(){

public String call()throws Exception{

return/"/";

}

});

//获得任务执行后的返回值

System.out.println(/"返回值:/"+future.get());

//关闭执行器

es.shutdown();

}


newCachedThreadPool:缓冲功能的线程池。

建立了一个线程池,而且线程数量是没有限制的(当然,不能超过Integer的最大值),新增一个任务即有一个线程处理,或者复用之前空闲的线程,或者新启动一个线程,但是一旦一个线程在60秒内一直是出于等待状态时(也就是1分钟没工作可做),则会被终止,其源代码如下。


public static ExecutorService newCachedThreadPool(){

return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new

SynchronousQueue<Runnable>());

}


这里需要说明的是,任务队列使用了同步阻塞队列,这意味着向队列中加入一个元素,即可唤醒一个线程(新创建的线程或复用池中空闲线程)来处理,这种队列已经没有队列深度的概念了。

newFixedThreadPool:固定线程数量的线程池。

在初始化时已经决定了线程的最大数量,若任务添加的能力超出了线程处理能力,则建立阻塞队列容纳多余的任务,源代码如下:


public static ExecutorService newFixedThreadPool(int nThreads){

return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new

LinkedBlockingQueue<Runnable>());

}


上面返回的是一个ThreadPoolExecutor,它的corePoolSize和maximumPoolSize是相等的,也就是说,最大线程数量也是nThreads。如果任务增长速度非常快,超过了LinkedBlockingQueue的最大容量(Integer的最大值),那此时会如何处理呢?会按照ThreadPoolExecutor默认的拒绝策略(默认是DiscardPolicy,直接丢弃)来处理。

以上三种线程池执行器都是ThreadPoolExecutor的简化版,目的是帮助开发人员屏蔽过多的线程细节,简化多线程开发。当需要运行异步任务时,可以直接通过Executors获得一个线程池,然后运行任务,不需要关注ThreadPoolExecutor的一系列参数是什么含义。当然,有时候这三个线程池不能满足要求,此时则可以直接操作ThreadPoolExecutor来实现复杂的多线程运算。可以这样来比喻:newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是线程池的简化版,而ThreadPoolExecutor则是旗舰版——简化版更容易操作,需要了解的知识相对少些,方便实用,而且旗舰版功能齐全,适用面广,但难于驾驭。