线程池,指管理一组同构工作线程的资源池。线程池与工作队列密切相关,其中工作队列保存了所有等待执行的线程。工作者线程从工作队列中获取一个线程,执行任务,然后返回线程池并等待下一个任务。
1 创建线程池
一般一个简单线程池至少包含下列组成部分。
- 线程池管理器(ThreadPoolManager):创建线程池,销毁线程池,添加新任务
- 工作线程(WorkThread):线程池中线程
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
ExecutorService 包含运行/关闭/已终止三种状态
- 初始创建时处于运行状态
- 调用 shutdown、shutdownnow、awaitTermination 关闭,关闭后提交的任务由"拒绝执行处理器"处理,会抛弃任务,或使得 execute 方法抛出一个未受检查的 RejectedExecutionException
- 等所有任务都完成后,进入终止状态
1.1 Executor 类静态工厂方法
执行器(Executor)类有许多静态工厂方法用来构建线程池。
方法 | 描述 |
---|---|
newCachedThreadPool | 必要时创建新线程;空闲线程会被保留 60 秒 |
newFixedThreadPool | 线程包含固定数量的线程;空闲线程会一直被保留 |
newSingleThreadExecutor | 只有一个线程的“池”,该线程顺序执行每一行提交的任务 |
newScheduledThreadPool | 用于预定执行而构建的固定线程池,替代 java.util.Time |
newSingleThreadScheduledExecutor | 用于预定执行而构建的单线程“池” |
1.2 ThreadPoolExecutor 构造函数
ThreadPoolExecutor 为一些 Executor 提供了基本的实现,还可以通过 ThreadPoolExecutor 的构造函数来实例化一个对象,并根据自己的需求来定制。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
}
2 配置 ThreadPoolExecutor
2.1 线程池因素
- 核心线程数量(corePoolSize)
- 没有任务执行时线程池的大小
- 核心线程数没满的时候,有新任务来,即使有空闲核心线程,也要新建核心线程来执行,直到空闲线程数等于核心线程
- 核心线程数满了的时候,有新任务来,先加入到工作队列,工作队列满了才能创建更多线程
- 最大大小(maximumPoolSize)
- 可同时活动的线程数量的上限
- 工作队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
- 存活时间(keepAliveTime)
- 空闲时间超过了存活时间的线程可被回收,在线程池超过了基本大小时此线程被终止。
Executor 的执行逻辑:
- 如果 当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);
- 如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;
- 如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);
线程复用
线程执行时循环去队列取任务,没有任务了就进入阻塞状态,队列有新任务就被唤醒去执行任务,空闲时间超时就销毁
2.2 任务队列
ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。
基本的任务排队方法有3种:
- 无界队列
newFixedThreadPool 和 newSingIeThreadExecutor默认将使用一个无界的 LinkedBlockingQueue。 - 有界队列
使用有界队列更稳妥,但需选择饱和策略。如 ArrayBlockingQueue、有界的 LinkedBlockingQueue、PriorityBlockingQueue。 - 同步移交
对于非常大的或者无界的线程池,可以通过使用 SynchronousQueue。任务不排队,直接从生产者移交给工作者线程。(newCachedThreadPool)
2.3 饱和策略
ThreadPoolExecutor 的饱和策略通过调用 setRejectedExecutionHandler 来修改。
- Abort(中止): 默认的饱和策略,抛出未检查的 RejectedExecutionException。调用者可以捕获处理
- Discard(抛弃): 悄悄抛弃任务
- DiscardOldest(抛弃最旧的): 抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(不要和优先级队列放在一起使用)
- CallerRuns(调用者运行): 将任务回退到调用者,在调用者线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()) ;
2.4 线程工厂
如在应用程序中需要利用安全策略来控制访问权限,可用 Executor 中的 privilegedThreadFactory 工厂来定制自己的线程工厂。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
2.5 扩展 ThreadPoolExecutor
ThreadPoolExecutor 是可扩展的,提供了几个可以在子类化中改写的方法:beforeExecute、 afterExecute 和 terminated。可以添加日志、计时 监视、通知等功能
private class SaveOlderCategoryTask implements Callable<String> {
DcdCategory category;
SaveOlderCategoryTask(DcdCategory category) {
this.category = category;
}
@Override
public String call() {
Long oldMinBehotTime = getMinBehotTime(category.getId());
saveCategory(category, oldMinBehotTime, 0L);
return "complete";
}
}
public static void doExecutor(Collection<FutureTask> list) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("dcd-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, list.size(),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
list.forEach(pool::submit);
list.forEach(e -> {
try {
log.info((String) e.get());
} catch (InterruptedException | ExecutionException e1) {
e1.printStackTrace();
}
});
pool.shutdown();
}
3 提交任务
- submit():将一个 Runnable 对象或 Callable 对象提交给 ExecutorService,会得到一个 Future 对象,可用来査询该任务的状态。
3.1 控制任务组
使用执行器有更有意义的原因,控制一组相关任务。
- shutdownNow 方法:取消所有的任务。
- invokeAny 方法:提交所有对象到一个 Callable 对象的集合中,并返回某个已经完成了的任务的结果。
- invokeAll 方法:提交所有对象到一个 Callable 对象的集合中并返回一个 Future 对象的列表,代表所有任务的解决方案。当计算结果可获得时,可以像下面这样对结果进行处理:
List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result : results)
processFurther(result.get());
可以用 ExecutorCompletionService 来进行排列。
ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
for (Callalbe<T> task : tasks) {
service.submit(task);
}
for (int i = 0; i < tasks.size(); i++) {
processFurther(service.take().get());
}
3.2 CompletionService
1)CompletableFuture
CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URL>> links = contents.thenApply(Parse::getLinks);
thenApply 方法不会阻塞。它会返回一个 future。第一个 future 完成时,其结果会提供给 getLinks 方法,这个方法的返回值是最终的结果。
从概念上讲,CompletableFuture 是一个简单 API,不过有很多不同方法来组合可完成 future。
2)CompletionService
- CompletionService 将 Executor 和 BlockingQueue 的功能融合,执行 Callable 任务,使用类似队列的 take 和 poll 等方法获得封装为 Future已完成的结果。
- ExecutorCompletionService 实现了 CompletionService 并将计算部分委托给 Executor。
- 实现:
- 在构造函数中创建一个 BockingQueue 保存计算完成结果,计算完成时调用 Future-Task 的 done 方法。
- 当提交任务时,首先将任务包装为一个QueueingFuture,FutureTask的子类,然后改写子类的 done 方法将结果放入 BlockingQueue
- take 和 poll 方法委托给了 BlockingQueue,在给出结果之阻塞
Future 调用 get() 获取值会阻塞住调用线程,为了解决这个问题,1.8加入了 CompletableFuture 类。与事件处理器不同,CompletableFuture 可以“组合”(composed)。
3.3 递归算法的并行化
当串行循环中的各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务时带来的开销更多,那么这个串行循环就适合并行化;
void processSequntially(List<Element> elements){
for(Element e:elements){
process(e);
}
}
void processInParallel(Executor exec,List<Element> elements){
for(final Element e:elements){
exec.execute(new Runnable(){
public void run(){
process(e);
}
});
}
}
在一些递归设计中同样可以采用循环并行化的方法进行优化;
public<T> void sequentialRecursive(List<Node> nodes,Collection<T> results){
for(Node<T> n:nodes){
results.add(n.compute());
sequentialRecursive(n.getChildren(),results);
}
}
public<T> void parrallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
//遍历过程串行
for(final Node<T> n:nodes){
exec.execute(new Runnable(){
public void run(){
//compute并行计算
results.add(n.compute());
}
});
parrallelRecursive(exec,n.getChildren(),results);
}
}
等待通过并行方式计算的结果
public<T> Collection<T> getParrallelResults(List<Node<T>> nodes) throws InterruptedException{
ExecutorService exec=Executors.newCachedThreadPool();
Queue<T> resultQueue=new ConcurrentLinkedQueue<T>();
parrallelRecursive(exec,nodes,resultQueue);
exec.shutdown();
exec.awaitTermination(Long.Max_VALUE,TimeUnit.SECONDS);
return resultQueue;
}
4 批量获取多条线程的执行结果
当向线程池提交 callable 任务后,我们可能需要一次性获取所有返回结果,有三种处理方法。
4.1 方法一:自己维护返回结果
// 创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 存储执行结果的List
List<Future<String>> results = new ArrayList<Future<String>>();
// 提交10个任务
for ( int i=0; i<10; i++ ) {
Future<String> result = executorService.submit( new Callable<String>(){
public String call(){
int sleepTime = new Random().nextInt(1000);
Thread.sleep(sleepTime);
return "线程"+i+"睡了"+sleepTime+"秒";
}
} );
// 将执行结果存入results中
results.add( result );
}
// 获取10个任务的返回结果
for ( int i=0; i<10; i++ ) {
// 获取包含返回结果的future对象
Future<String> future = results.get(i);
// 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止)
String result = future.get();
System.out.println(result);
}
此方法的弊端:
- 需要自己创建容器维护所有的返回结果,比较麻烦;
- 从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。
4.2 方法二:使用 ExecutorService 的 invokeAll 函数
本方法能解决第一个弊端,即并不需要自己去维护一个存储返回结果的容器。当我们需要获取线程池所有的返回结果时,只需调用invokeAll函数即可。 但是,这种方式需要你自己去维护一个用于存储任务的容器。
// 创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 创建存储任务的容器
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
// 提交10个任务
for ( int i=0; i<10; i++ ) {
Callable<String> task = new Callable<String>(){
public String call(){
int sleepTime = new Random().nextInt(1000);
Thread.sleep(sleepTime);
return "线程"+i+"睡了"+sleepTime+"秒";
}
};
executorService.submit( task );
// 将task添加进任务队列
tasks.add( task );
}
// 获取10个任务的返回结果
List<Future<String>> results = executorService.invokeAll( tasks );
// 输出结果
for ( int i=0; i<10; i++ ) {
// 获取包含返回结果的future对象
Future<String> future = results.get(i);
// 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止)
String result = future.get();
System.out.println(result);
}
4.3 方法三:使用 CompletionService
CompletionService 内部维护了一个阻塞队列,只有执行完成的任务结果才会被放入该队列,这样就确保执行时间较短的任务率先被存入阻塞队列中。
ExecutorService exec = Executors.newFixedThreadPool(10);
final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>(
10);
//实例化CompletionService
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
exec, queue);
// 提交10个任务
for ( int i=0; i<10; i++ ) {
completionService.submit( new Callable<String>(){
public String call(){
int sleepTime = new Random().nextInt(1000);
Thread.sleep(sleepTime);
return "线程"+i+"睡了"+sleepTime+"秒";
}
} );
}
// 输出结果
for ( int i=0; i<10; i++ ) {
// 获取包含返回结果的future对象(若整个阻塞队列中还没有一条线程返回结果,那么调用take将会被阻塞,当然你可以调用poll,不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除)
Future<String> future = completionService.take();
// 从future中取出执行结果,这里存储的future已经拥有执行结果,get不会被阻塞
String result = future.get();
System.out.println(result);
}
5 中断服务
- 正确的封装原则:除非拥有某个线程,否则不能对该线程进行操作。
- 线程的所有权:线程有一个相应的所有者,即创建该线程的类。
- 线程的所有权是不可传递的,应用程序不能直接停止工作者线程,服务应该提供生命周期方法来关闭它自己以及它所拥有的线程。
- 对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。ExecutorService 中提供了 shutdown 和 shutdownNow 等方法。
5.1 关闭 ExecutorService
ExecutorService 提供了两种关闭方法:
- shutdown():执行平缓关闭(执行完现有线程,不再接收新的线程)
- shutdownNow():执行粗暴关闭(尝试取消运行中任务,不再启动队列中未开始的任务)
- awaitTermination():等待终止
- isShutdown()
- isTerminated()
5.2 “毒丸”对象
“毒丸”:“当得到这个对象时,立即停止”。 在 FIFO 队列中使用
5.3 示例:只执行一次的服务
如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的 Executor 来简化服务的生命周期管理,其中该 Executor 的生命周期是由这个方法来控制的。(在这种情况下,invokeAll 和 invokeAny 等方法通常会起较大的作用。)
public class CheckForMail {
public boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit)throws InterruptedException{
ExecutorService exec=Executors.newCachedThreadPool();
//之所以用AtomicBoolean代替volatile,是因为能从内部的Runnable中访问hasNewMail标志,因此它必须是final类型以免修改
final AtomicBoolean hasNewMail=new AtomicBoolean(false);//初始值为false
try{
for(final String host:hosts)
exec.execute(new Runnable(){ //Executes the given command at some time in the future
public void run(){
if(checkMail(host))
hasNewMail.set(true);
}
});
}finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
private boolean checkMail(String host) {
//检查邮件
return false;
}
}
5.4 shutdownNow 的局限性
使用 shutdownNow 时,虽然他会返回已提交但尚未开始的任务,但我们无法在关闭中知道正在执行的任务的状态
记录哪些任务是在关闭后取消的:
public class TrackingExecutor extends AbstractExecutorService{
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown=
Collections.synchronizedSet(new HashSet<Runnable>()); //创建一个线程安全的Set来记录那些任务是关闭后取消的
public List<Runnable> getCancelledTasks(){ //客户端可以通过这个方法知道,哪些任务是在关闭后取消的
if(!exec.isShutdown())
throw new IllegalStateException();
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable){
exec.execute(new Runnable(){
public void run(){
try{
runnable.run();
}finally{ //如果已经ExecutorService关闭了并且任务被中断(取消),添加到Set中
if(isShutdown()&&Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
// 将ExecutorService的其他方法委托给exec
}
6 预定执行
ScheduledExecutorService 接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。它是一种允许使用线程池机制的 java.util.Timer 的泛化。
可以预定 Runnable 或 Callable 在初始的延迟之后只运行依次。也可以预定一个 Runnable 对象周期性地运行。
ScheduledExecutorService newScheduledThreadPool(int threads)
//返回一个线程池,它使用给定的线程数来调度任务。
ScheduledExecutorService newSingleThreadScheduledExecutor()
//返回一个执行器,它在一个单独线程中调度任务。
相比 Timer,ScheduledThreadPoolExecutor 更加完善:
- Timer 在执行所有定时任务时只会创建一个线程。若一个任务执行时间过长会影响其他任务的精确定时。
- Timer 线程并不捕获异常,因此在 TimerTask 抛出未受检查异常时定时线程将终止,未完成的任务不会再执行,出现"线程泄露";
- 实现自己的调度服务,可以利用 DelayQueue,它实现了 BlockingQueue,并为 ScheduledThreadPoolExecutor 提供调度功能。DelayQueue 管理着一组 Delay 对象,每个 Delay 对象都有一个相应的延迟时间,只有某个元素逾期后才能从 DelayQueue 中执行 take 操作。从 DelayQueue 中返回的对象将根据它们的延迟时间进行排序
7 Fork-Join 框架
有些应用使用了大量线程, 但其中大多数都是空闲的。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务。
Java SE 7 中引入 fork-join 框架,专门用来支持后一类应用。
Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。
要完成这种递归计算,需要扩展 RecursiveTask
class Conter extends RecursiveTask<integer> {
// ...
protected Integer compute() {
if (to - from < THRESHOLD) {
// solve problem directly
} else {
int mid = (from + to) / 2;
Counter first = new Counter(values, form, mid, filter);
Counter second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}
invokeAll 方法接收到很多任务并阻塞,直到所有这些任务都已经完成。join 方法将生成结果。对每个子任务应用了 join,并返回其总和。
7.1 工作窃取
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。使用双端队列减少线程竞争,被窃取任务的线程从头部拿任务,窃取任务的线程从尾部拿。
一个工作线程空闲时,它会从一个双端队列的尾部“窃取”一个任务。
public static void main(String[] args) {
// ...
Counter counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
long end = System.currentTimeMillis();
System.out.printf("count: %d, time: %d\n", counter.join(), end - mid);
}
8 使用实例
@Slf4j
public class ExecutorUtils {
private void updateAuthorsSourceMedia(List<Author> authors) {
if (authors.size() > MULTI_THREAD_LIMIT) {
List<List<Author>> result = new ArrayList<>(authors.stream().collect(Collectors.groupingBy(x -> authors.indexOf(x) % 10)).values());
List<FutureTask<Integer>> results = new ArrayList<>();
result.forEach(e -> {
FutureTask<Integer> ft = new FutureTask<>(new UpdateAuthorSourceMediaTask(e));
results.add(ft);
});
ExecutorUtils.doExecutor(results);
} else {
new UpdateAuthorSourceMediaTask(authors).call();
}
}
public static void doExecutor(Collection<FutureTask<Integer>> list) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(list.size(), list.size(),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
list.forEach(pool::submit);
Integer total = 0;
for (FutureTask<Integer> ft : list) {
try {
total = total + ft.get();
} catch (InterruptedException | ExecutionException e1) {
e1.printStackTrace();
}
}
log.info("============== Total:{} ===============", total);
pool.shutdown();
}
}