跳至主要內容

Java线程池

IT王小二约 3396 字

Java线程池

作者:IT王小二

博客:https://itwxe.comopen in new window

一、为什么要用线程池

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

二、ThreadPoolExecutor的类关系

  • Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。
  • ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit() 的扩展,可以说是真正的线程池接口。
  • AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法。
  • ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期执行"功能 ExecutorService 。
  • ScheduledThreadPoolExecutor 是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。

三、线程池的创建各个参数含义

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

1. corePoolSize

  • 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize。
  • 如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
  • 如果执行了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程。

2. maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize 。

3. keepAliveTime和TimeUnit

这两个配套使用,TimeUnit是keepAliveTime的时间单位。

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用。

4. workQueue

用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数
  4. 更重要的,使用无界 queue 可能会耗尽系统资源,导致OOM

所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue 。

5、threadFactory

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程,需要自定义时实现 ThreadFactory 接口,然后实现 newThread() 方法即可。

Executors 静态工厂里默认的 ThreadFactory,线程的命名规则是“pool-数字-thread-数字”,源码如下:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

6、RejectedExecutionHandler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:

  1. AbortPolicy:直接抛出异常,默认策略。
  2. allerRunsPolicy:用调用者所在的线程来执行任务。
  3. iscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务。
  4. iscardPolicy:直接丢弃任务。

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

四、线程池的工作机制

  1. 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue 。
  3. 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务。
  4. 建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。

五、提交任务

  1. execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  2. submit() 方法用于提交需要返回值的任务,线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取返回值。

get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

六、关闭线程池

可以通过调用线程池的 shutdown() 或 shutdownNow() 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt() 方法来中断线程,所以无法响应中断的任务可能永远无法终止。

  • shutdown() 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程
  • shutdownNow 首先将线程池的状态设置成 STOP 状态,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。

当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

七、扩展线程池

能扩展线程池的功能吗?比如在任务执行的前后做一点我们自己的业务工作(类似切面编程)。

实际上,JDK 的线程池已经为我们预留的接口,在线程池核心方法中,有 2 个方法是空的,就是给我们预留的。还有一个线程池退出时会调用的方法。

分别是:

  • protected void beforeExecute(Thread t, Runnable r) { }
  • protected void afterExecute(Runnable r, Throwable t) { }
  • protected void terminated() { }

使用示例:

public class ThreadPoolAOP {

    /**
     *  模拟工作任务
     */
    static class WorkRunnable implements Runnable {

        private String name;

        public WorkRunnable(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 处理任务 " + name + " 中");
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 创建线程池及实现方法
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                4,
                3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(Thread.currentThread().getName() + " 执行任务:" + ((WorkRunnable) r).getName() + " 之前");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println(Thread.currentThread().getName() + " 执行任务:" + ((WorkRunnable) r).getName() + " 之后");
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出。。。");
            }
        };

        // 模拟6个工作任务执行
        for (int i = 1; i < 6; i++) {
            WorkRunnable workRunnable = new WorkRunnable("工作任务" + i);
            System.out.println("工作任务" + i + "提交");
            threadPool.execute(workRunnable);
        }
        // 关闭线程池
        threadPool.shutdown();
    }
}

八、合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析(Ncpu代表cpu核心数,可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数)。

1、任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。

  • CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu + 1 个线程的线程池。
  • IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2 * Ncpu 。
  • 混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

2、任务的优先级:高、中和低。

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理,它可以让优先级高的任务先执行。

3、任务的执行时间:长、中和短。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

4、任务的依赖性:是否依赖其他系统资源,如数据库连接。

依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU 。

六、预定义线程池

除了ThreadPoolExecutor以外,jdk还提供了一些预定义的线程池供我们使用,但是不推荐使用,阿里规约里面的说明。

预定义线程池

预定义线程池有:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
  • WorkStealingPool
  • ScheduledThreadPoolExecutor

1. FixedThreadPool

  • 创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池的最大数量,其中 nThreads 即为固定长度的线程池。
  • FixedThreadPool 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

2. SingleThreadExecutor

  • 创建使用单个线程的API,corePoolSize 和 maximumPoolSize 被设置为 1 。
  • SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

3. CachedThreadPool

  • 创建一个会根据需要创建新线程的API,大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
  • maximumPool 最大长度为 Integer.MAX_VALUE 。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

4. WorkStealingPool

利用所有运行的处理器数目来创建一个工作窃取的线程池,使用 forkjoin 实现。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

5. ScheduledThreadPoolExecutor

用于周期性执行任务,即定时任务。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

示例代码:

public class ScheduledThreadPoolExecutorTest {

    public static void main(String[] args) {

        ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);

        // 延时任务,仅执行一次
//        scheduled.schedule(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println("我只运行一次" + LocalDateTime.now());
//            }
//        }, 3, TimeUnit.SECONDS);

        // 固定延时时间间隔任务,执行多次
//        scheduled.scheduleWithFixedDelay(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println("开始执行。。 " + LocalDateTime.now());
//                try {
//                    Thread.sleep(2000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                System.out.println("执行结束。。 " + LocalDateTime.now());
//            }
//        },1000,1000,TimeUnit.MILLISECONDS);

        // 固定时间间隔执行任务,执行多次
        scheduled.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("开始执行。。 " + LocalDateTime.now());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行结束。。 " + LocalDateTime.now());
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);

    }

}
  • 其中固定延时间隔任务是指每次执行完任务以后都延时一个固定的时间。由于操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间是不确定的,也就导致了每次任务的执行周期存在一定的波动,例如示例代码中的 scheduleWithFixedDelay 。
  • 固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从理论上讲是确定的,当然执行任务的时间不能超过执行周期;如果超过设置的延时时间则立刻开始执行下一次任务,例如示例代码中的 scheduleAtFixedRate 。

七、CompletionService

CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞。

CompletionService在提交任务之后,会根据任务完成顺序来获取返回值,也就是谁先完成就返回谁的返回值。

示例代码:

public class CompletionServiceTest {

    private final static int POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private final static int TOTAL_TASK = Runtime.getRuntime().availableProcessors() * 10;

    static class WorkTask implements Callable<Integer> {

        private String name;
        public WorkTask(String name) {
            this.name = name;
        }

        @Override
        public Integer call() throws Exception {
            int sleepTime = new Random().nextInt(1000);
            Thread.sleep(sleepTime);
            return sleepTime;
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        // 创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE);
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);

        // 向里面放任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            completionService.submit(new WorkTask("workTask " + i));
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            int sleepTime = completionService.take().get();
            count.addAndGet(sleepTime);
        }

        // 关闭线程池
        threadPool.shutdown();
        System.out.println("任务休眠时间:" + count.get() + " ms,取出执行结果时间:" + (System.currentTimeMillis() - start) + " ms");
    }
}