写点什么

戏(细)说 Executor 框架线程池任务执行全过程(下)

  • 2015-06-04
  • 本文字数:5448 字

    阅读完需:约 18 分钟

上一篇文章中通过引入的一个例子介绍了在Executor 框架下,提交一个任务的过程,这个过程就像我们老大的老大要找个老大来执行一个任务那样简单。并通过剖析ExecutorService 的一种经典实现ThreadPoolExecutor 来分析接收任务的主要逻辑,发现ThreadPoolExecutor 的工作思路和我们带项目的老大的工作思路完全一致。在本文中我们将继续后面的步骤,着重描述下任务执行的过程和任务执行结果获取的过程。会很容易发现,这个过程我们更加熟悉,因为正是每天我们工作的过程。除了ThreadPoolExecutor 的内部类Worker 外,对执行内容和执行结果封装的FutureTask 的表现是这部分着重需要了解的。

为了连贯期间,内容的编号延续上篇。

2. 任务执行

其实应该说是任务被执行,任务是宾语。动宾结构:execute the task,执行任务,无论写成英文还是中文似乎都是这样。那么主语是是 who 呢?明显不是调用 submit 的那位(线程),那是哪位呢?上篇介绍 ThreadPoolExecutor 主要属性时提到其中有一个 HashSet workers 的集合,我们有说明这里存储的就是线程池的工作队列的集合,队列的对象是 Worker 类型的工作线程,是 ThreadPoolExecutor 的一个内部类,实现了 Runnable 接口:

复制代码
private final class Worker implements Runnable

8) 看作业线程干什么当然是看它的 run 方法在干什么。如我们所料,作业线程就是在一直调用 getTask 方法获取任务,然后调用 runTask(task) 方法执行任务。看到没有,是在 while 循环里面,就是不干完不罢休的意思!在加班干活的苦逼的朋友们,有没有遇见战友的亲切感觉?

复制代码
public void run() {
try {
Runnable task = firstTask;
// 循环从线程池的任务队列获取任务
while (task != null || (task = getTask()) != null) {
// 执行任务
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

然后简单看下 getTask 和 runTask(task) 方法的内容。

9) getTask 方法是 ThreadPoolExecutor 提供给其内部类 Worker 的的方法。作用就是一个,从任务队列中取任务,源源不断地输出任务。有没有想到老大手里拿的总是满满当当的 project,也是源源不断的。

复制代码
Runnable getTask() {
for (;;) {
// 从任务队列的头部取任务
r = workQueue.take();
return r;
}
}

10) runTask(Runnable task) 是工作线程 Worker 真正处理拿到的每个具体任务。看到这里才可用确认我们的猜想,之前提到 [y1] 的“执行任务”这个动宾结构前面的主语正是这些 Worker 呀。唠叨了半天(看主要方法都看到了整整第 10 个了),前面都是派活,这里才是干活。和我们的工作何其相似!老大(LD),老大的老大(LD^2),老大的老大(LD^n) 非常辛苦,花了很多时间、精力在会议室、在 project 上想着怎么生成和安排任务,然而真的轮到咱哥们干活,可能花了不少时间,但看看流程就是这么简单。三个大字:“Just do it”。

复制代码
private void runTask(Runnable task) {
// 调用任务的 run 方法,即在 Worker 线程中执行 Task 内定义内容。
task.run();
}

需要注意的地方出现了,调用的其实是 task 的 run 方法。看下 FutureTask 的 run 方法做了什么事情。

这里插入一个 FutureTask 的类图。可以看到 FutureTask 实现了 RunnableFuture 接口,所以 FutureTask 即有 Runnable 接口的 run 方法来定义任务内容,也有 Future 接口中定义的 get、cancel 等方法来控制任务执行和获取执行结果。Runnable 接口自不用说,Future 接口的伟大设计,就是使得实现该接口的对象可以阻塞线程直到任务执行完毕,也可以取消任务执行,检测任务是执行完毕还是被取消了。想想在之前我们使用 Thread.join() 或者 Thread.join(long millis) 等待任务结束是多么苦涩啊。

FutureTask 内部定义了一个 Sync 的内部类,继承自 AQS,来维护任务状态。关于 AQS 的设计思路,可以参照参考 Doug Lea 大师的原著 The java.util.concurrent Synchronizer Framework

(点击放大图像)

11) 和其他的同步工具类一样,FutureTask 的主要工作内容也是委托给其定义的内部类 Sync 来完成。

复制代码
public void run() {
// 调用 Sync 的对应方法
sync.innerRun();
}

12) FutureTask.Sync.innerRun(),这样做的目的就是为了维护任务执行的状态,只有当执行完后才能够获得任务执行结果。在该方法中,首先设置执行状态为 RUNNING 只有判断任务的状态是运行状态,才调用任务内封装的回调,并且在执行完成后设置回调的返回值到 FutureTask 的 result 变量上。在 FutureTask 中,innerRun 等每个“写”方法都会首先修改状态位,在后续会看到 innerGet 等“读”方法会先判断状态,然后才能决定后续的操作是否可以继续。下图是 FutureTask.Sync 中几个重要状态的流转情况,和其他的同步工具类一样,状态位使用的也是父类 AQS 的 state 属性。

(点击放大图像)

复制代码
void innerRun() {
// 通过对 AQS 的状态位 state 的判断来判断任务的状态是运行状态,则调用任务内封装的回调,并且设置回调的返回值
if (getState() == RUNNING)
innerSet(callable.call());
}
void innerSet(V v) {
for (;;) {
int s = getState();
// 设置运行状态为完成,并且把回调额执行结果设置给 result 变量
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}

至此工作线程执行 Task 就结束了。提交的任务是由 Worker 工作线程执行,正是在该线程上调用 Task 中定义的任务内容,即封装的 Callable 回调,并设置执行结果。下面就是最重要的部分:调用者如何获取执行的结果。让你加班那么久,总得把成果交出来吧。老大在等,因为老大的老大在等!

3. 获取执行结果

前面说过,对于老大的老大这样的使用者来说,获取执行结果这个过程总是最容易的事情,只需调用 FutureTask 的 get() 方法即可。该方法是在 Future 接口中就定义的。get 方法的作用就是等待执行结果。(Waits if necessary for the computation to complete, and then retrieves its result.)Future 这个接口命名得真好,虽然是在未来,但是定义有一个 get() 方法,总是“可以掌控的未来,总是有收获的未来!”实现该接口的 FutureTask 也应该是这个意思,在未来要完成的任务,但是一样要有结果哦。

13) FutureTask 的 get 方法同样委托给 Sync 来执行。和该方法类似,还有一个 V get(long timeout, TimeUnit unit),可以配置超时时间。

复制代码
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}

14) 在 Sync 的 innerGet 方法中,调用 AQS 父类定义的获取共享锁的方法 acquireSharedInterruptibly 来等待执行完成。如果执行完成了则可以继续执行后面的代码,返回 result 结果,否则如果还未完成,则阻塞线程等待执行完成。 [bd2] 再大的老大要想获得结果也得等老子干完了才行!可以看到调用 FutureTask 的 get 方法,进而调用到该方法的一定是想要执行结果的线程,一般应该就是提交 Task 的线程,而这个任务的执行是在 Worker 的工作线程上,通过 AQS 来保证执行完毕才能获取执行结果。该方法中 acquireSharedInterruptibly 是 AQS 父类中定义的获取共享锁的方法,但是到底满足什么条件可以成功获取共享锁,这是 Sync 的 tryAcquireShared 方法内定义的。 [bd3] 具体说来,innerIsDone 用来判断是否执行完毕,如果执行完毕则向下执行,返回 result 即可;如果判断未完成,则调用 AQS 的 doAcquireSharedInterruptibly 来挂起当前线程,一直到满足条件。这种思路在其他的几种同步工具类 Semaphore CountDownLatch ReentrantLock ReentrantReadWriteLock 也广泛使用。借助 AQS 框架,在获取锁时,先判断当前状态是否允许获取锁,若是允许则获取锁,否则获取不成功。获取不成功则会阻塞,进入阻塞队列。而释放锁时,一般会修改状态位,唤醒队列中的阻塞线程。每个同步工具类的自定义同步器都继承自 AQS 父类,是否可以获取锁根据同步类自身的功能要求覆盖 AQS 对应的 try 前缀方法,这些方法在 AQS 父类中都是只有定义没有内容。可以参照《源码剖析 AQS 在几个同步工具类中的使用》来详细了解。

突然想到想想那些被称为老大的,是不是整个 career 流程就是只干两件事情:submit a task, then wait and get the result。不对,还有一件事情,不是等待,而是催。“完了没,完了没?schedule 很紧的,抓点紧啊,要不要适当加点班啊……”

复制代码
V innerGet() throws InterruptedException, ExecutionException {
// 获得锁,表示执行完毕,才能获得后执行结果,否则阻塞等待执行完成再获取执行结果
acquireSharedInterruptibly(0);
return result;
}
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

至此,获得执行结果,圆满完成任务!

老大的老大,拍着咱们老大的肩膀(或者深情的抚摸着咱们老大唏嘘胡茬的脸庞)说:“亲,你这活干的漂亮!”而隔壁桌座位的几个兄弟,刚熬了几个晚上加班交付完这波 task 后,发现任务队列里又有新任务了,俺们老大又从他的另外一个老大手里接来的任务了。每个人都按照这样的角色进行着,依照这样的角色安排和谐愉快地进行着。。。

角色名

任务用户

任务管理者

任务执行者

角色属性

任务的甲方

任务的乙方

乙方的工具

角色说明

选择合适的任务执行服务,如可以根据需要选择 ThreadPoolExecutor 还是 ScheduledThreadPoolExecutor,并定制 ExecutorService 的配置。
定义好任务的工作内容和结果类型,提交任务,等待任务的执行结果

接收提交的任务;
维护执行服务内部管理;
配置工作线程执行任务

每个工作线程一直从任务执行服务获取待执行的任务,保证任务完成后返回执行结果。

Executor**** 中对应

创建获取 ExecutorService、并提交 Task 的外部接口

ExecutorService 的各种实现。如经典的 ThreadPoolExecutor,ScheduledThreadPoolExecutor

执行服务内定义的配套的 Worker 线程。如 ThreadPoolExecutor.Worker

主要接口方法

submit(Callable task)

execute(Runnable command)

runTask(Runnable task)

现实角色映射

手里有活的大老大

领人干活的老大

真正干活的码农

主要工作伪代码

taskService = createService()
future=taskService.submitTask()
future.get()

executeTask()
{ addTask()
createThread()
}

while(ture) {
getTask()
runTask()
}

四、 总结

从时序图上看主要的几个角色是这样配合完成任务提交、任务执行、获取执行结果这几个步骤的。

(点击放大图像)

  1. 外面需要提交任务的角色(如例子中老大的老大),首先创建一个任务执行服务 ExecutorService,一般使用工具类 Executors 的若干个工厂方法 创建不同特征的线程池 ThreadPoolExecutor,例子中是使用 newFixedThreadPool 方法创建有 n 个固定工作线程的线程池。
  2. 线程池是专门负责从外面接活的老大。把任务封装成一个 FutureTask 对象,并根据输入定义好要获得结果的类型,就可以 submit 任务了。
  3. 线程池就像我们团队里管人管项目的老大,各个都有一套娴熟、有效的办法来对付输入的任务和手下干活的兄弟一样,内部有一套比较完整、细致的任务管理办法,工作线程管理办法,以便应付输入的任务。这些逻辑全部在其 execute 方法中体现。
  4. 线程池接收输入的 task,根据需要创建工作线程,启动工作线程来执行 task。
  5. 工作线程在其 run 方法中一直循环,从线程池领取可以执行的 task,调用 task 的 run 方法执行 task 内定义的任务。
  6. FutureTask 的 run 方法中调用其内部类 Sync 的 innerRun 方法来执行封装的具体任务,并把任务的执行结果返回给 FutureTask 的 result 变量。
  7. 当提及任务的角色调用 FutureTask 的 get 方法获取执行结果时,Sync 的 innerGet 方法被调用。根据任务的执行状态判断,任务执行完毕则返回执行结果;未执行完毕则等待。

还记得我们费了半天劲试图找出任务执行时那个动宾结构的主语吗?从示例上看更像是线程池在向外提供任务执行的服务。就像我们的老大在代表我们接收任务、执行任务、提交执行结果。明显我们这些真正的 Worker 成了延伸,有点搞不懂到底我们是主语,还是主语延伸的工具,就像定义 ThreadPoolExecutor 的内部类 Worker 一样。我们只是工具,不是主语,是状语: execute the task by workers。突然想到毛主席当年的“数风流人物,还看今朝”,说的应该是这些 Worker 的劳苦大众吧,怎么都今朝这么久了,俺们这些 Woker 们还是风流不起来呢?风骚的作者居然在上面严肃的时序图上加了个风骚的小星星,向同行的 Worker 们致敬!

作者简介

张超盟,an ExTrender,‍CS 数据管理方向工学硕士。与妻儿蜗居于钱江畔,就职一初创安全公司任数据服务团队负责人,做数据 (存储、挖掘、服务) 方面研发。爱数据,爱代码,爱技术,爱豆吧!( idouba.net )。


感谢徐川对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015-06-04 12:3711563

评论

发布
暂无评论
发现更多内容

bucket表:数仓存算分离中CU与DN解绑的关键

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 3 月 PK 榜

解密数仓高可用failover流程

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 3 月 PK 榜

博睿“她”力量 :这份专业值得信赖

博睿数据

博睿数据 节日祝福

如何判断多账号是同一个人?用图技术搞定 ID Mapping

NebulaGraph

图数据库 风险控制 安全控制

ChatGPT 未来发展趋势 | 社区征文

魏铁锤

ChatGPT

如何规范 RESTful API 的业务错误处理

江湖十年

Go 后端 Error RESTful API

DLRover:蚂蚁开源大规模智能分布式训练系统

SOFAStack

人工智能 互联网 DLRover

及刻周边惠:拥抱HarmonyOS原子化服务

HarmonyOS开发者

HarmonyOS

CNStack 多集群服务:基于 OCM 打造完善的集群管理能力

阿里巴巴云原生

阿里云 云原生 kubenetes 集群管理

科技和女性的今天,《赛博格宣言》半个世纪前就预言了

脑极体

赛博格 女性

GitLab 凭借什么连续 3 年上榜 Gartner 应用程序安全测试魔力象限?听听 GitLab 自己的分析

极狐GitLab

DevOps DevSecOps 安全测试 极狐GitLab 安全合规

MASA MAUI Plugin (十)iOS消息推送(原生APNS方式)

MASA技术团队

blazor MASA MAUI Xamarin

从小程序容器和微服务架构的结合,看未来应用程序开发的主流方式

没有用户名丶

2023年2月国产数据库大事记-墨天轮

墨天轮

数据库 opengauss TiDB oceanbase 国产数据库

秒懂算法 | DP概述和常见DP面试题

TiAmo

算法 DP算法

瓴羊Quick BI真心不错,已获得官方认可!

对不起该用户已成仙‖

探索ChatGPT技术在文本生成、机器翻译领域的简单应用 | 社区征文

兴科Sinco

人工智能 机器翻译 OpenAPI openai ChatGPT

Meta Force佛萨奇2.0合约开发系统源码部署

薇電13242772558

智能合约

直播预约丨 微服务x容器开源开发者 Meetup 北京站回顾 & PPT 下载

阿里巴巴云原生

阿里云 容器 微服务 云原生

携程 x TiDB丨应对全球业务海量数据增长,一栈式 HTAP 实现架构革新

PingCAP

数据库 TiDB

“中国的ChatGPT”真的要来了吗?

科技热闻

设备离线时控制指令如何下发:通过设备影子实现离线设备的控制指令触达方案——设备管理运维类

阿里云AIoT

物联网

【物联网开发实战】- 设备上云方案详解——设备接入类

阿里云AIoT

物联网 传感器

如何通过C#/VB.NET代码在Word中插入或删除脚注

在下毛毛雨

C# .net word 脚注

车载小程序发展现状:使用环境、用户体验、应用场景及未来趋势

没有用户名丶

小程序化

3 月 9 日「融云 2023 政企数智办公新品巡展 · 北京站」邀您入席!

融云 RongCloud

产品 数字化 政企

汇率市场大幅波动,用友BIP全球司库助力企业外汇避险

用友BIP

金融 外汇避险

IoT物联网设备OTA固件升级开发实践——设备管理运营类

阿里云AIoT

物联网

大咖说·阿里研究院|数实融合的第三次浪潮

大咖说

开放下载丨云原生架构容器&微服务优秀案例集

阿里巴巴云原生

阿里云 容器 微服务 云原生

百度智能云首批通过信通院MLOps旗舰级评测 全面加速文心一言产业落地

Geek_2d6073

戏(细)说Executor框架线程池任务执行全过程(下)_Java_张超盟_InfoQ精选文章