线程池的优点自不必说,重用已有的线程,可以有效减少线程创建及销毁对性能造成的损失。并且通过有效控制线程数量,可以避免资源拥堵,或提高系统资源的使用率。但如何用好线程池才是重点,本篇就通过一个简单的例子,重温线程池相关的知识点。
示例代码
public class ThreadPoolExecutorTest {
@Test
public void testThreadPoolExecutor() throws InterruptedException {
// 定义线程名称
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-thread-%d").build();
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// 循环调用线程池执行任务
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
System.out.println("接收任务:" + i);
threadPoolExecutor.execute(() -> {
try {
List<String> resultList = execTask();
list.addAll(resultList);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 不再接受新的线程,并且等待前面的线程都执行完毕后关闭线程池
threadPoolExecutor.shutdown();
// 阻塞主线程, 直至线程池关闭
threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
System.out.println("主线程执行完毕!收到全部结果如下:");
System.out.println(list);
}
static List<String> execTask() throws InterruptedException {
String threadName = Thread.currentThread().getName();
Random random = new Random();
int sleetMilliseconds = random.nextInt(5000);
System.out.println(String.format("[%s] 线程开始,预计:%s 毫秒", threadName, sleetMilliseconds));
List<String> nameList = new ArrayList<>();
int nameCount = random.nextInt(5) + 1;
for (int i = 0; i < nameCount; i++) {
nameList.add(getRandomString(5));
}
Thread.sleep(sleetMilliseconds);
System.out.println(String.format("[%s] 线程执行完毕!返回结果:%s", threadName, nameList));
return nameList;
}
/**
* 获取随机字符串
* @param length
* @return
*/
static String getRandomString(int length) {
String base = "abcdefghijklmnopqrstuvwxyz0123456789";
Random random = new Random();
StringBuffer sb = new StringBuffer();
for (int i = 0; i < length; i++) {
int number = random.nextInt(base.length());
sb.append(base.charAt(number));
}
return sb.toString();
}
}
上面的代码中,我们使用 LinkedBlockingQueue 队列,采用默认的拒绝策略创建一个线程池,核心线程数为4,最大线程数设置了10但并不会生效。
向队列中放入10个任务,每个任务都会随机的产生几个长度为5的字符串,并返回。
主线程等待所有线程执行完毕后,关闭线程池,并输出所有线程池返回结果。
线程池的关闭我们使用标准的shutdown,awaitTermination两个方法进行关闭。
看下输出结果:
接收任务:0
接收任务:1
接收任务:2
接收任务:3
[test-pool-thread-0] 线程开始,预计:1231 毫秒...
接收任务:4
接收任务:5
接收任务:6
接收任务:7
接收任务:8
接收任务:9
[test-pool-thread-3] 线程开始,预计:4366 毫秒...
[test-pool-thread-1] 线程开始,预计:4626 毫秒...
[test-pool-thread-2] 线程开始,预计:3959 毫秒...
[test-pool-thread-0] 线程执行完毕!返回结果:[0tuj4, q6qif]
[test-pool-thread-0] 线程开始,预计:4767 毫秒...
[test-pool-thread-2] 线程执行完毕!返回结果:[amea5, z2kdz, sqx1b]
[test-pool-thread-2] 线程开始,预计:1203 毫秒...
[test-pool-thread-3] 线程执行完毕!返回结果:[lbu9a, l2wru, 822f4]
[test-pool-thread-3] 线程开始,预计:1914 毫秒...
[test-pool-thread-1] 线程执行完毕!返回结果:[6dygm, p5x6r, 44n35, 2z0dx]
[test-pool-thread-1] 线程开始,预计:2868 毫秒...
[test-pool-thread-2] 线程执行完毕!返回结果:[lh86x, mai62, uv4xb, 98mms]
[test-pool-thread-2] 线程开始,预计:879 毫秒...
[test-pool-thread-0] 线程执行完毕!返回结果:[f87dn, wjkyh, kfahp, 3i6bn, uh3w4]
[test-pool-thread-0] 线程开始,预计:1141 毫秒...
[test-pool-thread-2] 线程执行完毕!返回结果:[t1pzg, poj1q, f68x8, adven]
[test-pool-thread-3] 线程执行完毕!返回结果:[ks3fl]
[test-pool-thread-0] 线程执行完毕!返回结果:[l4ub4, gwof6, v2duc, 4lcg0, nlu5h]
[test-pool-thread-1] 线程执行完毕!返回结果:[a1a73, w5t6p, enqqm, m3awd, w7xmv]
主线程执行完毕!收到全部结果如下:
[0tuj4, q6qif, amea5, z2kdz, sqx1b, lbu9a, l2wru, 822f4, 6dygm, p5x6r, 44n35, 2z0dx, lh86x, mai62, uv4xb, 98mms, f87dn, wjkyh, kfahp, 3i6bn, uh3w4, t1pzg, poj1q, f68x8, adven, ks3fl, l4ub4, gwof6, v2duc, 4lcg0, nlu5h, a1a73, w5t6p, enqqm, m3awd, w7xmv]
可以看到线程池的核心线程数一直都是4。
上面的程序比较简单,接下来回顾下有关线程池的一些基础知识。
Executors 和 ThreadPoolExecutor
先来看下线程池相关的类:
Executor:用于专门处理多线程相关的一个接口,只有一个方法 void execute(Runnable command)。;
ExecutorService:继承 Executor,提供了一系列如shutdown(),shutdownNow(),submit()等生命周期管理的方法。
Executors:线程池工具类,提供了一系列工厂方法,用于创建实现了 ExecutorService 接口线程池。
ThreadPoolExecutor:自定义线程池类,是 ExecutorService 的子集。
这几个类或者接口都在java.util.concurrent命名空间下。
线程池的工作逻辑:
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满:
若线程数小于最大线程数,则创建线程;
若线程数等于最大线程数,则调用拒绝执行处理程序,默认拒绝策略:抛出异常;
Executors创建4种线程池
Executors是一个工具类,可以快速的创建多种线程池:
- newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。
- newSingleThreadExecutor:适用于执行定时或者周期性任务。
- newScheduledThreadPool:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
其中 newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor 的实现类均是 ThreadPoolExecutor,而 newScheduledThreadPool 的实现类是 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor 相较于 ThreadPoolExecutor,都是 ExecutorService 的子集,而前者是接口,增加了对任务的调度功能,如延迟,定时等。
阿里的开发手册中不推荐使用 Executors 来创建线程池,因为:
- SingleThreadExecutor 及 FixedThreadPool 的实现方法中使用的 LinkedBlockingQueue 工作队列没有给定容量,允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求导致OOM;
- CachedThreadPool 和 ScheduledThreadPool 分别使用 SynchronousQueue 和 DelayedWorkQueue 两种无界队列,允许创建的线程数为Integer.MAX_VALUE,可能会因创建大量的线程而导致OOM。
因此通过 ThreadPoolExecutor 来创建线程池,能更清楚的知道线程池的运行规则,避免资源耗尽风险。
ThreadPoolExecutor创建线程池
ThreadPoolExecutor的构造方法共有四个:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
所有参数表示如下:
- corePoolSize: 核心线程池大小
- maximumPoolSize: 最大线程池大小
- keepAliveTime: 线程最大空闲时间
- unit: 时间单位
- workQueue: BlockingQueue 线程工作队列
- threadFactory: ThreadFactory 线程创建工厂
- handler: RejectedExecutionHandler 拒绝策略
线程工作队列 workQueue
上面参数中的工作队列 BlockingQueue workQueue 是一个阻塞队列。阻塞队列常用于生产者和消费者的场景,生产者向队列中添加元素,而消费者从队列中取出元素。阻塞队列支持两个附加操作:
- 在队列为空时,获取元素的消费者线程会等待队列变为非空,即等待生产线程放入元素。
- 当队列满时,存储元素的生产线程会等待队列可用,即消费线程取出元素。
线程池常用的阻塞队列有以下几种:
- SynchronousQueue:无界队列,提交的任务不会被保存,总是会马上提交执行。当创建的线程数大于 maximumPoolSize 时,直接执行拒绝策略抛出异常。
- ArrayBlockingQueue :有界队列,当有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到 corePoolSize 时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue的初始化容量capacity,则继续创建线程,直到线程数量达到maximumPoolSize最大线程数,若大于 maximumPoolSize,则执行拒绝策略。
- LinkedBlockingQueue :一个用链表实现的有界阻塞队列,不设置容量时视为无界队列,可以无限添加新任务。线程池创建的最大线程数量就是 corePoolSize 设置的数量,即 maximumPoolSize 参数是无效的,当线程数达到 corePoolSize 后,就不再增加,新的任务直接进入队列等待。
- PriorityBlockingQueue :优先任务队列,特殊的无界队列,其他队列是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
- DelayQueue :无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
JDK中还提供了其他的阻塞队列,如 LinkedTransferQueue,LinkedBlockingDeque 等,这里不常用就不再详述。
拒绝策略handler
为防止资源被耗尽,就需要在定义线程池时设置合理的拒绝策略,尤其是在有界队列当中。
- AbortPolicy策略:会直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行。
- DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,即任务队列中最先被添加进去的,也是即将要被执行的那个任务,并尝试再次提交。
- DiscardPolicy策略:丢弃无法处理的任务,不予任何处理。使用此策略时,业务场景中需允许任务的丢失。
线程池线程数量取值
java并发编程中给出了一个公式:
Nthread = Ncpu * Ucpu * (1+ W/C)
- Nthreads:线程数量
- Ncpu:CPU的数量,Runtime.getRuntime().availableProcessors()
- Ucpu:CPU使用率,范围在[0,1]
- W/C:等待时间与计算时间的比率
对于线程的数量其实重要的是分辨出是IO密集型,还是计算密集型
另一种简单的计算方法:
如果是IO密集型应用,则线程池大小设置为2N+1;
如果是CPU密集型应用,则线程池大小设置为N或不超过N+1;例如我们4核的CPU,那么设置线程数不超过5为宜。
ThreadPoolExecutor扩展
ThreadPoolExecutor是利用beforeExecute()、afterExecute()和terminated()接口实现对线程监控或其他操作。
- beforeExecute:线程池中任务运行前执行
- afterExecute:线程池中任务运行完毕后执行
- terminated:线程池结束后执行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy()){
protected void beforeExecute(Thread t,Runnable r) {
System.out.println(String.format("[%s] 准备执行!", t.getName()));
}
protected void afterExecute(Runnable r,Throwable t) {
System.out.println("执行完毕!");
}
protected void terminated() {
System.out.println("线程池结束");
}
};
SpringBoot中线程池的使用
前面讲的 ThreadPoolExecutor 是JDK中提供的线程池类,而 ThreadPoolTaskExecutor 是Spring为我们封装的线程池类。在SpringBoot 中我们可以直接使用后者,因为它本身也是基于ThreadPoolExecutor实现的。
@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {
@Bean("threadPoolExecutor")
public Executor asyncThreadPoolExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(4);
// 设置最大线程数
executor.setMaxPoolSize(10);
// 设置工作队列大小
executor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程空闲时间,单位:秒
executor.setKeepAliveSeconds(60);
// 设置线程池线程名称前缀
executor.setThreadNamePrefix("test-pool-thread-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
initialize方法初始化线程池,实际上就是new ThreadPoolExecutor,它的阻塞队列使用的是 LinkedBlockingQueue 或者 SynchronousQueue,视初始化时是否设置了容量而定。
源码就不贴了,接下来我们就可以定义异步的方法,来执行我们的业务逻辑。
//定义接口
public interface AsyncExecuteService {
/**
* 执行异步任务
*/
void asyncExecute();
}
//接口实现类
@Slf4j
@Service
public class AsyncExecuteServiceImpl implements AsyncExecuteService {
@Override
@Async("threadPoolExecutor")
public void asyncExecute() {
log.info("调用asyncExecute开始");
//@TODO业务逻辑处理
//...
log.info("调用asyncExecute结束");
}
}
@Async("threadPoolExecutor") 注解标识从线程池中获取线程来执行该方法。
扩展一下,ThreadPoolTaskExecutor 类本身就已实现了诸如submit,execute等方法。若想对线程池进行监控,可以继承ThreadPoolTaskExecutor,自定义类并重写其中的方法实现,这里就不在赘述。
好了,以上就是能想到的关于线程池的常用知识点,表述不全,具体使用还需根据场景决定。