为什么需要线程池
“池”化,相当于计划经济,预先分配。
如果不使用线程池,那么每个任务都要新开一个线程处理。
问题在于,反复创建并销毁线程会带来开销问题。
定义
提前创建若干个线程,当有任务需要处理时,就处理,处理完成并不会被销毁,而是等待下任务。
执行步骤
- 如果 线程数<核心线程数,创建新的线程,并执行任务.ps:即使其他线程处于空闲,也会创建线程
- 如果当前线程数>核心线程数,尝试把任务加入任务队列。
- 如果任务队列已满,入队失败,并且 当前线程数< 最大线程数,创建新线程执行任务
- 如果 线程数>最大线程数,执行拒绝策略
判断顺序 corePoolSize->workQueue->maxPoolSize
当corePoolSize和maximumPoolSize相同时,就可以创建固定大小的线程池
如果配置的corePoolSize比maximumPoolSize 大,会报错
创建线程池
//ThreadPoolExecutor
package demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author
* @create 2020-02-28 16:19
* @description
*/
public class DemoThreadPoolExecutor {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
// 使用线程池来创建线程
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 核心线程数为 :5
CORE_POOL_SIZE,
// 最大线程数 :10
MAX_POOL_SIZE,
// 等待时间 :1L
// 保持存活时间
KEEP_ALIVE_TIME,
// 等待时间的单位 :秒
TimeUnit.SECONDS,
// 任务队列为 ArrayBlockingQueue,且容量为 100
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
//线程工厂
threadFactory,
// 饱和策略为 CallerRunsPolicy
new ThreadPoolExecutor.CallerRunsPolicy()
);
for(int i = 0; i < 15; i++) {
// 创建WorkerThread对象,该对象需要实现Runnable接口
Runnable worker = new DemoThread("任务" + i);
// 通过线程池执行Runnable
threadPoolExecutor.execute(worker);
}
// 终止线程池
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
System.out.println("全部线程已终止");
}
}
execute和submit
两个都是执行线程池的,区别在于submit可以接收返回值
shutdown和shutdownow
- shutdown() 不会立即关闭,而是等待所有任务队列中的任务处理完,就不接受新的任务了
- shutdownNow() 执行该方法,立刻变成stop,试图停止正在执行的线程,不在处理等待的线程,并返回未执行的任务
isShutdown(),isTerminated()
ishutdown 开始停止
isterminated 完全停止,所有任务全部执行完
awaitTemination:等待一段时间判断线程池是否关闭
shutdownNow
常用参数(构造函数)
介绍
- corePoolSize 核心线程数,定义最小可同时运行的线程数目
- maximumPoolSize 最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的线程数量会扩大到最大线程数,超过默认提示RejectedExecutionException异常
- keepAliveTime 等待时间 当线程数大于核心线程数时,多余的空闲线程的存活时间,即指定时间内将还未接收任务的线程销毁
- unit 时间单位 TimeUnit.SECONDS、TimeUnit.MINUTES、TimeUnit.HOURS、TimeUnit.DAYS 等等
- workQueue 任务队列,存储等待执行任务的线程 ,有三种最常见的队列
- SynchronousQueue,直接交接,无容量
- LinkedBlockingQueue 无界队列。此时maximumPoolSize 多大都没用,都会放进无界队列
- ArrayBlockingQueue 有界队列
- threadFactory 线程工厂,创建线程,一般默认。默认是Executors.defaultThreadFactory(),创建出来的都在同一个线程组,拥有同样的优先级。自己指定的话可以改变线程名,线程组,优先级,是否守护线程等等
- handler 拒绝策略,饱和策略,当提交任务过多而不能即使处理时,定制策略来处理
- ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException 来拒绝新任务的处理,是 Spring 中使用的默认拒绝策略。
- ThreadPoolExecutor.CallerRunsPolicy: 线程调用运行该任务的 execute 本身,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度,但可能造成延迟。若应用程序可以承受此延迟且不能丢弃任何一个任务请求,可以选择这个策略。
- ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
- ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
- 继承RejectedExecutionHandler,实现自定义拒绝策略.void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
规约
- 必须通过线程池来创建线程,不允许显式创建
- 必须通过构造方法创建,而不是executors
Executors 返回线程池对象的弊端如下: FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能会堆积大量请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM
添加规则
- 如果线程数小于核心线程数,即使其他工作线程处于空闲状态,也会创建新的线程来运行
- 如果线程数大于等于corePoolSize,小于maximumPoolSize 将任务放入队列
- 如果队列已满,并且线程数小于maxPoolSize,则创建新线程来运行
- 如果队列已满,并且线程数大于或等于maxPoolSize,则会拒绝任务
非核心线程何时回收
如果经过 keepAliveTime 时间后,超过核心线程数的线程还没有接受到新的任务,就会被回收。
allowsCoreThreadTimeOut设置为true,核心线程也会回收
核心线程如何回收
- java.util.concurrent.ThreadPoolExecutor#getTask
最终会执行到这个方法:
- java.util.concurrent.ThreadPoolExecutor#processWorkerExit
线程池的异常处理
当线程池处理任务的时候,任务代码可能抛出RuntimeException,抛出异常后,线程池可能捕获异常,也可能创建一个新的线程来替代异常的线程。我们可能无法感知,所以需要考虑异常的情况
线程池submit的执行过程
//构造feature对象
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//线程池执行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,
//如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
//捕获异常
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
处理线程池异常的方案
- 在任务代码块 try-catch
- 通过Future对象的get方法接收抛出的异常,再处理
future.get();
- 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(
(t1, e) -> {
System.out.println(t1.getName() + "线程抛出的异常"+e);
});
return t;
});
threadPool.execute(()->{
Object object = null;
System.out.print("result## " + object.toString());
});
- 重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用
class ExtendedExecutor extends ThreadPoolExecutor {
// 这可是jdk文档里面给的例子。。
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}}
线程池的工作队列
- ArrayBlockingQueue:有界队列,用数组实现的有界阻塞队列,按FIFO排序量
- LinkedBlockingQueue:(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
- DelayQueue:(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。-
- PriorityBlockingQueue:PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;
- SynchronousQueue:同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作( 每个put()都必须等到一个take(),才能解除阻塞, 反之亦然),否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。
线程池线程复用的原理
任务进来之后,线程池会逐一判断 corePoolSize 、workQueue 、maxPoolSize ,如果依然不能满足需求,则会拒绝任务。
execute方法
public void execute(Runnable command) {
//如果传入的Runnable的空,就抛出异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前线程数是否小于核心线程数,如果小于核心线程数就调用 addWorker() 方法增加一个 Worker
//这里的 Worker 就可以理解为一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//检查线程池状态是否为 Running,如果线程池状态是 Running 就把任务放入任务队列中,
//也就是 workQueue.offer(command)。如果线程池已经不处于 Running 状态,
//说明线程池被关闭,那么就移除刚刚添加到任务队列中的任务,并执行拒绝策略
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//检查当前线程数为 0,也就是 workerCountOf**(recheck) == 0,
//那就执行 addWorker() 方法新建线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
adworker: addWorker 方法的主要作用是在线程池中创建一个线程并执行第一个参数传入的任务,它的第二个参数是个布尔值,如果布尔值传入 true 代表增加线程时判断当前线程是否少于 corePoolSize,小于则增加新线程,大于等于则不增加;同理,如果传入 false 代表增加线程时判断当前线程是否少于 maxPoolSize,小于则增加新线程,大于等于则不增加,所以这里的布尔值的含义是以核心线程数为界限还是以最大线程数为界限进行是否新增线程的判断。addWorker() 方法如果返回 true 代表添加成功,如果返回 false 代表添加失败。
在 execute 方法中,多次调用 addWorker 方法把任务传入,
addWorker 方法会添加并启动一个 Worker,
这里的 Worker 可以理解为是对 Thread 的包装,Worker 内部有一个 Thread 对象,它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中
runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
task = null;
}
}
}
- 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
- 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。
worker类
使用AQS实现,Worker 内部有一个 Thread 对象,它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
钩子方法
继承ThreadPoolExecutor,实现beforeExecute
package threadpool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 描述: 演示每个任务执行前后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
beforeExecute
任务的拒绝
拒绝策略
- ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException 来拒绝新任务的处理,是 Spring 中使用的默认拒绝策略。
- ThreadPoolExecutor.CallerRunsPolicy: 线程调用运行该任务的 execute 本身,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度,但可能造成延迟。若应用程序可以承受此延迟且不能丢弃任何一个任务请求,可以选择这个策略。
- ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉,默默地丢弃,不会通知你
- ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
拒绝时机
- Executor关闭,提交新任务会被拒绝
- 最大线程数并且工作队列有界队列并且已经饱和了
几种常见的线程池
tips:当最大线程数和核心线程数相等,则keepAliveTime失效,当keepAliveTime为1的时候,线程任务执行完会立即回收。
newFixedThreadPool固定大小线程池
//核心线程数和最大线程数一样,为nthread,0毫秒回收一次,就是不回收,使用LinkedBlockingQueue队列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newCachedThreadPool可缓冲线程池
核心线程数0,最大线程数Integer.MAX,每60秒回收一次,使用SynchronousQueue队列。是一个仅包含一个元素的队列,插入元素会阻塞,直到另一个线程取出元素。
newScheduledThreadPool定时任务线程池
它的核心线程数是corePoolSize变量,需要用户自己决定,最大线程数是integer的最大值(会OOM),同样,它的每隔0毫秒回收一次线程,换句话说就是不回收线程。使用了DelayedWorkQueue队列,该队列具有延时的功能。
newSingleThreadExecutor单个线程池
其实,跟上面的newFixedThreadPool是一样的,稍微有一点区别是核心线程数 和 最大线程数 都是1
newSingleThreadScheduledExecutor单线程定时任务线程池
该线程池是对上面介绍过的ScheduledThreadPoolExecutor定时任务线程池的简单封装,核心线程数固定是1,其他的功能一模一样
newWorkStealingPool窃取线程池
底层是通过ForkJoinPool类来实现的。会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。
- 产生子任务的话,适合这个,处理树结构,矩阵结构
- 窃取能力,线程之间会合作,适合无锁的任务,不保证执行顺序
最佳线程数
- IO密集型:频繁读取磁盘数据,调用网络接口 2N
- CPU密集型:非常复杂的逻辑,循环次数很多 N+1
//N的值
int availableProcessors = Runtime.getRuntime().availableProcessors();
//最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
//cpu核心数(1+平均等待时间/平均工作时间)
线程池的实现原理
- 线程池管理器
- 工作线程
- 任务队列
- 任务接口(task)
线程池任务复用的原理
相同线程执行不同任务
主要就是worker类
runWorker
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl记录了线程状态和线程数
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//command是任务,true/false,判断是否小于核心/最大
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
线程的状态
注意点
- 避免任务堆积 fixedpool
- 避免线程数过度增加 cachedpool
- 排查线程泄露
动态化线程池
部分问题
如果corepoolsize=0,会怎么样
这里只讨论jdk1.6之后
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
- 线程池提交任务后,首先判断当前池中线程数是否小于corePoolSize。
- 如果小于则尝试创建新的线程执行该任务;否则尝试添加到等待队列。
- 如果添加队列成功,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
- 如果添加到等待队列失败,一般是队列已满,才会再尝试创建新的线程。
- 但在创建之前需要与maximumPoolSize比较,如果小于则创建成功。
- 否则执行拒绝策略。
如果corePoolSize=0,提交任务时如果线程池为空,则会立即创建一个线程来执行任务(先排队再获取);如果提交任务的时候,线程池不为空,则先在等待队列中排队,只有队列满了才会创建新线程。
所以,优化在于,在队列没有满的这段时间内,会有一个线程在消费提交的任务;1.6之前的实现是,必须等队列满了之后,才开始消费。
线程池应该手动创建还是自动创建
应该要手动创建
线程池中的ctl是用来干嘛的
ctl就是controlstate
ctl 是一个涵盖了两个概念的原子整数类,它将工作线程数和线程池状态结合在一起维护,低 29 位存放 workerCount,高 3 位存放 runState。
线程池有多少种状态
- RUNNING:能接受新任务,并处理阻塞队列中的任务
- SHUTDOWN:不接受新任务,但是可以处理阻塞队列中的任务
- STOP:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接撂担子不干了!
- TIDYING:所有任务都终止,并且工作线程也为0,处于关闭之前的状态
- TERMINATED:已关闭。
如何修改原生线程池,使得可以先拉满线程数再入任务队列排队?
execute 方法想必大家都不陌生,就是给线程池提交任务的方法。在这个方法中可以看到只要在 offer 方法内部判断此时线程数还小于最大线程数的时候返回 false,即可走下面 else if 中 addWorker (新增线程)的逻辑,如果数量已经达到最大线程数,直接入队即可。
原生线程池的核心线程一定伴随着任务慢慢创建的吗?
- prestartCoreThread:启动一个核心线程
- prestartAllCoreThreads :启动所有核心线程
tomcat的线程池
Tomcat 采用扩展方案,踩在 JDK 线程池的肩膀上,扩展 JDK 原生线程池,队列用的是自己的task queue。
Tomcat线程池有如下参数:
maxThreads, 最大线程数,tomcat能创建来处理请求的最大线程数
maxSpareTHreads, 最大空闲线程数,在最大空闲时间内活跃过,但现在处于空闲,若空闲时间大于最大空闲时 间,则回收,小于则继续存活,等待被调度。
minSpareTHreads,最小空闲线程数,无论如何都会存活的最小线程数
acceptCount, 最大等待队列数 ,请求并发大于tomcat线程池的处理能力,则被放入等待队列等待被处理。
maxIdleTime, 最大空闲时间,超过这个空闲时间,且线程数大于最小空闲数的,都会被回收
各家线程池的拒绝策略
Dubbo
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
...
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" + ...);
logger.warn(msg);
dumpJStack();
dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException(msg);
}
...
}
Dubbo的拒绝策略是抛出异常RejectedExecutionException,同时还会做一件事情 - dumpJStack(),记录下当时的JVM线程堆栈。(dubbo3.0会判断是否要dump)
打印堆栈信息 可以更好的帮助排查问题
private void dumpJStack() {
//一些dump时间间隔和并发控制
...
//新建单线程池用于dump堆栈
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
...
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
}
...
});
...
}
RocketMQ
RocketMQ通过BrokerFixedThreadPoolExecutor继承封装了一层ThreadPoolExecutor,上层可以自行传入参数,其中也包含了可配置的RejectedExecutionHandler。
实际在Broker创建消息处理的不同线程池时,并没有指定特殊的拒绝策略,所以使用的是默认的AbortPolicy,即抛出异常。
同时为了避免任务溢出,为每个线程池默认设置了较大的任务队列大小。
Netty
以EventLoopGroup为例,线程池的拒绝策略默认使用RejectedExecutionHandlers,通过单例模式提供Handler进行处理。
public final class RejectedExecutionHandlers {
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
private RejectedExecutionHandlers() { }
public static RejectedExecutionHandler reject() {
return REJECT;
}
...
}