博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Executors下面的线程池实现
阅读量:6955 次
发布时间:2019-06-27

本文共 9927 字,大约阅读时间需要 33 分钟。

hot3.png

Sun在Java5中,对线程的类库做了大量的扩展,其中线程池就是Java5的新特征之一,除了线程池之外,还有很多多线程相关的内容,为多线程的编程带来了极大便利。为了编写高效稳定可靠的多线程程序,线程部分的新增内容显得尤为重要。 

  有关Java5线程新特征的内容全部在java.util.concurrent下面,里面包含数目众多的接口和类,熟悉这部分API特征是一项艰难的学习过程。目前有关这方面的资料和书籍都少之又少,大所属介绍线程方面书籍还停留在java5之前的知识层面上。 
  当然新特征对做多线程程序没有必须的关系,在java5之前通用可以写出很优秀的多线程程序。只是代价不一样而已。 
  线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。 
  在Java5之前,要实现一个线程池是相当有难度的,现在Java5为我们做好了一切,我们只需要按照提供的API来使用,即可享受线程池带来的极大便利。 
  Java5的线程池分好多种:具体的可以分为两类,固定尺寸的线程池、可变尺寸连接池。 
  在使用线程池之前,必须知道如何去创建一个线程池,在Java5中,需要了解的是java.util.concurrent.Executors类的API,这个类提供大量创建连接池的静态方法,是必须掌握的。 

一、固定大小的线程池,newFixedThreadPool:

    下面是 Executors 类 newFixedThreadPool 方法的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
()); }

可以看到 corePoolSize 和 maximumPoolSize 设置成了相同的值,此时不存在线程数量大于核心线程数量的情况,所以KeepAlive时间设置不会生效。任务队列使用的是不限制大小的 LinkedBlockingQueue ,由于是无界队列所以容纳的任务数量没有上限。

因此,FixedThreadPool的行为如下:

  1. 从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到nThreads

  2. 线程池线程数达到nThreads以后,新的任务将被放入队列

FixedThreadPool的优点是能够保证所有的任务都被执行,永远不会拒绝新的任务;同时缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题。

下面举个列子:

package 线程池;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/*** @ClassName: Test * @Description:newFixedThreadPool  固定线程数的线程池Demo * @author lifei.pan* @date 2017年4月13日 下午8:06:11  */public class NewFixedThreadPoolTest {	public static void main(String[] args) {		// 创建一个可重用固定线程数的线程池		ExecutorService pool = Executors.newFixedThreadPool(5);		// 创建线程		Thread t1 = new MyThread();		Thread t2 = new MyThread();		Thread t3 = new MyThread();		Thread t4 = new MyThread();		Thread t5 = new MyThread();		// 将线程放入池中进行执行		pool.execute(t1);		pool.execute(t2);		pool.execute(t3);		pool.execute(t4);		pool.execute(t5);		// 关闭线程池		pool.shutdown();	}}class MyThread extends Thread {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + "正在执行。。。");	}}

 

输出结果:

pool-1-thread-1正在执行。。。  pool-1-thread-3正在执行。。。  pool-1-thread-4正在执行。。。  pool-1-thread-2正在执行。。。  pool-1-thread-5正在执行。。。  

改变ExecutorService pool = Executors.newFixedThreadPool(5)中的参数:ExecutorService pool = Executors.newFixedThreadPool(2),

输出结果是:

pool-1-thread-1正在执行。。。  pool-1-thread-1正在执行。。。  pool-1-thread-2正在执行。。。  pool-1-thread-1正在执行。。。  pool-1-thread-2正在执行。。。  

从以上结果可以看出,newFixedThreadPool的参数指定了可以运行的线程的最大数目,超过这个数目的线程加进去以后,不会运行。其次,加入线程池的线程属于托管状态,线程的运行不受加入顺序的影响。

 

二、单任务线程池,newSingleThreadExecutor:

public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue
())); }

这个工厂方法中使用无界LinkedBlockingQueue,并的将线程数设置成1,除此以外还使用FinalizableDelegatedExecutorService类进行了包装。这个包装类的主要目的是为了屏蔽ThreadPoolExecutor中动态修改线程数量的功能,仅保留ExecutorService中提供的方法。虽然是单线程处理,一旦线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。

13202955_WDsc.jpg

SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题。

举个列子:

仅仅是把上述代码中的ExecutorService pool = Executors.newFixedThreadPool(2)改为ExecutorService pool = Executors.newSingleThreadExecutor();

package 线程池;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/*** @ClassName: NewSingleThreadExecutorTest * newSingleThreadExecutor  单线程线程池Demo * @author lifei.pan* @date 2017年4月13日 下午8:06:11  */public class NewSingleThreadExecutorTest {	public static void main(String[] args) {		// 创建单线程线程池		ExecutorService pool = Executors.newSingleThreadExecutor();		// 创建线程		Thread t1 = new MyThread();		Thread t2 = new MyThread();		Thread t3 = new MyThread();		Thread t4 = new MyThread();		Thread t5 = new MyThread();		// 将线程放入池中进行执行		pool.execute(t1);		pool.execute(t2);		pool.execute(t3);		pool.execute(t4);		pool.execute(t5);		// 关闭线程池		pool.shutdown();	}}class MyThread extends Thread {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + "正在执行。。。");	}}

 

输出结果:

pool-1-thread-1正在执行。。。  pool-1-thread-1正在执行。。。  pool-1-thread-1正在执行。。。  pool-1-thread-1正在执行。。。  pool-1-thread-1正在执行。。。  

可以看出,每次调用execute方法,其实最后都是调用了thread-1的run方法。

 

三、可变尺寸的线程池,newCachedThreadPool:线程池无限大(MAX INT),等待队列长度为1

public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue
()); }

SynchronousQueue是一个只有1个元素的队列,入队的任务需要一直等待直到队列中的元素被移出。核心线程数是0,意味着所有任务会先入队列;最大线程数是Integer.MAX_VALUE,可以认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒以后退出。CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。

举个列子:

与上面的类似,只是改动下pool的创建方式:ExecutorService pool = Executors.newCachedThreadPool();

package 线程池;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/*** @ClassName: NewCachedThreadPoolTest * @Description:newCachedThreadPool  固定线程数的线程池Demo * @author lifei.pan* @date 2017年4月13日 下午8:06:11  */public class NewCachedThreadPoolTest {	public static void main(String[] args) {		// 创建一个可变尺寸的线程池		ExecutorService pool = Executors.newCachedThreadPool();		// 创建线程		Thread t1 = new MyThread();		Thread t2 = new MyThread();		Thread t3 = new MyThread();		Thread t4 = new MyThread();		Thread t5 = new MyThread();		// 将线程放入池中进行执行		pool.execute(t1);		pool.execute(t2);		pool.execute(t3);		pool.execute(t4);		pool.execute(t5);		// 关闭线程池		pool.shutdown();	}}class MyThread extends Thread {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + "正在执行。。。");	}}

输出结果:

pool-1-thread-1正在执行。。。  pool-1-thread-2正在执行。。。  pool-1-thread-4正在执行。。。  pool-1-thread-3正在执行。。。  pool-1-thread-5正在执行。。。 

 

这种方式的特点是:可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。

三种ExecutorService特性总结

类型 核心线程数 最大线程数 Keep Alive 时间 任务队列 任务处理策略
FixedThreadPool 固定大小 固定大小(与核心线程数相同) 0 LinkedBlockingQueue 线程池大小固定,没有可用线程的时候任务会放入队列等待,队列长度无限制
SingleThreadExecutor 1 1 0 LinkedBlockingQueue 与 FixedThreadPool 相同,区别在于线程池的大小为1,适用于业务逻辑上只允许1个线程进行处理的场景
CachedThreadPool 0 Integer.MAX_VALUE 1分钟 SynchronousQueue 线程池的数量无限大,新任务会直接分配或者创建一个线程进行执行

 

下面几个不常用,这里我稍微的提下

四、延迟连接池,newScheduledThreadPool:

 

package 线程池;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;/*** @ClassName: NewScheduledThreadPoolTest * @Description:延迟线程池* @author lifei.pan* @email plfnet@163.com* @date 2017年4月13日 下午8:13:11 * */public class NewScheduledThreadPoolTest {	public static void main(String[] args) {		// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。		ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);		// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口		Thread t1 = new MyThread();		Thread t2 = new MyThread();		Thread t3 = new MyThread();		// 将线程放入池中进行执行		pool.execute(t1);		// 使用延迟执行风格的方法		pool.schedule(t2, 1000, TimeUnit.MILLISECONDS);		pool.schedule(t3, 10, TimeUnit.MILLISECONDS);		// 关闭线程池		pool.shutdown();	}}class MyThread extends Thread {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + "正在执行。。。");	}}

读者可以尝试改变Executors.newScheduledThreadPool(2)的参数来得到更多的体验,当然,让

 

@Override  public void run() {      System.out.println(Thread.currentThread().getName() + "正在执行。。。");  }  

变成一个无限循环,你可以得到更多的关于pool.shutdown()的用法。

 

五:单任务延迟连接池

 

package 线程池;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;/*** @ClassName: NewSingleThreadScheduledExecutorTest * @Description:单任务延迟线程池* @author lifei.pan* @email plfnet@163.com* @date 2017年4月13日 下午8:13:11 * */public class NewSingleThreadScheduledExecutorTest {	public static void main(String[] args) {		// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。		ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();   		// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口		Thread t1 = new MyThread();		Thread t2 = new MyThread();		Thread t3 = new MyThread();		// 将线程放入池中进行执行		pool.execute(t1);		// 使用延迟执行风格的方法		pool.schedule(t2, 1000, TimeUnit.MILLISECONDS);		pool.schedule(t3, 10, TimeUnit.MILLISECONDS);		// 关闭线程池		pool.shutdown();	}}class MyThread extends Thread {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + "正在执行。。。");	}}

 

六:我们也可以自己自定义线程池

package 线程池;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class dfdf {	public static void main(String[] args) {		// //创建等待队列		BlockingQueue bqueue = new ArrayBlockingQueue(20);		// 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。		ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 2, TimeUnit.MILLISECONDS, bqueue);		// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口		Thread t1 = new MyThread();		Thread t2 = new MyThread();		Thread t3 = new MyThread();		Thread t4 = new MyThread();		Thread t5 = new MyThread();		Thread t6 = new MyThread();		Thread t7 = new MyThread();		Thread t8 = new MyThread();		// 将线程放入池中进行执行		pool.execute(t1);		pool.execute(t2);		pool.execute(t3);		pool.execute(t4);		pool.execute(t5);		pool.execute(t6);		pool.execute(t7);		pool.execute(t8);		// 关闭线程池		pool.shutdown();	}}class MyThread extends Thread {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + "正在执行。。。");	}}

输出结果:

pool-1-thread-1正在执行。。。pool-1-thread-2正在执行。。。pool-1-thread-1正在执行。。。pool-1-thread-2正在执行。。。pool-1-thread-1正在执行。。。pool-1-thread-2正在执行。。。pool-1-thread-1正在执行。。。pool-1-thread-2正在执行。。。

 

总结:

其实Executors下面的线程池实现,都是在ThreadPoolExecutor基础上的一个封装,所以前两篇的介绍我们已经对ThreadPoolExecutor的核心代码和实现原理有一定的了解了,在这里就不多提了。

 

转载于:https://my.oschina.net/u/2286631/blog/879186

你可能感兴趣的文章
Java核心技术卷一基础知识-第5章-继承-读书笔记
查看>>
解决了64位 window 8英文版 office 2013 word繁简转换的问题( 看图)
查看>>
【机器学习】--贝叶斯网络
查看>>
C# 中使用 Task 实现提前加载
查看>>
php---依赖倒转(反转控制)原则
查看>>
团队编程项目开发环境搭建过程
查看>>
iOS 页面与页面之间传参数的方法 代码块传值
查看>>
POJ 2503
查看>>
基于 HTML5 结合互联网+的电力接线图
查看>>
range与xrange
查看>>
Kali渗透测试——HexInject
查看>>
中国式人机协作
查看>>
进程 子进程 关系
查看>>
孟岩:通证(token)和通证经济的目的在于改善现有经济的效率性
查看>>
杜鹃演绎奢华春装大片
查看>>
mongoDb
查看>>
windows cmd 命令和 linux 命令
查看>>
vs extension
查看>>
代码度量工具——SourceMonitor的学习和使用
查看>>
201521123081《java程序设计》 第11周学习总结
查看>>