写点什么

Java 多线程编程模式实战指南(三):Two-phase Termination 模式

  • 2015-01-22
  • 本文字数:8441 字

    阅读完需:约 28 分钟

停止线程是一个目标简单而实现却不那么简单的任务。首先,Java 没有提供直接的 API 用于停止线程。此外,停止线程时还有一些额外的细节需要考虑,如待停止的线程处于阻塞(等待锁)或者等待状态(等待其它线程)、尚有未处理完的任务等。本文介绍的 Two-phase Termination 模式提供了一种通用的用于优雅地停止线程的方法。

Two-phase Termination 模式简介

Java 并没有提供直接的 API 用于停止线程。Two-phase Termination 模式通过将停止线程这个动作分解为准备阶段和执行阶段这两个阶段,以应对停止线程过程中可能存在的问题。

准备阶段。该阶段主要动作是“通知”目标线程(欲停止的线程)准备进行停止。这一步会设置一个标志变量用于指示目标线程可以准备停止了。但是,由于目标线程可能正处于阻塞状态(等待锁的获得)、等待状态(如调用 Object.wait)或者 I/O(如 InputStream.read)等待等状态,即便设置了这个标志,目标线程也无法立即“看到”这个标志而做出相应动作。因此,这一阶段还需要通过调用目标线程的 interrupt 方法,以期望目标线程能够通过捕获相关的异常侦测到该方法调用,从而中断其阻塞状态、等待状态。对于能够对 interrupt 方法调用作出响应的方法(参见表 1),目标线程代码可以通过捕获这些方法抛出的 InterruptedException 来侦测线程停止信号。但也有一些方法(如 InputStream.read)并不对 interrupt 调用作出响应,此时需要我们手工处理,如同步的 Socket I/O 操作中通过关闭 socket,使处于 I/O 等待的 socket 抛出 java.net.SocketException。

表 1. 能够对 Thread.interrupt 作出响应的一些方法

方法

响应interrupt调用抛出的异常

Object.wait() 、 Object.wait(long timeout) 、Object.wait(long timeout, int nanos)

InterruptedException

Thread.sleep(long millis) 、Thread.sleep(long millis, int nanos)

InterruptedException

Thread.join()、Thread.join(long millis) 、Thread.join(long millis, int nanos)

InterruptedException

java.util.concurrent.BlockingQueue.take()

InterruptedException

java.util.concurrent.locks.Lock.lockInterruptibly()

InterruptedException

java.nio.channels.InterruptibleChannel

java.nio.channels.ClosedByInterruptException

执行阶段。该阶段的主要动作是检查准备阶段所设置的线程停止标志和信号,在此基础上决定线程停止的时机,并进行适当的“清理”操作。

Two-phase Termination 模式的架构

Two-phase Termination 模式的主要参与者有以下几种。其类图如图 1 所示。

图 1. Two-phase Termination 模式的类图

  • ThreadOwner:目标线程的拥有者。Java 语言中,并没有线程的拥有者的概念,但是线程的背后是其要处理的任务或者其所提供的服务,因此我们不能在不清楚某个线程具体是做什么的情况下贸然将其停止。一般地,我们可以将目标线程的创建者视为该线程的拥有者,并假定其“知道”目标线程的工作内容,可以安全地停止目标线程。
  • TerminatableThread:可停止的线程。其主要方法及职责如下:
    • terminate:设置线程停止标志,并发送停止“信号”给目标线程。
    • doTerminate留给子类实现线程停止时所需的一些额外操作,如目标线程代码中包含 Socket I/O,子类可以在该方法中关闭 Socket 以达到快速停止线程,而不会使目标线程等待 I/O 完成才能侦测到线程停止标记。
    • doRun留给子类实现线程的处理逻辑。相当于 Thread.run,只不过该方法中无需关心停止线程的逻辑,因为这个逻辑已经被封装在 TerminatableThread 的 run 方法中了。
    • doCleanup留给子类实现线程停止后可能需要的一些清理动作。
  • TerminationToken线程停止标志。toShutdown 用于指示目标线程可以停止了。reservations 可用于反映目标线程还有多少数量未完成的任务,以支持等目标线程处理完其任务后再行停止。

准备阶段的序列图如图 2 所示:

图 2. 准备阶段的序列图

1、客户端代码调用线程拥有者的 shutdown 方法。

2、shutdown 方法调用目标线程的 terminate 方法。

3~4、terminate 方法将 terminationToken 的 toShutdown 标志设置为 true。

5、terminate 方法调用由 TerminatableThread 子类实现的 doTerminate 方法,使得子类可以为停止目标线程做一些其它必要的操作。

6、若 terminationToken 的 reservations 属性值为 0,则表示目标线程没有未处理完的任务或者 ThreadOwner 在停止线程时不关心其是否有未处理的任务。此时,terminate 方法会调用目标线程的 interrupt 方法。

7、terminate 方法调用结束。

8、shutdown 调用返回,此时目标线程可能还仍然在运行。

执行阶段由目标线程的代码去检查 terminationToken 的 toShutdown 属性、reservations 属性的值,并捕获由 interrupt 方法调用抛出的相关异常以决定是否停止线程。在线程停止前由 TerminatableThread 子类实现的 doCleanup 方法会被调用。

Two-phase Termination 模式实战案例

某系统需要对接告警系统以实现告警功能。告警系统是一个 C/S 结构的系统,它提供了一套客户端 API(AlarmAgent)用于与其对接的系统给其发送告警。该系统将告警功能封装在一个名为 AlarmMgr 的单件类(Singleton)中,系统中其它代码需要发送告警的只需要调用该类的 sendAlarm 方法。该方法将告警信息缓存入队列,由专门的告警发送线程负责调用 AlarmAgent 的相关方法将告警信息发送至告警服务器。

告警发送线程是一个用户线程(User Thread),因此在系统的停止过程中,该线程若未停止则会阻止 JVM 正常关闭。所以,在系统停止过程中我们必须主动去停止告警发送线程,而非依赖 JVM。为了能够尽可能快的以优雅的方式将告警发送线程停止,我们需要处理以下两个问题:

  1. 当告警缓存队列非空时,需要将队列中已有的告警信息发送至告警服务器。
  2. 由于缓存告警信息的队列是一个阻塞队列(LinkedBlockingQueue),在该队列为空的情况下,告警发送线程会一直处于等待状态。这会导致其无法响应我们的关闭线程的请求。

上述问题可以通过使用 Two-phase Termination 模式来解决。

AlarmMgr 相当于图 1 中的 ThreadOwner 参与者实例,它是告警发送线程的拥有者。系统停止过程中调用其 shutdown 方法(AlarmMgr.getInstance().shutdown())即可请求告警发送线程停止。其代码如清单 1 所示:

清单 1. AlarmMgr 源码

复制代码
public class AlarmMgr {
private final BlockingQueue<AlarmInfo> alarms = new LinkedBlockingQueue<AlarmInfo>();
// 告警系统客户端 API
private final AlarmAgent alarmAgent = new AlarmAgent();
// 告警发送线程
private final AbstractTerminatableThread alarmSendingThread;
private boolean shutdownRequested = false;
private static final AlarmMgr INSTANCE = new AlarmMgr();
private AlarmMgr() {
alarmSendingThread = new AbstractTerminatableThread() {
@Override
protected void doRun() throws Exception {
if (alarmAgent.waitUntilConnected()) {
AlarmInfo alarm;
alarm = alarms.take();
terminationToken.reservations.decrementAndGet();
try {
alarmAgent.sendAlarm(alarm);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
protected void doCleanup(Exception exp) {
if (null != exp) {
exp.printStackTrace();
}
alarmAgent.disconnect();
}
};
alarmAgent.init();
}
public static AlarmMgr getInstance() {
return INSTANCE;
}
public void sendAlarm(AlarmType type, String id, String extraInfo) {
final TerminationToken terminationToken = alarmSendingThread.terminationToken;
if (terminationToken.isToShutdown()) {
// log the alarm
System.err.println("rejected alarm:" + id + "," + extraInfo);
return;
}
try {
AlarmInfo alarm = new AlarmInfo(id, type);
alarm.setExtraInfo(extraInfo);
terminationToken.reservations.incrementAndGet();
alarms.add(alarm);
} catch (Throwable t) {
t.printStackTrace();
}
}
public void init() {
alarmSendingThread.start();
}
public synchronized void shutdown() {
if (shutdownRequested) {
throw new IllegalStateException("shutdown already requested!");
}
alarmSendingThread.terminate();
shutdownRequested = true;
}
public int pendingAlarms() {
return alarmSendingThread.terminationToken.reservations.get();
}
}
class AlarmAgent {
// 省略其它代码
private volatile boolean connectedToServer = false;
public void sendAlarm(AlarmInfo alarm) throws Exception {
// 省略其它代码
System.out.println("Sending " + alarm);
try {
Thread.sleep(50);
} catch (Exception e) {
}
}
public void init() {
// 省略其它代码
connectedToServer = true;
}
public void disconnect() {
// 省略其它代码
System.out.println("disconnected from alarm server.");
}
public boolean waitUntilConnected() {
// 省略其它代码
return connectedToServer;
}
}

从上面的代码可以看出,AlarmMgr 每接受一个告警信息放入缓存队列便将 terminationToken 的 reservations 值增加 1,而告警发送线程每发送一个告警到告警服务器则将 terminationToken 的 reservations 值减少 1。这为我们可以在停止告警发送线程前确保队列中现有的告警信息会被处理完毕提供了线索:AbstractTerminatableThread 的 run 方法会根据 terminationToken 的 reservations 是否为 0 来判断待停止的线程已无未处理的任务,或者无需关心其是否有待处理的任务。

AbstractTerminatableThread 的源码见清单 2:

清单 2. AbstractTerminatableThread 源码

复制代码
public abstract class AbstractTerminatableThread extends Thread
implements Terminatable {
public final TerminationToken terminationToken;
public AbstractTerminatableThread() {
super();
this.terminationToken = new TerminationToken();
}
/**
*
* @param terminationToken 线程间共享的线程终止标志实例
*/
public AbstractTerminatableThread(TerminationToken terminationToken) {
super();
this.terminationToken = terminationToken;
}
protected abstract void doRun() throws Exception;
protected void doCleanup(Exception cause) {}
protected void doTerminiate() {}
@Override
public void run() {
Exception ex = null;
try {
while (true) {
/*
* 在执行线程的处理逻辑前先判断线程停止的标志。
*/
if (terminationToken.isToShutdown()
&& terminationToken.reservations.get() <= 0) {
break;
}
doRun();
}
} catch (Exception e) {
// Allow the thread to terminate in response of a interrupt invocation
ex = e;
} finally {
doCleanup(ex);
}
}
@Override
public void interrupt() {
terminate();
}
@Override
public void terminate() {
terminationToken.setToShutdown(true);
try {
doTerminiate();
} finally {
// 若无待处理的任务,则试图强制终止线程
if (terminationToken.reservations.get() <= 0) {
super.interrupt();
}
}
}
}

AbstractTerminatableThread 是一个可复用的 TerminatableThread 参与者实例。其 terminate 方法完成了线程停止的准备阶段。该方法首先将 terminationToken 的 toShutdown 变量设置为 true,指示目标线程可以准备停止了。但是,此时目标线程可能处于一些阻塞(Blocking)方法的调用,如调用 Object.sleep、InputStream.read 等,无法侦测到该变量。调用目标线程的 interrupt 方法可以使一些阻塞方法(参见表 1)通过抛出异常从而使目标线程停止。但也有些阻塞方法如 InputStream.read 并不对 interrupt 方法调用作出响应,此时需要由 TerminatableThread 的子类实现 doTerminiate 方法,在该方法中实现一些关闭目标线程所需的额外操作。例如,在 Socket 同步 I/O 中通过关闭 socket 使得使用该 socket 的线程若处于 I/O 等待会抛出 SocketException。因此,terminate 方法下一步调用 doTerminate 方法。接着,若 terminationToken.reservations 的值为非正数(表示目标线程无待处理任务、或者我们不关心其是否有待处理任务),则 terminate 方法会调用目标线程的 interrupt 方法,强制目标线程的阻塞方法中断,从而强制终止目标线程。

执行阶段在 AbstractTerminatableThread 的 run 方法中完成。该方法通过对 TerminationToken 的 toShutdown 属性和 reservations 属性的判断或者通过捕获由 interrupt 方法调用而抛出的异常来终止线程。并在线程终止前调用由 TerminatableThread 子类实现的 doCleanup 方法用于执行一些清理动作。

在执行阶段,由于 AbstractTerminatableThread.run 方法每次执行线程处理逻辑(通过调用 doRun 方法实现)前都先判断下 toShutdown 属性和 reservations 属性的值,在目标线程处理完其待处理的任务后(此时 reservations 属性的值为非正数)目标线程 run 方法也就退出了 while 循环。因此,线程的处理逻辑代码(doRun 方法)将不再被调用,从而使本案例在不使用 Two-phase Termination 模式的情况下停止目标线程存在的两个问题得以解决(目标线程停止前可以保证其处理完待处理的任务——发送队列中现有的告警信息到服务器)和规避(目标线程发送完队列中现有的告警信息后,doRun 方法不再被调用,从而避免了队列为空时 BlockingQueue.take 调用导致的阻塞)。

从上可知,准备阶段、执行阶段需要通过 TerminationToken 作为“中介”来协调二者的动作。TerminationToken 的源码如清单 3 所示:

清单 3. TerminationToken 源码

复制代码
public class TerminationToken {
// 使用 volatile 修饰,以保证无需显示锁的情况下该变量的内存可见性
protected volatile boolean toShutdown = false;
public final AtomicInteger reservations = new AtomicInteger(0);
public boolean isToShutdown() {
return toShutdown;
}
protected void setToShutdown(boolean toShutdown) {
this.toShutdown = true;
}
}

Two-phase Termination 模式的评价与实现考量

Two-phase Termination 模式使得我们可以对各种形式的目标线程进行优雅的停止。如目标线程调用了能够对 interrupt 方法调用作出响应的阻塞方法、目标线程调用了不能对 interrupt 方法调用作出响应的阻塞方法、目标线程作为消费者处理其它线程生产的“产品”在其停止前需要处理完现有“产品”等。Two-phase Termination 模式实现的线程停止可能出现延迟,即客户端代码调用完 ThreadOwner.shutdown 后,该线程可能仍在运行。

本文案例展示了一个可复用的 Two-phase Termination 模式实现代码。读者若要自行实现该模式,可能需要注意以下几个问题。

线程停止标志

本文案例使用了 TerminationToken 作为目标线程可以准备停止的标志。从清单 3 的代码我们可以看到,TerminationToken 使用了 toShutdown 这个 boolean 变量作为主要的停止标志,而非使用 Thread.isInterrupted()。这是因为,调用目标线程的 interrupt 方法无法保证目标线程的 isInterrupted() 方法返回值为 true:目标线程可能调用一些能够捕获 InterruptedException 而不保留线程中断状态的代码。另外,toShutdown 这个变量为了保证内存可见性而又能避免使用显式锁的开销,采用了 volatile 修饰。这点也很重要,笔者曾经见过一些采用 boolean 变量作为线程停止标志的代码,只是这些变量没有用 volatile 修饰,对其访问也没有加锁,这就可能无法停止目标线程。

生产者——消费者问题中的线程停止

在多线程编程中,许多问题和一些多线程编程模式都可以看作生产者——消费者问题。停止处于生产者——消费者问题中的线程,需要考虑更多的问题:需要注意线程的停止顺序,如果消费者线程比生产者线程先停止则会导致生产者生产的新”产品“无法被处理,而如果先停止生产者线程又可能使消费者线程处于空等待(如生产者消费者采用阻塞队列中转”产品“)。并且,停止消费者线程前是否考虑要等待其处理完所有待处理的任务或者将这些任务做个备份也是个问题。本文案例部分地展示生产者——消费者问题中线程停止的处理,其核心就是通过使用 TerminationToken 的 reservations 变量:生产者每”生产“一个产品,Two-phase Termination 模式的调用方代码要使 reservations 变量值增加 1(terminationToken.reservations.incrementAndGet());消费者线程每处理一个产品,Two-phase Termination 模式的调用方代码要使 reservations 变量值减少 1(terminationToken.reservations.decrementAndGet())。当然,在停止消费者线程时如果我们不关心其待处理的任务,Two-phase Termination 模式的调用方代码可以忽略对 reservations 变量的操作。清单 4 展示了一个完整的停止生产者——消费者问题中的线程的例子:

清单 4. 停止生产者——消费者问题中的线程的例子

复制代码
public class ProducerConsumerStop {
class SampleConsumer<P> {
private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();
private AbstractTerminatableThread workThread
= new AbstractTerminatableThread() {
@Override
protected void doRun() throws Exception {
terminationToken.reservations.decrementAndGet();
P product = queue.take();
// ...
System.out.println(product);
}
};
public void placeProduct(P product) {
if (workThread.terminationToken.isToShutdown()) {
throw new IllegalStateException("Thread shutdown");
}
try {
queue.put(product);
workThread.terminationToken.reservations.incrementAndGet();
} catch (InterruptedException e) {
}
}
public void shutdown() {
workThread.terminate();
}
public void start() {
workThread.start();
}
}
public void test() {
final SampleConsumer<String> aConsumer = new SampleConsumer<String>();
AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {
private int i = 0;
@Override
protected void doRun() throws Exception {
aConsumer.placeProduct(String.valueOf(i));
}
@Override
protected void doCleanup(Exception cause) {
// 生产者线程停止完毕后再请求停止消费者线程
aConsumer.shutdown();
}
};
aProducer.start();
aConsumer.start();
}
}

隐藏而非暴露可停止的线程

为了保证可停止的线程不被其它代码误停止,一般我们将可停止线程隐藏在线程拥有者背后,而使系统中其它代码无法直接访问该线程,正如本案例代码(见清单 1)所展示:AlarmMgr 定义了一个 private 字段 alarmSendingThread 用于引用告警发送线程(可停止的线程),系统中的其它代码只能通过调用 AlarmMgr 的 shutdown 方法来请求该线程停止,而非通过引用该线程对象自身来停止它。

总结

本文介绍了 Two-phase Termination 模式的意图及架构。并结合笔者工作经历提供了一个实际的案例用于展示一个可复用的 Two-phase Termination 模式实现代码,在此基础上对该模式进行了评价并分享在实际运用该模式时需要注意的事项。

参考资源

作者简介

黄文海,有多年敏捷项目管理经验和丰富的技术指导经验。关注敏捷开发、Java 多线程编程和 Web 开发。在 InfoQ 中文站和 IBM DeveloperWorks 上发表过多篇文章。其博客: http://viscent.iteye.com/


感谢张龙对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2015-01-22 09:0111738

评论

发布
暂无评论
发现更多内容

Objective-C 语言基础知识:编写测试代码

测吧(北京)科技有限公司

测试

代码审查完整指南来了!

敏捷开发

编程 软件开发 代码审查 代码管理

SQLAlchemy 安装与配置指南

测吧(北京)科技有限公司

测试

某新员工大量使用Lambda表达式,老员工喷是炫技

源字节1号

小程序 开源 前端 后端

接入单元测试框架:原理与执行流程介绍

测吧(北京)科技有限公司

测试

为什么从 Demo 测试开始上手

测吧(北京)科技有限公司

测试

InfoQ对话天润融通CTO|AI时代,开发者的机遇与挑战?

天润融通

人工智能

基于快照的异步远程复制介绍

天翼云开发者社区

Java 前端 rbd

Java/OC 语言知识讲解:反射与回调

测吧(北京)科技有限公司

测试

没有它,你的Scrum无法实现!

敏捷开发

项目管理 Scrum 敏捷开发

etl 常用数据类型转换 元数据配置说明

weigeonlyyou

Go 大数据 物联网 ETL MySQL 8.0

抖音商品详情API实战指南:轻松获取并利用商品数据

tbapi

抖音商品数据采集 抖音API 抖音数据采集 抖音商品详情接口

Web 应用框架环境安装指南

测吧(北京)科技有限公司

测试

软件测试学习笔记丨接口请求体-form表单

测试人

软件测试

搭建安全测试演练环境:实操常见安全漏洞

测吧(北京)科技有限公司

测试

云灾备场景

天翼云开发者社区

公有云 容灾场景

官宣!玲珑项目升级,如意玲珑(Linyaps)全新启航!

nn-30

Linux 开源 操作系统 deepin deepin V23

京东按图搜索商品新视角:jd.item_search_img API返回值解读

技术冰糖葫芦

API 安全 API 文档 API 开发 API 协议

deepin Meetup 上海站回顾,揭秘如意玲珑(Linyaps)升级“内幕” | 附 PPT下载

nn-30

Linux 开源 操作系统 deepin deepin V23

AI大模型深度对比:腾讯混元大模型 VS 百川大模型

幂简集成

AI API 大模型

在 Xcode 中运行和调试单元测试:使用 Debug 和日志

测吧(北京)科技有限公司

测试

S3基准测试工具 - Warp使用简介

天翼云开发者社区

测试 S3 开源云工具

天润融通用小改进,盘活大资产

天润融通

人工智能 天润融通

rbd常用的配置参数

天翼云开发者社区

rbd 配置参数

为什么要构建小程序生态

Geek_2305a8

Python循环控制

不在线第一只蜗牛

Python

性能测试:行业流行性能剖析工具介绍

测吧(北京)科技有限公司

测试

什么是云抄表?

源字节1号

小程序 开源 后端‘’

测试平台环境配置指南

测吧(北京)科技有限公司

测试

C 语言中的 sscanf 详解

EquatorCoco

MySQL 数据库 C语言

Java多线程编程模式实战指南(三):Two-phase Termination模式_Java_黄文海_InfoQ精选文章