java并发编程框架:Executor
阅读数:167 评论数:0
跳转到新版页面分类
python/Java
正文
一、概述
Executor框架用于简化并发编程,通过Executor来启动线程比使用Thread的start方法更好。
(1)每次new Thread新建对象性能差。
(2)使用Thread缺乏统一管理,可能无限制新建线程。
(1)ThreadPoolExecutor中关闭线程池的方法
如果线程池作为局部变量使用,用完是要关闭的,否者大量的线程资源占用,会导致内存泄露。
shutdown |
它可以安全地关闭一个线程池,调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表 shutdown() 操作是没有任何效果的,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。
|
isShutdown | 是否执行了 shutdown 或者 shutdownNow 方法 |
isTerminated | 这个方法可以检测线程池是否真正“终结”了,这不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了, |
awaitTermination |
主要用来判断线程池状态的 等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true; |
shutdownNow |
首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,我们可以根据返回的任务 List 来进行一些补救的操作
|
二、Executor和ExecutorService
Executor | ExecutorService |
定义了execute()方法用来接收一个Runnable接口的对象 | submit()方法可以接收Runnable和Callable接口的对象 |
execute()不返回任何结果 | submit()方法通过Future对象返回运算结果 |
供了管理线程池的方法,比如通过shutdown()终止线程池 |
它是Java线程池的超级接口,提供一个execute(Runnable command)方法,我们一般用它的继承接口ExecutorService。
// 这里RunnableTask实现的Runnable接口
executor.execute(new RunnableTask()); // 异步执行
//等价于
new Thread(new RunnableTask())).start()
它是一个比Executor使用更广泛的子类接口。
(1)提供了生命周期管理方法
(2)可以调用shutdown()方法来平滑关闭,这时它会停止接收任何新的任务,且等待已提交的任务执行完。
(3)通过返回的Future对象,可以调用isDone()来查询Future是否已经完成,当任务完成时,它具有一个结果,你也可以调用get()方法来获取结果,get()将会被阻塞,直至结果准备就绪。还可以通过cancel()来取消还未执行的任务。
/**
* 停止接收新任务,已提交的任务会继续执行完成。
**/
void shutdown();
/**
* 停止接收新任务,忽略队列中等待的任务,尝试将正在执行的任务中断掉,
* 返回未执行的任务列表
* 它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,
* 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。
* 所以,shutdownNow() 并不代表线程池就一定立即就能退出,
* 它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。
**/
List<Runnable> shutdownNow();
/**
* 等所有已提交任务执行完,或等超时时间到了,或者线程被中断了,抛出InterruptedException
**/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//当调用shutdown()或shutdownNow()方法后返回为true
boolean isShutdown();
//当调用shutdown()方法后,并且所有提交的任务完成后返回为true
//当调用shutdownNow()方法后,成功停止后返回为true;
boolean isTerminated();
//提交一个Callable任务
<T> Future<T> submit(Callable<T> task);
//提交一个Runnable任务,因为Runnable没有返回指,所以第二个参数是用来返回值
<T> Future<T> submit(Runnable task, T result);
//提交一个Runnable任务
Future<?> submit(Runnable task);
//执行所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//执行所有任务,有过期时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//有一个任务结束就可以返回
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
//有一个任务结束就可以返回,有过期时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
三、Executors
这个类提供大量创建连接池的静态方法。
线程池方法 | 初始化线程池数 | 最大线程池数 | 线程池中线程存活时间 | 时间单位 | 工作队列 |
---|---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60 | 秒 | SynchronousQueue |
newFixedThreadPool | 入参指定大小 | 入参指定大小 | 0 | 毫秒 | LinkedBlockingQueue |
newScheduledThreadPool | 入参指定大小 | Integer.MAX_VALUE | 0 | 微秒 | DelayedWorkQueue |
newSingleThreadExecutor | 1 | 1 | 0 | 毫秒 | LinkedBlockingQueue |
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class Test {
public static void main(String[] args) {
// 创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
// 创建线程
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
}
}
2、单任务线程池 newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
可以看到线程池的大小上限是Integer.MAX_VALUE。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果池中已有线程是空闲的,会重用已有线程。
4、延时线程池newScheduledThreadPool,多数情况下可用来替代Timer
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public class Test {
public static void main(String[] args) {
// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
// 使用延迟执行风格的方法
pool.schedule(t2, 1000, TimeUnit.MILLISECONDS);
pool.schedule(t3, 10, TimeUnit.MILLISECONDS);
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
}
}
之前的线程池都是基于ThreadPoolExecutor去实现的,多个线程共有一个阻塞队列。newWorkStealingPool则是基于ForkJoinPool的方式构建出来的,线程池中每一个线程都有一个自己的队列。
当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行。可以简单理解为”窃取“。
一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从头开始执行,一个从尾部开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。
核心就是希望没有工作线程处于空闲状态。
public class Thread08_WorkStealing {
public static void main(String[] args) {
ExecutorService executorService = Executors.newWorkStealingPool(3);
for (int i=1; i<= 100; i++){
executorService.submit(new MyWorker(i));
}
while (true){}
}
}
四、ThreadPoolExecutor(实现类,可以直接使用)
多用于自定义线程池,基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。
Executors线程工具类中的很多方法最终都是通过ThreadPoolExecutor类来完成的。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
1、参数解释
(1)corePoolSize:核心线程数,会一直存活,即使没有任务,线程池会维护线程的最少数量。
(2)maximumPoolSize:线程池维护线程的最大数量。
(3)keepAliveTime:线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0.
(4)unit:线程池维护线程所允许空闲时间的单位
(5)workQueue:线程份所使用的缓冲队列
直接提交 | 工作队列的默认选项是SynChronousQueue,它将任务直接提交给线程,所以通常要求无界maximumPoolSizes以避免拒绝新提交的任务。 |
无界队列 | 如LinkedBlockingQueue。 |
有界队列 | 如ArrayBlockingQueue |
(6)handler:对拒绝任务的处理策略。
AbortPolicy | 直接拒绝策略,抛出RejectedExecutionException异常,该策略也是线程池的默认拒绝策略 |
CallerRunsPolicy | 将被拒绝的任务放在ThreadPoolExecutor.execute()方法所在的那个线程中执行。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 |
DiscardPolicy | 将被拒绝的任务直接删除 |
DiscardOldeestPolicy | 当线程池没有关闭的情况下,会将阻塞队首的那个任务从队列中移除,然后将被拒绝的任务加入队列的队尾。 |
2、当通过execute方法将一个Runnable任务添加到线程池中时,基本处理逻辑
(1)如果线程池中的线程数量小于corePoolSize,会创建一个新的线程来执行新任务。
(2)如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执
(3)如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
(4)如果线程数据大于等于MaxPoolSize,那么执行拒绝策略。
3、线程池关闭
shutdown | 优雅关闭,已提交的任务继续执行,不再接受新任务。 |
shutdownNow | 尝试关闭所有任务 |
4、线程池的状态监控
getTaskCount | 返回曾计划的执行的近似任务总数。因为在计算期间任务和线程的状态可能动态改变,所以返回值只是一个近似值。 |
getCompletedTaskCount | 返回已完成执行的近似任务总数 |
getLargestPoolSize | 线程池曾经创建过的最大线程数量 |
getPoolSize | 线程池的线程数量 |
getActiveCount | 返回正在执行任务的近似线程数 |
五、ScheduledExecutorService
Timer对调度的支持是基于绝对时间,而不是相对时间。而ScheduledExecutorService只支持相对时间。
ScheduledExecutorService接口在ExecuteService基础上,提供了按时间安排执行任务的功能。
// 延迟时间后执行一次任务
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 后两个方法,是每间隔一段时间执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
当向线程池提交任务时会返回一个ScheduleFuture接口的对象
1、Timer管理延时任务的缺陷
因为Timer在执行定时任务时只会创建一个线程,所以如果存在多个任务,且任务过长,超过两个任务时间间隔,会发生一些缺陷。
public class TimerTest
{
private static long start;
public static void main(String[] args) throws Exception
{
TimerTask task1 = new TimerTask()
{
@Override
public void run()
{
System.out.println("task1 invoked ! "
+ (System.currentTimeMillis() - start));
try
{
Thread.sleep(3000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
TimerTask task2 = new TimerTask()
{
@Override
public void run()
{
System.out.println("task2 invoked ! "
+ (System.currentTimeMillis() - start));
}
};
Timer timer = new Timer();
start = System.currentTimeMillis();
timer.schedule(task1, 1000);
timer.schedule(task2, 3000);
}
}
由于Timre内部是一个线程,而任务1所需的时间超过了两个任务间的间隔,所以任务2的执行和预期不一样。
而ScheduledThreadPool内部是个线程池,所以可以支持多个任务并发执行。
2、Timer当任务抛出异常时的缺陷
如果TimerTask抛出RuntimerExceptin,Timer会停止所有任务的运行。
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
public class ScheduledThreadPoolDemo01
{
public static void main(String[] args) throws InterruptedException
{
final TimerTask task1 = new TimerTask()
{
@Override
public void run()
{
throw new RuntimeException();
}
};
final TimerTask task2 = new TimerTask()
{
@Override
public void run()
{
System.out.println("task2 invoked!");
}
};
Timer timer = new Timer();
timer.schedule(task1, 100);
timer.scheduleAtFixedRate(task2, new Date(), 1000);
}
}
但是ScheduledExecutorService可以保证,task1出现异常时,不影响task2的运行。
3、Timer执行周其任务时依赖系统时间
Timer执行周期任务时依赖系统时间,如果当前系统时间发生变化会出现一些执行上的变化,ScheduledExecutorService基于时间的延迟。
4、ScheduledFuture
(1)long getDelay(TimeUnit unit)
返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
JDK中并没提供ScheduledFuture的实现类。只有在ScheduledExecutorService中提交了任务,才能返回一个实现了ScheduledFuture接口的对象。
5、RunnableScheduledFuture
(1)boolean isPeriodic()
如果这是一个这期任务,则返回true。
6、接口新增的四个方法
(1)ScheduledFuture<?> schedule(Ruunable command,long delay,TimeUnit unit)
创建并执行在给定的延迟后启用的命令。
(2)ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit)
创建并执行在给定的延迟后执行的命令,可以有返回值。
(3)ScheduledFuture<?> scheduleAtFixedRate(Ruunable command,long initialDelay,long period,TimeUnit unit)
第一次执行延迟initialDelay,之后固定周期为period。
如果任务执行时长小于period,那么任务会在period-任务执行时长 后执行;如果任务执行时长大于period,下一次任务会在上一次任务完成后马上执行。
(4)ScheduledFuture<?> scheduleWithFixedDelay(Ruunable command,long initialDelay,long delay,TimeUnit unit)
第一次执行延迟initialDelay,之后每次延迟时长为delay
六、ScheduledThreadPoolExecutor(实现类,可以直接使用)
它与Timer的功能类似,通常使用工厂类Executors来创建它。
它实现了ScheduledExecutorService接口,实现了一些定时任务处理的方法,它实现了其最常用的两个方法scheduleAtFixedRate和ScheduleWithFixedDelay。
另外由于它继承了ThreadPoolExecutor实现类,所以也拥有一些线程管理的方法。