java并发编程框架:Executor

阅读数:5 评论数:0

跳转到新版页面

分类

python/Java

正文

一、概述

Executor框架用于简化并发编程,通过Executor来启动线程比使用Thread的start方法更好。

1、常用的几个接口和子类的关系

二、Executor和ExecutorService

0、二者的区别

(1)Executor接口定义了execute()方法用来接收一个Runnable接口的对象,而ExecutorService接口中的submit()方法可以接收Runnable和Callable接口的对象。

(2)Executor的execute()不返回任何结果,而ExecutorService的submit()方法通过Future对象返回运算结果。

(3)ExecutorService还提供了管理线程池的方法,比如通过shutdown()终止线程斌。

1、Executor

它是Java线程池的超级接口,提供一个execute(Runnable command)方法,我们一般用它的继承接口ExecutorService。

// 这里RunnableTask实现的Runnable接口
executor.execute(new RunnableTask()); // 异步执行
//等价于
new Thread(new RunnableTask())).start()

2、ExecutorService

它是一个比Executor使用更广泛的子类接口。

(1)提供了生命周期管理方法

(2)可以调用shutdown()方法来平滑关闭,这时它会停止接收任何新的任务,且等待已提交的任务执行完。

(3)通过返回的Future对象,可以调用isDone()来查询Future是否已经完成,当任务完成时,它具有一个结果,你也可以调用get()方法来获取结果,get()将会被阻塞,直至结果准备就绪。还可以通过cancel()来取消还未执行的任务。

/**
* 停止接收新任务,已提交的任务会继续执行完成。
**/
void shutdown();
/**
* 停止接收新任务,忽略队列中等待的任务,尝试将正在执行的任务中断掉,
* 返回未执行的任务列表
* 它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,
* 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。
* 所以,shutdownNow() 并不代表线程池就一定立即就能退出,
* 它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。
**/
List<Runnable> shutdownNow();
/**
* 等所有已提交任务执行完,或等超时时间到了,或者线程被中断了,抛出InterruptedException
**/
boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
//当调用shutdown()或shutdownNow()方法后返回为true
boolean isShutdown();
//当调用shutdown()方法后,并且所有提交的任务完成后返回为true
//当调用shutdownNow()方法后,成功停止后返回为true;
boolean isTerminated();

//提交一个Callable任务
<T> Future<T> submit(Callable<T> task);
//提交一个Runnable任务,因为Runnable没有返回指,所以第二个参数是用来返回值
<T> Future<T> submit(Runnable task, T result);
//提交一个Runnable任务
Future<?> submit(Runnable task);
//执行所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
//执行所有任务,有过期时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;
//有一个任务结束就可以返回
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
//有一个任务结束就可以返回,有过期时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

三、Executors

这个类提供大量创建连接池的静态方法。

1、固定大小的线程池,newFixedThreadPool

public class Test {  
    public static void main(String[] args) {  
        // 创建一个可重用固定线程数的线程池  
        ExecutorService pool = Executors.newFixedThreadPool(5);  
        // 创建线程  
        Thread t1 = new MyThread();  
        Thread t2 = new MyThread();  
        Thread t3 = new MyThread();  
        Thread t4 = new MyThread();  
        Thread t5 = new MyThread();  
        // 将线程放入池中进行执行  
        pool.execute(t1);  
        pool.execute(t2);  
        pool.execute(t3);  
        pool.execute(t4);  
        pool.execute(t5);  
        // 关闭线程池  
        pool.shutdown();  
    }  
}  
  
class MyThread extends Thread {  
    @Override  
    public void run() {  
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");  
    }  
}

2、单任务线程池 newSingleThreadExecutor

3、可缓存的线程池 newCachedThreadPool

如果池中已有线程是空闲的,会重用已有线程。

4、延时线程池newScheduledThreadPool,多数情况下可用来替代Timer

public class Test {  
    public static void main(String[] args) {  
        // 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。  
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);  
        // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口  
        Thread t1 = new MyThread();  
        Thread t2 = new MyThread();  
        Thread t3 = new MyThread();  
        // 将线程放入池中进行执行  
        pool.execute(t1);  
        // 使用延迟执行风格的方法  
        pool.schedule(t2, 1000, TimeUnit.MILLISECONDS);  
        pool.schedule(t3, 10, TimeUnit.MILLISECONDS);  
  
        // 关闭线程池  
        pool.shutdown();  
    }  
}  
  
class MyThread extends Thread {  
    @Override  
    public void run() {  
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");  
    }  
}

四、ThreadPoolExecutor

多用于自定义线程池,基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          RejectedExecutionHandler handler) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
         Executors.defaultThreadFactory(), handler);  
}

1、参数解释

(1)corePoolSize:核心线程数,会一直存活,即使没有任务,线程池会维护线程的最少数量。

(2)maximumPoolSize:线程池维护线程的最大数量。

(3)keepAliveTime:线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0.

(4)unit:线程池维护线程所允许空闲时间的单位

(5)workQueue:线程份所使用的缓冲队列

(6)handler:对拒绝任务的处理策略。

2、当通过execute方法将一个Runnable任务添加到线程池中时,基本处理逻辑

(1)如果线程池中的线程数量小于corePoolSize,会创建一个新的线程来执行新任务。

(2)如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执

(3)如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

(4)如果线程数据大于等于MaxPoolSize,那么执行拒绝策略。

五、ScheduledExecutorService

Timer对调度的支持是基于绝对时间,而不是相对时间。而ScheduledExecutorService只支持相对时间

ScheduledExecutorService接口在ExecuteService基础上,提供了按时间安排执行任务的功能。

当向线程池提交任务时会返回一个ScheduleFuture接口的对象

1、Timer管理延时任务的缺陷

因为Timer在执行定时任务时只会创建一个线程,所以如果存在多个任务,且任务过长,超过两个任务时间间隔,会发生一些缺陷。

public class TimerTest  
{  
    private static long start;  
  
    public static void main(String[] args) throws Exception  
    {  
  
        TimerTask task1 = new TimerTask()  
        {  
            @Override  
            public void run()  
            {  
  
                System.out.println("task1 invoked ! "  
                        + (System.currentTimeMillis() - start));  
                try  
                {  
                    Thread.sleep(3000);  
                } catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
  
            }  
        };  
        TimerTask task2 = new TimerTask()  
        {  
            @Override  
            public void run()  
            {  
                System.out.println("task2 invoked ! "  
                        + (System.currentTimeMillis() - start));  
            }  
        };  
        Timer timer = new Timer();  
        start = System.currentTimeMillis();  
        timer.schedule(task1, 1000);  
        timer.schedule(task2, 3000);  
  
    }  
} 

由于Timre内部是一个线程,而任务1所需的时间超过了两个任务间的间隔,所以任务2的执行和预期不一样。

ScheduledThreadPool内部是个线程池,所以可以支持多个任务并发执行。

2、Timer当任务抛出异常时的缺陷

如果TimerTask抛出RuntimerExceptin,Timer会停止所有任务的运行。

import java.util.Date;  
import java.util.Timer;  
import java.util.TimerTask;  
  
  
public class ScheduledThreadPoolDemo01  
{  
  
  
    public static void main(String[] args) throws InterruptedException  
    {  
  
        final TimerTask task1 = new TimerTask()  
        {  
  
            @Override  
            public void run()  
            {  
                throw new RuntimeException();  
            }  
        };  
  
        final TimerTask task2 = new TimerTask()  
        {  
  
            @Override  
            public void run()  
            {  
                System.out.println("task2 invoked!");  
            }  
        };  
          
        Timer timer = new Timer();  
        timer.schedule(task1, 100);  
        timer.scheduleAtFixedRate(task2, new Date(), 1000);  
          
          
  
    }  
}  

但是ScheduledExecutorService可以保证,task1出现异常时,不影响task2的运行。

3、Timer执行周其任务时依赖系统时间

Timer执行周期任务时依赖系统时间,如果当前系统时间发生变化会出现一些执行上的变化,ScheduledExecutorService基于时间的延迟。

4、ScheduledFuture

(1)long getDelay(TimeUnit unit)

返回与此对象相关的剩余延迟时间,以给定的时间单位表示。

JDK中并没提供ScheduledFuture的实现类。只有在ScheduledExecutorService中提交了任务,才能返回一个实现了ScheduledFuture接口的对象。

5、RunnableScheduledFuture

(1)boolean isPeriodic()

如果这是一个这期任务,则返回true。

6、接口新增的四个方法

(1)ScheduledFuture<?> schedule(Ruunable command,long delay,TimeUnit unit)

创建并执行在给定的延迟后启用的命令。

(2)ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit)

创建并执行在给定的延迟后执行的命令,可以有返回值。

(3)ScheduledFuture<?> scheduleAtFixedRate(Ruunable command,long initialDelay,long period,TimeUnit unit)

第一次执行延迟initialDelay,之后固定周期为period。

如果任务执行时长小于period,那么任务会在period-任务执行时长 后执行;如果任务执行时长大于period,下一次任务会在上一次任务完成后马上执行。

(4)ScheduledFuture<?> scheduleWithFixedDelay(Ruunable command,long initialDelay,long delay,TimeUnit unit)

第一次执行延迟initialDelay,之后每次延迟时长为delay

 

六、ScheduledThreadPoolExecutor

它与Timer的功能类似。