QCon 演讲火热征集中,快来分享技术实践与洞见! 了解详情
写点什么

Java 多线程编程模式实战指南一:Active Object 模式(上)

  • 2014-11-21
  • 本文字数:9157 字

    阅读完需:约 30 分钟

Active Object 模式简介

Active Object 模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object 模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于 System.gc() 这个方法:客户端代码调用完 gc() 后,一个进行垃圾回收的任务被提交,但此时 JVM 并不一定进行了垃圾回收,而可能是在 gc() 方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc() 的调用方代码是运行在自己的线程上(通常是 main 线程派生的子线程),而 JVM 的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc() 这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。

再进一步介绍 Active Object 模式,我们可先简单地将其核心理解为一个名为 ActiveObject 的类,该类对外暴露了一些异步方法,如图 1 所示。

图 1. ActiveObject 对象示例

doSomething 方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething 方法会被多个线程调用。这时所需的线程安全控制封装在 doSomething 方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个 Active Object 对象的方法与调用普通 Java 对象的方法并无太大差别。如清单 1 所示。

清单 1. Active Object 方法调用示例

复制代码
ActiveObject ao=...;
Future<string> future = ao.doSomething("data");
// 执行其它操作
String result = future.get();
System.out.println(result);
</string>

Active Object 模式的架构

当 Active Object 模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入 Active Object 模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用 Active Object 模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

Active Object 模式的主要参与者有以下几种。其类图如图 2 所示。

图 2. Active Object 模式的类图

(点击图像放大)

  • Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法 doSomething 时,该方法会生成一个相应的 MethodRequest 实例并将其存储到 Scheduler 所维护的缓冲区中。doSomething 方法的返回值是一个表示其执行结果的外包装对象:Future 参与者的实例。异步方法 doSomething 运行在调用方代码所在的线程中。
  • MethodRequest:负责将调用方代码对 Proxy 实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将 Proxy 的异步方法的调用和执行分离成为可能。其 call 方法会根据其所包含上下文信息调用 Servant 实例的相应方法。
  • ActivationQueue:负责临时存储由 Proxy 的异步方法被调用时所创建的 MethodRequest 实例的缓冲区。
  • Scheduler:负责将 Proxy 的异步方法所创建的 MethodRequest 实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的 MethodRequest 实例进行执行。其调度策略可以根据实际需要来定,如 FIFO、LIFO 和根据 MethodRequest 中包含的信息所定的优先级等。
  • Servant:负责对 Proxy 所暴露的异步方法的具体实现。
  • Future:负责存储和返回 Active Object 异步方法的执行结果。

Active Object 模式的序列图如图 3 所示。

图 3. Active Object 模式的序列图

(点击图像放大)

第1 步:调用方代码调用Proxy 的异步方法doSomething。

第2~7 步:doSomething 方法创建Future 实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest 对象。然后以所创建的MethodRequest 对象作为参数调用Scheduler 的enqueue 方法,以将MethodRequest 对象存入缓冲区。Scheduler 的enqueue 方法会调用Scheduler 所维护的ActivationQueue 实例的enqueue 方法,将MethodRequest 对象存入缓冲区。

第8 步:doSomething 返回其所创建的Future 实例。

第9 步:Scheduler 实例采用专门的工作线程运行dispatch 方法。

第10~12 步:dispatch 方法调用ActivationQueue 实例的dequeue 方法,获取一个MethodRequest 对象。然后调用MethodRequest 对象的call 方法

第13~16 步:MethodRequest 对象的call 方法调用与其关联的Servant 实例的相应方法doSomething。并将Servant.doSomething 方法的返回值设置到Future 实例上。

第17 步:MethodRequest 对象的call 方法返回。

上述步骤中,第1~8 步是运行在Active Object 的调用者线程中的,这几个步骤实现了将调用方代码对Active Object 所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17 步是运行在Active Object 的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object 对外暴露的异步方法的调用与执行的分离。

如果调用方代码关心Active Object 的异步方法的返回值,则可以在其需要时,调用Future 实例的get 方法来获得异步方法的真正执行结果。

Active Object 模式实战案例

某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户 13612345678 给其同事 13787654321 发送彩信时,可以将接收方号码填写为对方的短号,如 776,而非其真实的号码。

该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过 Java 的对象序列化 API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及 Active Object 模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中涉及文件 I/O 这种慢的操作,我们不希望它在请求处理的主线程(即 Web 服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得 Web 服务器的工作线程因等待文件 I/O 而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object 模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在 Web 服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在 Active Object 工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被 Active Object 线程缓存到文件中。如图 4 所示。

图 4 . 异步实现缓存

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如 2000 个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如 100 个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而 Active Object 模式中的 Proxy 参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用 Active Object 模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的 Active Object 模式的客调用方代码。如清单 2 所示。

清单 2. 彩信下发请求处理的入口类

复制代码
public class MMSDeliveryServlet extends HttpServlet {
private static final long serialVersionUID = 5886933373599895099L;
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
// 将请求中的数据解析为内部对象
MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
Recipient originalNumberRecipient = null;
try {
// 将接收方短号转换为长号
originalNumberRecipient = convertShortNumber(shortNumberRecipient);
} catch (SQLException e) {
// 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
AsyncRequestPersistence.getInstance().store(mmsDeliverReq);
// 继续对当前请求的其它处理,如给客户端响应
resp.setStatus(202);
}
}
private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
// 省略其它代码
return mmsDeliverReq;
}
private Recipient convertShortNumber(Recipient shortNumberRecipient)
throws SQLException {
Recipient recipent = null;
// 省略其它代码
return recipent;
}
}

清单 2 中的 doPost 方法在侦测到短号转换过程中发生的数据库异常后,通过调用 AsyncRequestPersistence 类的 store 方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence 类相当于 Active Object 模式中的 Proxy 参与者。尽管本案例涉及的是一个并发环境,但从清单 2 中的代码可见,AsyncRequestPersistence 类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在 AsyncRequestPersistence 类之后。

AsyncRequestPersistence 类的代码如清单 3 所示。

清单 3. 彩信下发请求缓存入口类(Active Object 模式的 Proxy)

复制代码
// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
private static final long ONE_MINUTE_IN_SECONDS = 60;
private final Logger logger;
private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);
// ActiveObjectPattern.Servant
private final DiskbasedRequestPersistence
delegate = new DiskbasedRequestPersistence();
// ActiveObjectPattern.Scheduler
private final ThreadPoolExecutor scheduler;
private static class InstanceHolder {
final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
}
private AsyncRequestPersistence() {
logger = Logger.getLogger(AsyncRequestPersistence.class);
scheduler = new ThreadPoolExecutor(1, 3,
60 * ONE_MINUTE_IN_SECONDS,
TimeUnit.SECONDS,
// ActiveObjectPattern.ActivationQueue
new LinkedBlockingQueue<runnable>(200),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t;
t = new Thread(r, "AsyncRequestPersistence");
return t;
}
});
scheduler.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardOldestPolicy());
// 启动队列监控定时任务
Timer monitorTimer = new Timer(true);
monitorTimer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run() {
if (logger.isInfoEnabled()) {
logger.info("task count:"
+ requestSubmittedPerIterval
+ ",Queue size:"
+ scheduler.getQueue().size()
+ ",taskTimeConsumedPerInterval:"
+ taskTimeConsumedPerInterval.get()
+ " ms");
}
taskTimeConsumedPerInterval.set(0);
requestSubmittedPerIterval.set(0);
}
}, 0, ONE_MINUTE_IN_SECONDS * 1000);
}
public static RequestPersistence getInstance() {
return InstanceHolder.INSTANCE;
}
@Override
public void store(final MMSDeliverRequest request) {
/*
* 将对 store 方法的调用封装成 MethodRequest 对象, 并存入缓冲区。
*/
// ActiveObjectPattern.MethodRequest
Callable<boolean> methodRequest = new Callable<boolean>() {
@Override
public Boolean call() throws Exception {
long start = System.currentTimeMillis();
try {
delegate.store(request);
} finally {
taskTimeConsumedPerInterval.addAndGet(
System.currentTimeMillis() - start);
}
return Boolean.TRUE;
}
};
scheduler.submit(methodRequest);
requestSubmittedPerIterval.incrementAndGet();
}
}
</boolean></boolean></runnable>

AsyncRequestPersistence 类所实现的接口 RequestPersistence 定义了 Active Object 对外暴露的异步方法:store 方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单 4 所示。

清单 4. RequestPersistence 接口源码

复制代码
public interface RequestPersistence {
void store(MMSDeliverRequest request);
}

AsyncRequestPersistence 类的实例变量 scheduler 相当于 Active Object 模式中的 Scheduler 参与者实例。这里我们直接使用了 JDK1.5 引入的 Executor Framework 中的 ThreadPoolExecutor。在 ThreadPoolExecutor 类的实例化时,其构造器的第 5 个参数(BlockingQueue workQueue)我们指定了一个有界阻塞队列:new LinkedBlockingQueue(200)。该队列相当于 Active Object 模式中的 ActivationQueue 参与者实例。

AsyncRequestPersistence 类的实例变量 delegate 相当于 Active Object 模式中的 Servant 参与者实例。

AsyncRequestPersistence 类的 store 方法利用匿名类生成一个 java.util.concurrent.Callable 实例 methodRequest。该实例相当于 Active Object 模式中的 MethodRequest 参与者实例。利用闭包(Closure),该实例封装了对 store 方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence 类的 store 方法通过调用 scheduler 的 submit 方法,将 methodRequest 送入 ThreadPoolExecutor 所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor 是 Scheduler 参与者的一个“近似”实现。ThreadPoolExecutor 的 submit 方法相对于 Scheduler 的 enqueue 方法,该方法用于接纳 MethodRequest 对象,以将其存入缓冲区。当 ThreadPoolExecutor 当前使用的线程数量小于其核心线程数量时,submit 方法所接收的任务会直接被新建的线程执行。当 ThreadPoolExecutor 当前使用的线程数量大于其核心线程数时,submit 方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor 的这种任务处理机制,并不妨碍我们将它用作 Scheduler 的实现。

methodRequest 的 call 方法会调用 delegate 的 store 方法来真正实现请求缓存功能。delegate 实例对应的类 DiskbasedRequestPersistence 是请求消息缓存功能的真正实现者。其代码如清单 5 所示。

清单 5. DiskbasedRequestPersistence 类的源码

复制代码
public class DiskbasedRequestPersistence implements RequestPersistence {
// 负责缓存文件的存储管理
private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
private final Logger logger = Logger
.getLogger(DiskbasedRequestPersistence.class);
@Override
public void store(MMSDeliverRequest request) {
// 申请缓存文件的文件名
String[] fileNameParts = storage.apply4Filename(request);
File file = new File(fileNameParts[0]);
try {
ObjectOutputStream objOut = new ObjectOutputStream(
new FileOutputStream(file));
try {
objOut.writeObject(request);
} finally {
objOut.close();
}
} catch (FileNotFoundException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error("Failed to store request", e);
} catch (IOException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error("Failed to store request", e);
}
}
class SectionBasedDiskStorage {
private Deque<string> sectionNames = new LinkedList<string>();
/*
* Key->value: 存储子目录名 -> 子目录下缓存文件计数器
*/
private Map<string atomicinteger=""> sectionFileCountMap
= new HashMap<string atomicinteger="">();
private int maxFilesPerSection = 2000;
private int maxSectionCount = 100;
private String storageBaseDir = System.getProperty("user.dir") + "/vpn";
private final Object sectionLock = new Object();
public String[] apply4Filename(MMSDeliverRequest request) {
String sectionName;
int iFileCount;
boolean need2RemoveSection = false;
String[] fileName = new String[2];
synchronized (sectionLock) {
// 获取当前的存储子目录名
sectionName = this.getSectionName();
AtomicInteger fileCount;
fileCount = sectionFileCountMap.get(sectionName);
iFileCount = fileCount.get();
// 当前存储子目录已满
if (iFileCount >= maxFilesPerSection) {
if (sectionNames.size() >= maxSectionCount) {
need2RemoveSection = true;
}
// 创建新的存储子目录
sectionName = this.makeNewSectionDir();
fileCount = sectionFileCountMap.get(sectionName);
}
iFileCount = fileCount.addAndGet(1);
}
fileName[0] = storageBaseDir + "/" + sectionName + "/"
+ new DecimalFormat("0000").format(iFileCount) + "-"
+ request.getTimeStamp().getTime() / 1000 + "-"
+ request.getExpiry()
+ ".rq";
fileName[1] = sectionName;
if (need2RemoveSection) {
// 删除最老的存储子目录
String oldestSectionName = sectionNames.removeFirst();
this.removeSection(oldestSectionName);
}
return fileName;
}
public void decrementSectionFileCount(String sectionName) {
AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
if (null != fileCount) {
fileCount.decrementAndGet();
}
}
private boolean removeSection(String sectionName) {
boolean result = true;
File dir = new File(storageBaseDir + "/" + sectionName);
for (File file : dir.listFiles()) {
result = result && file.delete();
}
result = result && dir.delete();
return result;
}
private String getSectionName() {
String sectionName;
if (sectionNames.isEmpty()) {
sectionName = this.makeNewSectionDir();
} else {
sectionName = sectionNames.getLast();
}
return sectionName;
}
private String makeNewSectionDir() {
String sectionName;
SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
sectionName = sdf.format(new Date());
File dir = new File(storageBaseDir + "/" + sectionName);
if (dir.mkdir()) {
sectionNames.addLast(sectionName);
sectionFileCountMap.put(sectionName, new AtomicInteger(0));
} else {
throw new RuntimeException(
"Cannot create section dir " + sectionName);
}
return sectionName;
}
}
}
</string>,></string>,></string></string>

methodRequest 的 call 方法的调用者代码是运行在 ThreadPoolExecutor 所维护的工作者线程中,这就保证了 store 方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor 所维护的工作线程负责将请求消息序列化到磁盘文件中。

DiskbasedRequestPersistence 类的 store 方法中调用的 SectionBasedDiskStorage 类的 apply4Filename 方法包含了一些多线程同步控制代码(见清单 5)。这部分控制由于是封装在 DiskbasedRequestPersistence 的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence 的调用方代码无法知道该细节,这体现了 Active Object 模式对并发访问控制的封装。

小结

本篇介绍了 Active Object 模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对 Active Object 模式进行评价,并结合本文案例介绍实际运用 Active Object 模式时需要注意的一些事项。


感谢张龙对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。

2014-11-21 09:4523995

评论

发布
暂无评论
发现更多内容

海外云手机解决IP、成本、稳定性问题

Ogcloud

云手机 海外云手机 云手机海外版 海外原生IP 海外IP

大咖领衔,2天AI创业创收训练营即刻启程!不要错过,速来占位!

霍格沃兹测试开发学社

低代码开发应用:确保数字化项目成功的5个技巧

不在线第一只蜗牛

低代码 数字化

InfoQ精选 | 10款项目管理利器助力企业效率提升

爱吃小舅的鱼

项目管理 项目管理工具

使用豆包Marscode 创建了一个”天气预报“小应用

豆包MarsCode

Python 人工智能 程序员 AI 项目

mac苹果电脑游戏推荐:暗黑2:毁灭之王 for Mac(含各职业存档)

你的猪会飞吗

Mac游戏下载 Mac游戏推荐

一条SQL语句在MySQL中是如何执行的?

快乐非自愿限量之名

MySQL 数据库 sql

SaaS业务架构:业务能力分析

不在线第一只蜗牛

架构 SaaS

“数据思维人才培养论坛” 于大湾区大学举行,和鲸科技受邀共话产教创新路径

ModelWhale

人工智能 大数据 人才培养 高等教育

如何免费调用有道翻译API实现多语言翻译

幂简集成

翻译软件 API

漆包线工厂生产管理MES系统功能介绍

万界星空科技

mes 万界星空科技 漆包线mes 铜线mes 漆包线

谷歌发布新 RL 方法,性能提升巨大;苹果前设计总监正与 OpenAI 合作开发 AI 设备丨 RTE 开发者日报

声网

BPM(业务流程管理)的最佳开源工具

NocoBase

开源 项目管理 低代码 BPM 无代码

如何确定性能测试指标

老张

软件测试 性能测试 技术指标 高性能高可用

记一次 RabbitMQ 消费者莫名消失问题的排查

EquatorCoco

Rabbit MQ

中国可观测日「成都站」圆满落幕

观测云

可观测性

Yihong,从多元职业到代码之路 | MarsCoders 开发者说

豆包MarsCode

Python 人工智能 编程 程序员 AI

NetFlow Analyzer:精准流量洞察,引领网络安全新纪元

Geek_a83400

【XIAOJUSURVEY& 北大】实现数据导出的前后端全流程

XIAOJUSURVEY

数据分析 Vue Node 问卷 数据导出

BOE(京东方)携故宫博物院举办2024“照亮成长路”公益项目落地仪式以创新科技赋能教育可持续发展

科技汇

精彩回顾|博睿数据Bonree ONE 3.0产品发布会圆满落幕:三城联动 共襄盛举!

博睿数据

座无虚席!首期流程挖掘实践训练营火爆收官

望繁信科技

数字化转型 流程挖掘 流程资产 流程智能 望繁信科技

振动韧性与智能的双翼,让数智金融飞向未来之屿

脑极体

AI

Facebook养号与推广技巧

Ogcloud

facebook 云手机 海外云手机 FB推广 FB引流

软件项目全套资料、全方案、源码梳理清单

金陵老街

开发文档 软件文档 实施文档 运维文档

MES管理系统助力企业车间管理可视化

万界星空科技

数字化转型 mes 可视化大屏 万界星空科技 生产可视化

IPQ9574 and IPQ9578 details - The core debate of the Wi-Fi 7 era

wifi6-yiyi

参与滴滴开源项目,获得精美礼品

XIAOJUSURVEY

GitHub 开源 活动 PR Issue

反DDD模式之“复用”

快乐非自愿限量之名

DDD

荣誉加冕|数造科技荣获“2024爱分析·数据智能优秀厂商”

数造万象

人工智能 大数据 敏捷开发 智能化 大模型

直播标准权威发布,阿里云RTS获首批卓越级评估认证

阿里云CloudImagine

云计算 音视频 视频云 超低延时直播

Java多线程编程模式实战指南一:Active Object模式(上)_Java_黄文海_InfoQ精选文章