1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 多线程—线程池Executor框架及四种常用线程池

多线程—线程池Executor框架及四种常用线程池

时间:2019-02-24 02:05:20

相关推荐

多线程—线程池Executor框架及四种常用线程池

池化技术应用:线程池、数据库连接池、http连接池等等。

池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

使用线程池的好处:

降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行。管理线程:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,监控和调优。

Executor框架

绿色实线箭头是继承,虚线是接口实现

Executor接口是Executor框架的一个最基本的接口,Executor框架的大部分类都直接或间接地实现了此接口。

Executor接口只有一个execute(Runnable command)方法。

public void execute(Runnable r) {new Thread(r).start();}

ExecutorService接口继承了Executor接口,该接口用于管理线程。

public interface ExecutorService extends Executor {// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。boolean awaitTermination(long timeout, TimeUnit unit);// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);// 执行给定的任务,当所有任务完成或超时期满时,返回保持任务状态和结果的 Future 列表。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);// 任务列表里只要有一个任务完成了,就立即返回。而且一旦正常或异常返回后,则取消尚未完成的任务。<T> T invokeAny(Collection<? extends Callable<T>> tasks);// 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);// 如果此执行程序已关闭,则返回 true。boolean isShutdown();// 如果关闭后所有任务都已完成,则返回 true。先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。boolean isTerminated();// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。void shutdown();// 通过调用interrupt试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。List<Runnable> shutdownNow();// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future,该Future的get方法在成功完成时将会返回给定的结果。<T> Future<T> submit(Callable<T> task);Future<?> submit(Runnable task);<T> Future<T> submit(Runnable task, T result);}

execute和submit方法

execute,执行一个任务,没有返回值。submit,提交一个线程任务,有返回值。submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。

接下来介绍线程池主要实现类ThreadPoolExecutor

ThreadPoolExecutor构造函数

//五个参数的构造函数public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)//第六个参数ThreadFactory threadFactory//第七个参数RejectedExecutionHandler handler

ThreadPoolExecutor类的构造函数有4个,前面5个参数是必须的,第6和7个参数同这5个参数又构成了3个构造函数。

int corePoolSize

线程池中核心线程数最大值,线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,进入任务队列。核心线程默认情况下会一直存活在线程池中,即使这个核心线程是闲置状态。如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么闲置状态的核心线程,超过一定时间(keepAliveTime),就会被销毁掉。

int maximumPoolSize

该线程池中线程总数最大值。如果等待队列满了,创建非核心线程。线程总数 = 核心线程数 + 非核心线程数。

long keepAliveTime

该线程池中非核心线程闲置超时时长。一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉。如果设置allowCoreThreadTimeOut = true,则会作用于核心线程。

TimeUnit unit

keepAliveTime的时间单位。

BlockingQueue<Runnable> workQueue

该线程池中的任务队列,维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

队列的三种通用策略详解:

直接提交SynchronousQueue

将任务直接提交给线程而不保存它们。如果所有线程都在工作,就新建一个线程来处理这个任务,所以通常要求maximumPoolSizes设置为Integer.MAX_VALUE,即无限大,以避免拒绝新提交的任务。

无界队列LinkedBlockingQueue

将导致在所有核心线程都在忙时新任务在队列中等待,创建的线程就不会超过 corePoolSize,maximumPoolSize 的值也就没意义了。

有界队列ArrayBlockingQueue

可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。

延时队列DelayQueue

传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

ThreadFactory threadFactory

用于创建新线程,是一个接口,new他的时候需要实现他的Thread newThread(Runnable r)方法,一般不用。

RejectedExecutionHandler handler

用于抛出异常,一般不用。

ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常,默认。ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法。ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务ThreadPoolExecutor.DiscardPolicy() 抛弃当前的任务。

如何配置线程池

CPU密集型任务

CPU核心数+1,CPU 密集型任务使得CPU使用率很高,内存、硬盘、网络占用的时间少于cpu本身计算的时间,这时应配置尽可能小的线程避免线程之间频繁的切换消耗资源。

IO密集型任务

2*CPU核心数,O密集型任务CPU使用率并不高,当线程发出请求后,由于不占用cpu资源,可以阻塞等待,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。

混合型任务

可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。

线程池执行策略

线程数量未达到corePoolSize,则新建一个核心线程执行任务线程数量达到了corePools,则将任务移入队列等待队列已满,新建线程(非核心线程)执行任务队列已满,总线程数又达到了maximumPoolSize,就会由RejectedExecutionHandler抛出异常

常用的四种线程池

CachedThreadPool 缓存线程池

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); cachedThreadPool.execute(new Runnable(){public void run() {};})

特点:

核心线程数为0,线程数无限制有空闲线程则复用空闲线程,若无空闲线程则新建线程空闲线程只会等60s直接提交队列

FixedThreadPool定长线程池

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int theads);fixedThreadPool.execute(new Runnable(){public void run() {};});

特点:

创建时给定核心线程数,所有线程都是核心线程。线程空闲就回收,核心线程默认不回收。阻塞队列无界

SingleThreadExecutor单线程化的线程池

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();singleThreadExecutor.execute(new Runnable(){public void run() {};});

static class FinalizableDelegatedExecutorServiceextends DelegatedExecutorService {FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}protected void finalize() {super.shutdown();}}

加了个finalize方法保证线程池的关闭,DelegatedExecutorService是继承AbstractExecutorService的一个类。

和newFixedThreadPool(1)的区别

封装成FinalizableDelegatedExecutorService类,这个类就是对ExecutorService进行了一个包装,防止暴露出不该被暴露的方法,然后加上了finalize方法保证线程池的关闭。

特点:

线程池只有一个核心线程线程空闲就回收,核心线程默认不回收。阻塞队列无界

ScheduledThreadPool定长周期线程池:

ScheduledThreadPool是一个能实现定时、周期性任务的线程池。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int theads);scheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println("延迟1秒执行");}}, 1, TimeUnit.SECONDS);scheduledThreadPool.scheduleAtFixedRate(new Runnable() {public void run() {System.out.println("延迟1秒后每3秒执行一次");}}, 1, 3, TimeUnit.SECONDS);

public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,实现了ScheduledExecutorService接口,该接口定义了schedule等任务调度的方法。ScheduledThreadPoolExecutor有两个重要的内部类:DelayedWorkQueue和ScheduledFutureTask。DelayeddWorkQueue是一个阻塞队列,而ScheduledFutureTask继承自FutureTask,并且实现了Delayed接口。

特点:

给定核心线程数,定时、周期性处理任务线程空闲就回收阻塞队列无界

线程池为什么能维持线程不释放,随时运行各种任务?

总结就是:如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。

//重点:poll会一直阻塞直到超过keepAliveTime或者获取到任务//take 会一直阻塞直到获取到任务//在没有任务的时候 如果没有特别设置allowCoreThreadTimeOut,我们的核心线程会一直阻塞在这里Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

所以也解释了workQueues为什么要是BlockingQueue

ArrayBlockingQueue的take方法

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁lock.lockInterruptibly();try {while (count == 0)notEmpty.await();//队列为空时,将使这个线程进入阻塞状态,直到被其他线程唤醒时取出元素return dequeue();//消费对头中的元素} finally {lock.unlock();}}

可以看到当队列内没有任务时,调用await方法挂起线程。await方法是ConditionObject的方法,内部调用了LockSupport类的park方法将线程挂起。可以看这里。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。