Java线程池笔记
在软件开发中,池一直都是一种非常优秀的设计思想,通过建立池可以有效的利用系统资源,节约系统性能。Java 中的线程池就是一种非常好的实现,从 JDK 1.5 开始 Java 提供了一个线程工厂 Executors 用来生成线程池,通过 Executors 可以方便的生成不同类型的线程池。但是要更好的理解使用线程池,就需要了解线程池的配置参数意义以及线程池的具体工作机制
(1) 为什么要使用线程池
(1.1) 使用线程池的好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
(1.2) 线程池解决了什么问题
多线程流行的原因是因为他能够处理与多进程一样的功能,并且创建线程耗费的时间、资源少,共享进程的资源。多线程有各自的线程ID,栈,PC,寄存器集合组成。共享代码段,文件,数据。
在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个JVM里创建太多的线程,可能会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。
进程是资源管理的最小单元;而线程是程序执行的最小单元。
(2) 线程池的使用
(2.1) 使用
/**
* 参数初始化
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/**
* 核心线程数量大小
*/
private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
/**
* 线程池最大容纳线程数
*/
private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
/**
* 线程空闲后的存活时长
*/
private static final int keepAliveTime = 30;
/**
* 任务过多后,存储任务的一个阻塞队列
*/
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
/**
* 线程的创建工厂
*/
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AdvacnedAsyncTask #" + mCount.getAndIncrement());
}
};
/**
* 线程池任务满载后采取的任务拒绝策略
*/
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.CallerRunsPolicy();
/**
* 线程池对象,创建线程
*/
ThreadPoolExecutor execute = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectHandler
);
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* ThreadPoolTest
*
* @author: weikeqin.cn@gmail.com
* @date: 2017-10-23 07:52
**/
public class ThreadPoolTest {
private transient static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);
/**
*
*/
@Test
public void testExecute() {
log.info("开始");
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i < 15; i++) {
MyTask myTask = new MyTask(i);
executor.execute(myTask);
log.info("线程池中线程数目:{},队列中等待执行的任务数目:{},已执行玩别的任务数目:{}", executor.getPoolSize(), executor.getQueue().size(), executor.getCompletedTaskCount());
}
while (!executor.isTerminated()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("还有线程未执行完成");
}
executor.shutdown();
log.info("线程池关闭。");
log.info("执行完了。");
}
/**
* Runnable任务
*/
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
/**
*
*/
@Override
public void run() {
log.info("正在执行task " + taskNum);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error(" {}", e);
}
log.info("task " + taskNum + "执行完毕");
}
}
/**
*
*/
@Test
public void testSubmit() {
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
List<Future<String>> resultList = new ArrayList<>();
// 创建10个任务并执行
for (int i = 0; i < 10; i++) {
// 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future<String> future = executorService.submit(new TaskWithResult(i));
// 将任务执行结果存储到List中
resultList.add(future);
}
// 遍历任务的结果
for (Future<String> fs : resultList) {
try {
// Future返回如果没有完成,则一直循环等待,直到Future返回完成
while (!fs.isDone()) {
TimeUnit.SECONDS.sleep(1);
}
// 打印各个线程(任务)执行的结果
log.info("执行结果:{}", fs.get());
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
log.error("", e);
}
}
executorService.shutdown();
}
/**
* Callable任务
*/
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法, 则该方法自动在一个线程上执行
*/
@Override
public String call() throws Exception {
log.info("call()方法被自动调用。 当前线程名:{}", Thread.currentThread().getName());
// 该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName();
}
} // end class TaskWithResult
} // end class
(3) 源码解读
在Java中,线程池的概念是
Executor
这个接口,常用的是ThreadPoolExecutor
类,学习Java中的线程池,就可以直接学习ThreadPoolExecutor
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池工作原则
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
任务队列 BlockingQueue
任务队列 workQueue 是用于存放不能被及时处理掉的任务的一个队列,它是 一个 BlockingQueue 类型。
关于 BlockingQueue,虽然它是 Queue 的子接口,但是它的主要作用并不是容器,而是作为线程同步的工具,他有一个特征,当生产者试图向 BlockingQueue 放入(put)元素,如果队列已满,则该线程被阻塞;当消费者试图从 BlockingQueue 取出(take)元素,如果队列已空,则该线程被阻塞。(From 疯狂Java讲义)
(3.1) 参数介绍
corePoolSize 核心线程数
corePoolSize
线程池的核心线程数。
在没有设置 allowCoreThreadTimeOut 为 true 的情况下,核心线程会在线程池中一直存活,即使处于闲置状态。
maximumPoolSize
线程池所能容纳的最大线程数。
maximumPoolSize
线程池所能容纳的最大线程数。
当活动线程(核心线程+非核心线程)达到这个数值后,后续任务将会根据 RejectedExecutionHandler 来进行拒绝策略处理。
keepAliveTime
非核心线程闲置时的超时时长。
keepAliveTime
非核心线程闲置时的超时时长。
超过该时长,非核心线程就会被回收。若线程池通过 allowCoreThreadTimeOut() 方法设置 allowCoreThreadTimeOut 属性为 true,则该时长同样会作用于核心线程,AsyncTask 配置的线程池就是这样设置的。
unit
时长对应的单位
unit
TimeUnit是一个枚举类型,其包括:
NANOSECONDS : 1纳秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天
workQueue
线程池中的任务队列
workQueue
线程池中的任务队列,通过线程池的 execute() 方法提交的 Runnable 对象会存储在该队列中。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
常用的workQueue类型
SynchronousQueue
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,
如果所有线程都在工作怎么办?
那就新建一个线程来处理这个任务!
所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
LinkedBlockingQueue
LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;
如果当前线程数等于核心线程数,则进入队列等待。
ArrayBlockingQueue
ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,
如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,
如果达到了,则入队等候,
如果队列已满,则新建线程(非核心线程)执行任务,
如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
DelayQueue
DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
ThreadFactory
线程工厂
ThreadFactory
线程工厂, 可以更改线程的名称,线程组,优先级,守护程序状态
RejectedExecutionHandler
任务拒绝策略
RejectedExecutionHandler
通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
ThreadPoolExecutor.AbortPolicy 拒绝
ThreadPoolExecutor.AbortPolicy: 当线程池中的数量等于最大线程数时抛 java.util.concurrent.RejectedExecutionException 异常,涉及到该异常的任务也不会被执行,线程池默认的拒绝策略就是该策略。
ThreadPoolExecutor.DiscardPolicy() 默默丢弃
ThreadPoolExecutor.DiscardPolicy():当线程池中的数量等于最大线程数时,默默丢弃不能执行的新加任务,不报任何异常。
ThreadPoolExecutor.CallerRunsPolicy() 重试
ThreadPoolExecutor.CallerRunsPolicy():当线程池中的数量等于最大线程数时,重试添加当前的任务;它会自动重复调用execute()方法。
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃最开始的任务
ThreadPoolExecutor.DiscardOldestPolicy(): 当线程池中的数量等于最大线程数时,抛弃线程池中工作队列头部的任务(即等待时间最久的任务),并执行新传入的任务。
我们可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。
References
[1] Java线程池实现原理及其在美团业务中的实践
[2] 聊聊并发(三)Java线程池的分析和使用
[3] ThreadPoolExecutor里面4种拒绝策略(详细)
[4] Java并发编程:线程池的使用
[5] 线程池踩坑记 –load飙高的原因
[6] 操作系统之线程
[7] 为什么要使用线程池