写点什么

戏(细)说 Executor 框架线程池任务执行全过程(下)

  • 2015-06-04
  • 本文字数:5448 字

    阅读完需:约 18 分钟

上一篇文章中通过引入的一个例子介绍了在Executor 框架下,提交一个任务的过程,这个过程就像我们老大的老大要找个老大来执行一个任务那样简单。并通过剖析ExecutorService 的一种经典实现ThreadPoolExecutor 来分析接收任务的主要逻辑,发现ThreadPoolExecutor 的工作思路和我们带项目的老大的工作思路完全一致。在本文中我们将继续后面的步骤,着重描述下任务执行的过程和任务执行结果获取的过程。会很容易发现,这个过程我们更加熟悉,因为正是每天我们工作的过程。除了ThreadPoolExecutor 的内部类Worker 外,对执行内容和执行结果封装的FutureTask 的表现是这部分着重需要了解的。

为了连贯期间,内容的编号延续上篇。

2. 任务执行

其实应该说是任务被执行,任务是宾语。动宾结构:execute the task,执行任务,无论写成英文还是中文似乎都是这样。那么主语是是 who 呢?明显不是调用 submit 的那位(线程),那是哪位呢?上篇介绍 ThreadPoolExecutor 主要属性时提到其中有一个 HashSet workers 的集合,我们有说明这里存储的就是线程池的工作队列的集合,队列的对象是 Worker 类型的工作线程,是 ThreadPoolExecutor 的一个内部类,实现了 Runnable 接口:

复制代码
private final class Worker implements Runnable

8) 看作业线程干什么当然是看它的 run 方法在干什么。如我们所料,作业线程就是在一直调用 getTask 方法获取任务,然后调用 runTask(task) 方法执行任务。看到没有,是在 while 循环里面,就是不干完不罢休的意思!在加班干活的苦逼的朋友们,有没有遇见战友的亲切感觉?

复制代码
public void run() {
try {
Runnable task = firstTask;
// 循环从线程池的任务队列获取任务
while (task != null || (task = getTask()) != null) {
// 执行任务
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

然后简单看下 getTask 和 runTask(task) 方法的内容。

9) getTask 方法是 ThreadPoolExecutor 提供给其内部类 Worker 的的方法。作用就是一个,从任务队列中取任务,源源不断地输出任务。有没有想到老大手里拿的总是满满当当的 project,也是源源不断的。

复制代码
Runnable getTask() {
for (;;) {
// 从任务队列的头部取任务
r = workQueue.take();
return r;
}
}

10) runTask(Runnable task) 是工作线程 Worker 真正处理拿到的每个具体任务。看到这里才可用确认我们的猜想,之前提到 [y1] 的“执行任务”这个动宾结构前面的主语正是这些 Worker 呀。唠叨了半天(看主要方法都看到了整整第 10 个了),前面都是派活,这里才是干活。和我们的工作何其相似!老大(LD),老大的老大(LD^2),老大的老大(LD^n) 非常辛苦,花了很多时间、精力在会议室、在 project 上想着怎么生成和安排任务,然而真的轮到咱哥们干活,可能花了不少时间,但看看流程就是这么简单。三个大字:“Just do it”。

复制代码
private void runTask(Runnable task) {
// 调用任务的 run 方法,即在 Worker 线程中执行 Task 内定义内容。
task.run();
}

需要注意的地方出现了,调用的其实是 task 的 run 方法。看下 FutureTask 的 run 方法做了什么事情。

这里插入一个 FutureTask 的类图。可以看到 FutureTask 实现了 RunnableFuture 接口,所以 FutureTask 即有 Runnable 接口的 run 方法来定义任务内容,也有 Future 接口中定义的 get、cancel 等方法来控制任务执行和获取执行结果。Runnable 接口自不用说,Future 接口的伟大设计,就是使得实现该接口的对象可以阻塞线程直到任务执行完毕,也可以取消任务执行,检测任务是执行完毕还是被取消了。想想在之前我们使用 Thread.join() 或者 Thread.join(long millis) 等待任务结束是多么苦涩啊。

FutureTask 内部定义了一个 Sync 的内部类,继承自 AQS,来维护任务状态。关于 AQS 的设计思路,可以参照参考 Doug Lea 大师的原著 The java.util.concurrent Synchronizer Framework

(点击放大图像)

11) 和其他的同步工具类一样,FutureTask 的主要工作内容也是委托给其定义的内部类 Sync 来完成。

复制代码
public void run() {
// 调用 Sync 的对应方法
sync.innerRun();
}

12) FutureTask.Sync.innerRun(),这样做的目的就是为了维护任务执行的状态,只有当执行完后才能够获得任务执行结果。在该方法中,首先设置执行状态为 RUNNING 只有判断任务的状态是运行状态,才调用任务内封装的回调,并且在执行完成后设置回调的返回值到 FutureTask 的 result 变量上。在 FutureTask 中,innerRun 等每个“写”方法都会首先修改状态位,在后续会看到 innerGet 等“读”方法会先判断状态,然后才能决定后续的操作是否可以继续。下图是 FutureTask.Sync 中几个重要状态的流转情况,和其他的同步工具类一样,状态位使用的也是父类 AQS 的 state 属性。

(点击放大图像)

复制代码
void innerRun() {
// 通过对 AQS 的状态位 state 的判断来判断任务的状态是运行状态,则调用任务内封装的回调,并且设置回调的返回值
if (getState() == RUNNING)
innerSet(callable.call());
}
void innerSet(V v) {
for (;;) {
int s = getState();
// 设置运行状态为完成,并且把回调额执行结果设置给 result 变量
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}

至此工作线程执行 Task 就结束了。提交的任务是由 Worker 工作线程执行,正是在该线程上调用 Task 中定义的任务内容,即封装的 Callable 回调,并设置执行结果。下面就是最重要的部分:调用者如何获取执行的结果。让你加班那么久,总得把成果交出来吧。老大在等,因为老大的老大在等!

3. 获取执行结果

前面说过,对于老大的老大这样的使用者来说,获取执行结果这个过程总是最容易的事情,只需调用 FutureTask 的 get() 方法即可。该方法是在 Future 接口中就定义的。get 方法的作用就是等待执行结果。(Waits if necessary for the computation to complete, and then retrieves its result.)Future 这个接口命名得真好,虽然是在未来,但是定义有一个 get() 方法,总是“可以掌控的未来,总是有收获的未来!”实现该接口的 FutureTask 也应该是这个意思,在未来要完成的任务,但是一样要有结果哦。

13) FutureTask 的 get 方法同样委托给 Sync 来执行。和该方法类似,还有一个 V get(long timeout, TimeUnit unit),可以配置超时时间。

复制代码
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}

14) 在 Sync 的 innerGet 方法中,调用 AQS 父类定义的获取共享锁的方法 acquireSharedInterruptibly 来等待执行完成。如果执行完成了则可以继续执行后面的代码,返回 result 结果,否则如果还未完成,则阻塞线程等待执行完成。 [bd2] 再大的老大要想获得结果也得等老子干完了才行!可以看到调用 FutureTask 的 get 方法,进而调用到该方法的一定是想要执行结果的线程,一般应该就是提交 Task 的线程,而这个任务的执行是在 Worker 的工作线程上,通过 AQS 来保证执行完毕才能获取执行结果。该方法中 acquireSharedInterruptibly 是 AQS 父类中定义的获取共享锁的方法,但是到底满足什么条件可以成功获取共享锁,这是 Sync 的 tryAcquireShared 方法内定义的。 [bd3] 具体说来,innerIsDone 用来判断是否执行完毕,如果执行完毕则向下执行,返回 result 即可;如果判断未完成,则调用 AQS 的 doAcquireSharedInterruptibly 来挂起当前线程,一直到满足条件。这种思路在其他的几种同步工具类 Semaphore CountDownLatch ReentrantLock ReentrantReadWriteLock 也广泛使用。借助 AQS 框架,在获取锁时,先判断当前状态是否允许获取锁,若是允许则获取锁,否则获取不成功。获取不成功则会阻塞,进入阻塞队列。而释放锁时,一般会修改状态位,唤醒队列中的阻塞线程。每个同步工具类的自定义同步器都继承自 AQS 父类,是否可以获取锁根据同步类自身的功能要求覆盖 AQS 对应的 try 前缀方法,这些方法在 AQS 父类中都是只有定义没有内容。可以参照《源码剖析 AQS 在几个同步工具类中的使用》来详细了解。

突然想到想想那些被称为老大的,是不是整个 career 流程就是只干两件事情:submit a task, then wait and get the result。不对,还有一件事情,不是等待,而是催。“完了没,完了没?schedule 很紧的,抓点紧啊,要不要适当加点班啊……”

复制代码
V innerGet() throws InterruptedException, ExecutionException {
// 获得锁,表示执行完毕,才能获得后执行结果,否则阻塞等待执行完成再获取执行结果
acquireSharedInterruptibly(0);
return result;
}
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

至此,获得执行结果,圆满完成任务!

老大的老大,拍着咱们老大的肩膀(或者深情的抚摸着咱们老大唏嘘胡茬的脸庞)说:“亲,你这活干的漂亮!”而隔壁桌座位的几个兄弟,刚熬了几个晚上加班交付完这波 task 后,发现任务队列里又有新任务了,俺们老大又从他的另外一个老大手里接来的任务了。每个人都按照这样的角色进行着,依照这样的角色安排和谐愉快地进行着。。。

角色名

任务用户

任务管理者

任务执行者

角色属性

任务的甲方

任务的乙方

乙方的工具

角色说明

选择合适的任务执行服务,如可以根据需要选择 ThreadPoolExecutor 还是 ScheduledThreadPoolExecutor,并定制 ExecutorService 的配置。
定义好任务的工作内容和结果类型,提交任务,等待任务的执行结果

接收提交的任务;
维护执行服务内部管理;
配置工作线程执行任务

每个工作线程一直从任务执行服务获取待执行的任务,保证任务完成后返回执行结果。

Executor**** 中对应

创建获取 ExecutorService、并提交 Task 的外部接口

ExecutorService 的各种实现。如经典的 ThreadPoolExecutor,ScheduledThreadPoolExecutor

执行服务内定义的配套的 Worker 线程。如 ThreadPoolExecutor.Worker

主要接口方法

submit(Callable task)

execute(Runnable command)

runTask(Runnable task)

现实角色映射

手里有活的大老大

领人干活的老大

真正干活的码农

主要工作伪代码

taskService = createService()
future=taskService.submitTask()
future.get()

executeTask()
{ addTask()
createThread()
}

while(ture) {
getTask()
runTask()
}

四、 总结

从时序图上看主要的几个角色是这样配合完成任务提交、任务执行、获取执行结果这几个步骤的。

(点击放大图像)

  1. 外面需要提交任务的角色(如例子中老大的老大),首先创建一个任务执行服务 ExecutorService,一般使用工具类 Executors 的若干个工厂方法 创建不同特征的线程池 ThreadPoolExecutor,例子中是使用 newFixedThreadPool 方法创建有 n 个固定工作线程的线程池。
  2. 线程池是专门负责从外面接活的老大。把任务封装成一个 FutureTask 对象,并根据输入定义好要获得结果的类型,就可以 submit 任务了。
  3. 线程池就像我们团队里管人管项目的老大,各个都有一套娴熟、有效的办法来对付输入的任务和手下干活的兄弟一样,内部有一套比较完整、细致的任务管理办法,工作线程管理办法,以便应付输入的任务。这些逻辑全部在其 execute 方法中体现。
  4. 线程池接收输入的 task,根据需要创建工作线程,启动工作线程来执行 task。
  5. 工作线程在其 run 方法中一直循环,从线程池领取可以执行的 task,调用 task 的 run 方法执行 task 内定义的任务。
  6. FutureTask 的 run 方法中调用其内部类 Sync 的 innerRun 方法来执行封装的具体任务,并把任务的执行结果返回给 FutureTask 的 result 变量。
  7. 当提及任务的角色调用 FutureTask 的 get 方法获取执行结果时,Sync 的 innerGet 方法被调用。根据任务的执行状态判断,任务执行完毕则返回执行结果;未执行完毕则等待。

还记得我们费了半天劲试图找出任务执行时那个动宾结构的主语吗?从示例上看更像是线程池在向外提供任务执行的服务。就像我们的老大在代表我们接收任务、执行任务、提交执行结果。明显我们这些真正的 Worker 成了延伸,有点搞不懂到底我们是主语,还是主语延伸的工具,就像定义 ThreadPoolExecutor 的内部类 Worker 一样。我们只是工具,不是主语,是状语: execute the task by workers。突然想到毛主席当年的“数风流人物,还看今朝”,说的应该是这些 Worker 的劳苦大众吧,怎么都今朝这么久了,俺们这些 Woker 们还是风流不起来呢?风骚的作者居然在上面严肃的时序图上加了个风骚的小星星,向同行的 Worker 们致敬!

作者简介

张超盟,an ExTrender,‍CS 数据管理方向工学硕士。与妻儿蜗居于钱江畔,就职一初创安全公司任数据服务团队负责人,做数据 (存储、挖掘、服务) 方面研发。爱数据,爱代码,爱技术,爱豆吧!( idouba.net )。


感谢徐川对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015-06-04 12:3711645

评论

发布
暂无评论
发现更多内容

Amazon CodeWhisperer代码提示体验测评

查拉图斯特拉说

CodeWhisperer 代码提示

实用且简洁的Python语法

进基的小张

Python 学习 经验分享 技巧分享 python小技巧

IDEA 终端命令行设置

Andy

实用自定义 Mac Dock 的隐藏终端命令

Rose

Mac Dock 自定义Dock

Hazel for Mac(自动化清理软件)v5.2.2激活版

Rose

Mac清理软件 Hazel 下载 Hazel Mac版 自动化文件管理工具

破防了!阿里大佬DDD(领域驱动设计)不破不立,GitHub直接霸榜

Java你猿哥

Java 领域驱动设计 DDD ssm 领域驱动

C语言编程—函数指针与回调函数

芯动大师

Parallels使用到期怎么办?PD17虚拟机无限试用版安装教程

Rose

Parallels Desktop 17下载 Parallels到期 PD17虚拟机 PD无限试用版

2023年互联网Java工程师高级面试八股文汇总(1260道题目附解析)

Java你猿哥

MySQL redis Spring Boot mybatis 多线程

万众瞩目的Nautilus Chain即将上线主网,生态正式起航

BlockChain先知

MySQL踩坑笔记,加了唯一索引还会出现重复数据?

Java你猿哥

Java MySQL sql ssm 索引

Mac电脑Photoshop 2023 Beta版完美解锁!打字生成图像,Ai绘图功能版!

理理

Ps最新版下载 Photoshop2023破解 Ai绘图

volatile 底层是如何实现的?

javacn.site

Mac版PS2023 24.5 大更新,新增4大AI功能,看完忍不住换了!

Rose

PS2023最新版 PS支持AI功能 Photoshop破解

揭秘!为何阿里P8亲身经历撰写的架构师核心笔记竟如此成功

Java你猿哥

Java 领域驱动设计 软件架构 架构师 分布式架构

科兴未来|深圳创新创业大赛开始啦!

科兴未来News

企业 深圳 #双创赛事# 新能源行业 深创赛

OpenHarmony 3.2 Release新特性解读之驱动HCS

OpenHarmony开发者

OpenHarmony

终极指南!Terraform的进阶技巧

SEAL安全

IaC Terraform 5月 企业号 5 月 PK 榜

简洁高效:Java代码中If-Else结构的优化实践与技巧

xfgg

Java 代码优化

StampedLock:高并发场景下一种比读写锁更快的锁

华为云开发者联盟

开发 华为云 华为云开发者联盟 企业号 5 月 PK 榜

万众瞩目的Nautilus Chain即将上线主网,生态正式起航

股市老人

万众瞩目的Nautilus Chain即将上线主网,生态正式起航

西柚子

跨平台开发的优势:ReactNative与小程序容器

没有用户名丶

基于Spring Boot+VUE Java小程序商城项目(附源码),接私活利器

Java你猿哥

Java 小程序 源码 Spring Boot Vue

CleanMyMac闪退怎么办?解决CleanMyMac X闪退

理理

mac系统清理优化软件 CleanMyMac下载 CleanMyMac闪退 CleanMyMac最新版

Mac电脑怎么删除VMware Fusion虚拟机系统,vmware fusion如何删除虚拟机

Rose

VMware Fusion虚拟机 Mac虚拟机 删除虚拟机教程 Win系统

Python实现KNN算法

TiAmo

算法 决策 KNN算法

RocketMQ 顺序消费机制

Java你猿哥

Java RocketMQ 获取 topic等信息 ssm

DMG镜像制作软件:DMG Canvas 激活版

真大的脸盆

Mac Mac 软件 镜像文件制作工具 镜像文件管理

从2000ms缩短到50ms,亿级ES数据搜索性能调优实践

Java你猿哥

Java elasticsearch ELK ssm ES

戏(细)说Executor框架线程池任务执行全过程(下)_Java_张超盟_InfoQ精选文章