关于线程池,你该知道的都在这儿!
By: Date: 2021年2月24日 Categories: 程序 标签:, ,

线程池的优点自不必说,重用已有的线程,可以有效减少线程创建及销毁对性能造成的损失。并且通过有效控制线程数量,可以避免资源拥堵,或提高系统资源的使用率。但如何用好线程池才是重点,本篇就通过一个简单的例子,重温线程池相关的知识点。

示例代码

public class ThreadPoolExecutorTest {
    @Test
    public void testThreadPoolExecutor() throws InterruptedException {
        // 定义线程名称
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-thread-%d").build();
        // 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        // 循环调用线程池执行任务
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            System.out.println("接收任务:" + i);
            threadPoolExecutor.execute(() -> {
                try {
                    List<String> resultList = execTask();
                    list.addAll(resultList);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // 不再接受新的线程,并且等待前面的线程都执行完毕后关闭线程池
        threadPoolExecutor.shutdown();
        // 阻塞主线程, 直至线程池关闭
        threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        System.out.println("主线程执行完毕!收到全部结果如下:");
        System.out.println(list);
    }

    static List<String> execTask() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        Random random = new Random();
        int sleetMilliseconds = random.nextInt(5000);
        System.out.println(String.format("[%s] 线程开始,预计:%s 毫秒", threadName, sleetMilliseconds));

        List<String> nameList = new ArrayList<>();
        int nameCount = random.nextInt(5) + 1;
        for (int i = 0; i < nameCount; i++) {
            nameList.add(getRandomString(5));
        }

        Thread.sleep(sleetMilliseconds);
        System.out.println(String.format("[%s] 线程执行完毕!返回结果:%s", threadName, nameList));
        return nameList;
    }

    /**
     * 获取随机字符串
     * @param length
     * @return
     */
    static String getRandomString(int length) {
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";
        Random random = new Random();
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(base.length());
            sb.append(base.charAt(number));
        }
        return sb.toString();
    }
}

上面的代码中,我们使用 LinkedBlockingQueue 队列,采用默认的拒绝策略创建一个线程池,核心线程数为4,最大线程数设置了10但并不会生效。
向队列中放入10个任务,每个任务都会随机的产生几个长度为5的字符串,并返回。
主线程等待所有线程执行完毕后,关闭线程池,并输出所有线程池返回结果。

线程池的关闭我们使用标准的shutdown,awaitTermination两个方法进行关闭。
看下输出结果:

接收任务:0
接收任务:1
接收任务:2
接收任务:3
[test-pool-thread-0] 线程开始,预计:1231 毫秒...
接收任务:4
接收任务:5
接收任务:6
接收任务:7
接收任务:8
接收任务:9
[test-pool-thread-3] 线程开始,预计:4366 毫秒...
[test-pool-thread-1] 线程开始,预计:4626 毫秒...
[test-pool-thread-2] 线程开始,预计:3959 毫秒...
[test-pool-thread-0] 线程执行完毕!返回结果:[0tuj4, q6qif]
[test-pool-thread-0] 线程开始,预计:4767 毫秒...
[test-pool-thread-2] 线程执行完毕!返回结果:[amea5, z2kdz, sqx1b]
[test-pool-thread-2] 线程开始,预计:1203 毫秒...
[test-pool-thread-3] 线程执行完毕!返回结果:[lbu9a, l2wru, 822f4]
[test-pool-thread-3] 线程开始,预计:1914 毫秒...
[test-pool-thread-1] 线程执行完毕!返回结果:[6dygm, p5x6r, 44n35, 2z0dx]
[test-pool-thread-1] 线程开始,预计:2868 毫秒...
[test-pool-thread-2] 线程执行完毕!返回结果:[lh86x, mai62, uv4xb, 98mms]
[test-pool-thread-2] 线程开始,预计:879 毫秒...
[test-pool-thread-0] 线程执行完毕!返回结果:[f87dn, wjkyh, kfahp, 3i6bn, uh3w4]
[test-pool-thread-0] 线程开始,预计:1141 毫秒...
[test-pool-thread-2] 线程执行完毕!返回结果:[t1pzg, poj1q, f68x8, adven]
[test-pool-thread-3] 线程执行完毕!返回结果:[ks3fl]
[test-pool-thread-0] 线程执行完毕!返回结果:[l4ub4, gwof6, v2duc, 4lcg0, nlu5h]
[test-pool-thread-1] 线程执行完毕!返回结果:[a1a73, w5t6p, enqqm, m3awd, w7xmv]
主线程执行完毕!收到全部结果如下:
[0tuj4, q6qif, amea5, z2kdz, sqx1b, lbu9a, l2wru, 822f4, 6dygm, p5x6r, 44n35, 2z0dx, lh86x, mai62, uv4xb, 98mms, f87dn, wjkyh, kfahp, 3i6bn, uh3w4, t1pzg, poj1q, f68x8, adven, ks3fl, l4ub4, gwof6, v2duc, 4lcg0, nlu5h, a1a73, w5t6p, enqqm, m3awd, w7xmv]

可以看到线程池的核心线程数一直都是4。
上面的程序比较简单,接下来回顾下有关线程池的一些基础知识。


Executors 和 ThreadPoolExecutor

先来看下线程池相关的类:
Executor:用于专门处理多线程相关的一个接口,只有一个方法 void execute(Runnable command)。;
ExecutorService:继承 Executor,提供了一系列如shutdown(),shutdownNow(),submit()等生命周期管理的方法。
Executors:线程池工具类,提供了一系列工厂方法,用于创建实现了 ExecutorService 接口线程池。
ThreadPoolExecutor:自定义线程池类,是 ExecutorService 的子集。
这几个类或者接口都在java.util.concurrent命名空间下。

线程池的工作逻辑

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满:
    若线程数小于最大线程数,则创建线程;
    若线程数等于最大线程数,则调用拒绝执行处理程序,默认拒绝策略:抛出异常;

Executors创建4种线程池
Executors是一个工具类,可以快速的创建多种线程池:

  1. newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。
  2. newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。
  3. newSingleThreadExecutor:适用于执行定时或者周期性任务。
  4. newScheduledThreadPool:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
    其中 newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor 的实现类均是 ThreadPoolExecutor,而 newScheduledThreadPool 的实现类是 ScheduledThreadPoolExecutor。
    ScheduledThreadPoolExecutor 相较于 ThreadPoolExecutor,都是 ExecutorService 的子集,而前者是接口,增加了对任务的调度功能,如延迟,定时等。

阿里的开发手册中不推荐使用 Executors 来创建线程池,因为:

  1. SingleThreadExecutor 及 FixedThreadPool 的实现方法中使用的 LinkedBlockingQueue 工作队列没有给定容量,允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求导致OOM;
  2. CachedThreadPool 和 ScheduledThreadPool 分别使用 SynchronousQueue 和 DelayedWorkQueue 两种无界队列,允许创建的线程数为Integer.MAX_VALUE,可能会因创建大量的线程而导致OOM。

因此通过 ThreadPoolExecutor 来创建线程池,能更清楚的知道线程池的运行规则,避免资源耗尽风险。

ThreadPoolExecutor创建线程池

ThreadPoolExecutor的构造方法共有四个:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

所有参数表示如下:

  1. corePoolSize: 核心线程池大小
  2. maximumPoolSize: 最大线程池大小
  3. keepAliveTime: 线程最大空闲时间
  4. unit: 时间单位
  5. workQueue: BlockingQueue 线程工作队列
  6. threadFactory: ThreadFactory 线程创建工厂
  7. handler: RejectedExecutionHandler 拒绝策略

线程工作队列 workQueue
上面参数中的工作队列 BlockingQueue workQueue 是一个阻塞队列。阻塞队列常用于生产者和消费者的场景,生产者向队列中添加元素,而消费者从队列中取出元素。阻塞队列支持两个附加操作:

  1. 在队列为空时,获取元素的消费者线程会等待队列变为非空,即等待生产线程放入元素。
  2. 当队列满时,存储元素的生产线程会等待队列可用,即消费线程取出元素。

线程池常用的阻塞队列有以下几种:

  1. SynchronousQueue:无界队列,提交的任务不会被保存,总是会马上提交执行。当创建的线程数大于 maximumPoolSize 时,直接执行拒绝策略抛出异常。
  2. ArrayBlockingQueue :有界队列,当有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到 corePoolSize 时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue的初始化容量capacity,则继续创建线程,直到线程数量达到maximumPoolSize最大线程数,若大于 maximumPoolSize,则执行拒绝策略。
  3. LinkedBlockingQueue :一个用链表实现的有界阻塞队列,不设置容量时视为无界队列,可以无限添加新任务。线程池创建的最大线程数量就是 corePoolSize 设置的数量,即 maximumPoolSize 参数是无效的,当线程数达到 corePoolSize 后,就不再增加,新的任务直接进入队列等待。
  4. PriorityBlockingQueue :优先任务队列,特殊的无界队列,其他队列是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
  5. DelayQueue :无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

JDK中还提供了其他的阻塞队列,如 LinkedTransferQueue,LinkedBlockingDeque 等,这里不常用就不再详述。

拒绝策略handler
为防止资源被耗尽,就需要在定义线程池时设置合理的拒绝策略,尤其是在有界队列当中。

  1. AbortPolicy策略:会直接抛出异常,阻止系统正常工作。
  2. CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行。
  3. DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,即任务队列中最先被添加进去的,也是即将要被执行的那个任务,并尝试再次提交。
  4. DiscardPolicy策略:丢弃无法处理的任务,不予任何处理。使用此策略时,业务场景中需允许任务的丢失。

线程池线程数量取值

java并发编程中给出了一个公式:
Nthread = Ncpu * Ucpu * (1+ W/C)

  1. Nthreads:线程数量
  2. Ncpu:CPU的数量,Runtime.getRuntime().availableProcessors()
  3. Ucpu:CPU使用率,范围在[0,1]
  4. W/C:等待时间与计算时间的比率
    对于线程的数量其实重要的是分辨出是IO密集型,还是计算密集型

另一种简单的计算方法:
如果是IO密集型应用,则线程池大小设置为2N+1;
如果是CPU密集型应用,则线程池大小设置为N或不超过N+1;例如我们4核的CPU,那么设置线程数不超过5为宜。

ThreadPoolExecutor扩展

ThreadPoolExecutor是利用beforeExecute()、afterExecute()和terminated()接口实现对线程监控或其他操作。

  1. beforeExecute:线程池中任务运行前执行
  2. afterExecute:线程池中任务运行完毕后执行
  3. terminated:线程池结束后执行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(10),
        namedThreadFactory,
        new ThreadPoolExecutor.AbortPolicy()){

    protected void beforeExecute(Thread t,Runnable r) {
        System.out.println(String.format("[%s] 准备执行!", t.getName()));
    }

    protected void afterExecute(Runnable r,Throwable t) {
        System.out.println("执行完毕!");
    }

    protected void terminated() {
        System.out.println("线程池结束");
    }
};

SpringBoot中线程池的使用

前面讲的 ThreadPoolExecutor 是JDK中提供的线程池类,而 ThreadPoolTaskExecutor 是Spring为我们封装的线程池类。在SpringBoot 中我们可以直接使用后者,因为它本身也是基于ThreadPoolExecutor实现的。

@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {
    @Bean("threadPoolExecutor")
    public Executor asyncThreadPoolExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(4);
        // 设置最大线程数
        executor.setMaxPoolSize(10);
        // 设置工作队列大小
        executor.setQueueCapacity(Integer.MAX_VALUE);
        // 设置线程空闲时间,单位:秒
        executor.setKeepAliveSeconds(60);
        // 设置线程池线程名称前缀
        executor.setThreadNamePrefix("test-pool-thread-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

initialize方法初始化线程池,实际上就是new ThreadPoolExecutor,它的阻塞队列使用的是 LinkedBlockingQueue 或者 SynchronousQueue,视初始化时是否设置了容量而定。
源码就不贴了,接下来我们就可以定义异步的方法,来执行我们的业务逻辑。

//定义接口
public interface AsyncExecuteService {
    /**
     * 执行异步任务
     */
    void asyncExecute();
}

//接口实现类
@Slf4j
@Service
public class AsyncExecuteServiceImpl implements AsyncExecuteService {
    @Override
    @Async("threadPoolExecutor")
    public void asyncExecute() {
        log.info("调用asyncExecute开始");
        //@TODO业务逻辑处理
        //...
        log.info("调用asyncExecute结束");
    }
}

@Async("threadPoolExecutor") 注解标识从线程池中获取线程来执行该方法。

扩展一下,ThreadPoolTaskExecutor 类本身就已实现了诸如submit,execute等方法。若想对线程池进行监控,可以继承ThreadPoolTaskExecutor,自定义类并重写其中的方法实现,这里就不在赘述。

好了,以上就是能想到的关于线程池的常用知识点,表述不全,具体使用还需根据场景决定。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注