Active Object 模式简介
Active Object 模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object 模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于 System.gc() 这个方法:客户端代码调用完 gc() 后,一个进行垃圾回收的任务被提交,但此时 JVM 并不一定进行了垃圾回收,而可能是在 gc() 方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc() 的调用方代码是运行在自己的线程上(通常是 main 线程派生的子线程),而 JVM 的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc() 这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。
再进一步介绍 Active Object 模式,我们可先简单地将其核心理解为一个名为 ActiveObject 的类,该类对外暴露了一些异步方法,如图 1 所示。
图 1. ActiveObject 对象示例
doSomething 方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething 方法会被多个线程调用。这时所需的线程安全控制封装在 doSomething 方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个 Active Object 对象的方法与调用普通 Java 对象的方法并无太大差别。如清单 1 所示。
清单 1. Active Object 方法调用示例
ActiveObject ao=...; Future<string> future = ao.doSomething("data"); // 执行其它操作 String result = future.get(); System.out.println(result); </string>
Active Object 模式的架构
当 Active Object 模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入 Active Object 模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用 Active Object 模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。
Active Object 模式的主要参与者有以下几种。其类图如图 2 所示。
图 2. Active Object 模式的类图
(点击图像放大)
- Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法 doSomething 时,该方法会生成一个相应的 MethodRequest 实例并将其存储到 Scheduler 所维护的缓冲区中。doSomething 方法的返回值是一个表示其执行结果的外包装对象:Future 参与者的实例。异步方法 doSomething 运行在调用方代码所在的线程中。
- MethodRequest:负责将调用方代码对 Proxy 实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将 Proxy 的异步方法的调用和执行分离成为可能。其 call 方法会根据其所包含上下文信息调用 Servant 实例的相应方法。
- ActivationQueue:负责临时存储由 Proxy 的异步方法被调用时所创建的 MethodRequest 实例的缓冲区。
- Scheduler:负责将 Proxy 的异步方法所创建的 MethodRequest 实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的 MethodRequest 实例进行执行。其调度策略可以根据实际需要来定,如 FIFO、LIFO 和根据 MethodRequest 中包含的信息所定的优先级等。
- Servant:负责对 Proxy 所暴露的异步方法的具体实现。
- Future:负责存储和返回 Active Object 异步方法的执行结果。
Active Object 模式的序列图如图 3 所示。
图 3. Active Object 模式的序列图
(点击图像放大)
第1 步:调用方代码调用Proxy 的异步方法doSomething。
第2~7 步:doSomething 方法创建Future 实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest 对象。然后以所创建的MethodRequest 对象作为参数调用Scheduler 的enqueue 方法,以将MethodRequest 对象存入缓冲区。Scheduler 的enqueue 方法会调用Scheduler 所维护的ActivationQueue 实例的enqueue 方法,将MethodRequest 对象存入缓冲区。
第8 步:doSomething 返回其所创建的Future 实例。
第9 步:Scheduler 实例采用专门的工作线程运行dispatch 方法。
第10~12 步:dispatch 方法调用ActivationQueue 实例的dequeue 方法,获取一个MethodRequest 对象。然后调用MethodRequest 对象的call 方法
第13~16 步:MethodRequest 对象的call 方法调用与其关联的Servant 实例的相应方法doSomething。并将Servant.doSomething 方法的返回值设置到Future 实例上。
第17 步:MethodRequest 对象的call 方法返回。
上述步骤中,第1~8 步是运行在Active Object 的调用者线程中的,这几个步骤实现了将调用方代码对Active Object 所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17 步是运行在Active Object 的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object 对外暴露的异步方法的调用与执行的分离。
如果调用方代码关心Active Object 的异步方法的返回值,则可以在其需要时,调用Future 实例的get 方法来获得异步方法的真正执行结果。
Active Object 模式实战案例
某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户 13612345678 给其同事 13787654321 发送彩信时,可以将接收方号码填写为对方的短号,如 776,而非其真实的号码。
该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过 Java 的对象序列化 API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及 Active Object 模式如何帮助我们满足这些考虑。
首先,请求消息缓存到磁盘中涉及文件 I/O 这种慢的操作,我们不希望它在请求处理的主线程(即 Web 服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得 Web 服务器的工作线程因等待文件 I/O 而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object 模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在 Web 服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在 Active Object 工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被 Active Object 线程缓存到文件中。如图 4 所示。
图 4 . 异步实现缓存
其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如 2000 个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如 100 个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而 Active Object 模式中的 Proxy 参与者可以帮助我们封装并发访问控制。
下面,我们看该案例的相关代码通过应用 Active Object 模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的 Active Object 模式的客调用方代码。如清单 2 所示。
清单 2. 彩信下发请求处理的入口类
public class MMSDeliveryServlet extends HttpServlet { private static final long serialVersionUID = 5886933373599895099L; @Override public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { // 将请求中的数据解析为内部对象 MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream()); Recipient shortNumberRecipient = mmsDeliverReq.getRecipient(); Recipient originalNumberRecipient = null; try { // 将接收方短号转换为长号 originalNumberRecipient = convertShortNumber(shortNumberRecipient); } catch (SQLException e) { // 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存 AsyncRequestPersistence.getInstance().store(mmsDeliverReq); // 继续对当前请求的其它处理,如给客户端响应 resp.setStatus(202); } } private MMSDeliverRequest parseRequest(InputStream reqInputStream) { MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest(); // 省略其它代码 return mmsDeliverReq; } private Recipient convertShortNumber(Recipient shortNumberRecipient) throws SQLException { Recipient recipent = null; // 省略其它代码 return recipent; } }
清单 2 中的 doPost 方法在侦测到短号转换过程中发生的数据库异常后,通过调用 AsyncRequestPersistence 类的 store 方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence 类相当于 Active Object 模式中的 Proxy 参与者。尽管本案例涉及的是一个并发环境,但从清单 2 中的代码可见,AsyncRequestPersistence 类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在 AsyncRequestPersistence 类之后。
AsyncRequestPersistence 类的代码如清单 3 所示。
清单 3. 彩信下发请求缓存入口类(Active Object 模式的 Proxy)
// ActiveObjectPattern.Proxy public class AsyncRequestPersistence implements RequestPersistence { private static final long ONE_MINUTE_IN_SECONDS = 60; private final Logger logger; private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0); private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0); // ActiveObjectPattern.Servant private final DiskbasedRequestPersistence delegate = new DiskbasedRequestPersistence(); // ActiveObjectPattern.Scheduler private final ThreadPoolExecutor scheduler; private static class InstanceHolder { final static RequestPersistence INSTANCE = new AsyncRequestPersistence(); } private AsyncRequestPersistence() { logger = Logger.getLogger(AsyncRequestPersistence.class); scheduler = new ThreadPoolExecutor(1, 3, 60 * ONE_MINUTE_IN_SECONDS, TimeUnit.SECONDS, // ActiveObjectPattern.ActivationQueue new LinkedBlockingQueue<runnable>(200), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t; t = new Thread(r, "AsyncRequestPersistence"); return t; } }); scheduler.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy()); // 启动队列监控定时任务 Timer monitorTimer = new Timer(true); monitorTimer.scheduleAtFixedRate( new TimerTask() { @Override public void run() { if (logger.isInfoEnabled()) { logger.info("task count:" + requestSubmittedPerIterval + ",Queue size:" + scheduler.getQueue().size() + ",taskTimeConsumedPerInterval:" + taskTimeConsumedPerInterval.get() + " ms"); } taskTimeConsumedPerInterval.set(0); requestSubmittedPerIterval.set(0); } }, 0, ONE_MINUTE_IN_SECONDS * 1000); } public static RequestPersistence getInstance() { return InstanceHolder.INSTANCE; } @Override public void store(final MMSDeliverRequest request) { /* * 将对 store 方法的调用封装成 MethodRequest 对象, 并存入缓冲区。 */ // ActiveObjectPattern.MethodRequest Callable<boolean> methodRequest = new Callable<boolean>() { @Override public Boolean call() throws Exception { long start = System.currentTimeMillis(); try { delegate.store(request); } finally { taskTimeConsumedPerInterval.addAndGet( System.currentTimeMillis() - start); } return Boolean.TRUE; } }; scheduler.submit(methodRequest); requestSubmittedPerIterval.incrementAndGet(); } } </boolean></boolean></runnable>
AsyncRequestPersistence 类所实现的接口 RequestPersistence 定义了 Active Object 对外暴露的异步方法:store 方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单 4 所示。
清单 4. RequestPersistence 接口源码
public interface RequestPersistence { void store(MMSDeliverRequest request); }
AsyncRequestPersistence 类的实例变量 scheduler 相当于 Active Object 模式中的 Scheduler 参与者实例。这里我们直接使用了 JDK1.5 引入的 Executor Framework 中的 ThreadPoolExecutor。在 ThreadPoolExecutor 类的实例化时,其构造器的第 5 个参数(BlockingQueue
AsyncRequestPersistence 类的实例变量 delegate 相当于 Active Object 模式中的 Servant 参与者实例。
AsyncRequestPersistence 类的 store 方法利用匿名类生成一个 java.util.concurrent.Callable 实例 methodRequest。该实例相当于 Active Object 模式中的 MethodRequest 参与者实例。利用闭包(Closure),该实例封装了对 store 方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence 类的 store 方法通过调用 scheduler 的 submit 方法,将 methodRequest 送入 ThreadPoolExecutor 所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor 是 Scheduler 参与者的一个“近似”实现。ThreadPoolExecutor 的 submit 方法相对于 Scheduler 的 enqueue 方法,该方法用于接纳 MethodRequest 对象,以将其存入缓冲区。当 ThreadPoolExecutor 当前使用的线程数量小于其核心线程数量时,submit 方法所接收的任务会直接被新建的线程执行。当 ThreadPoolExecutor 当前使用的线程数量大于其核心线程数时,submit 方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor 的这种任务处理机制,并不妨碍我们将它用作 Scheduler 的实现。
methodRequest 的 call 方法会调用 delegate 的 store 方法来真正实现请求缓存功能。delegate 实例对应的类 DiskbasedRequestPersistence 是请求消息缓存功能的真正实现者。其代码如清单 5 所示。
清单 5. DiskbasedRequestPersistence 类的源码
public class DiskbasedRequestPersistence implements RequestPersistence { // 负责缓存文件的存储管理 private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage(); private final Logger logger = Logger .getLogger(DiskbasedRequestPersistence.class); @Override public void store(MMSDeliverRequest request) { // 申请缓存文件的文件名 String[] fileNameParts = storage.apply4Filename(request); File file = new File(fileNameParts[0]); try { ObjectOutputStream objOut = new ObjectOutputStream( new FileOutputStream(file)); try { objOut.writeObject(request); } finally { objOut.close(); } } catch (FileNotFoundException e) { storage.decrementSectionFileCount(fileNameParts[1]); logger.error("Failed to store request", e); } catch (IOException e) { storage.decrementSectionFileCount(fileNameParts[1]); logger.error("Failed to store request", e); } } class SectionBasedDiskStorage { private Deque<string> sectionNames = new LinkedList<string>(); /* * Key->value: 存储子目录名 -> 子目录下缓存文件计数器 */ private Map<string atomicinteger=""> sectionFileCountMap = new HashMap<string atomicinteger="">(); private int maxFilesPerSection = 2000; private int maxSectionCount = 100; private String storageBaseDir = System.getProperty("user.dir") + "/vpn"; private final Object sectionLock = new Object(); public String[] apply4Filename(MMSDeliverRequest request) { String sectionName; int iFileCount; boolean need2RemoveSection = false; String[] fileName = new String[2]; synchronized (sectionLock) { // 获取当前的存储子目录名 sectionName = this.getSectionName(); AtomicInteger fileCount; fileCount = sectionFileCountMap.get(sectionName); iFileCount = fileCount.get(); // 当前存储子目录已满 if (iFileCount >= maxFilesPerSection) { if (sectionNames.size() >= maxSectionCount) { need2RemoveSection = true; } // 创建新的存储子目录 sectionName = this.makeNewSectionDir(); fileCount = sectionFileCountMap.get(sectionName); } iFileCount = fileCount.addAndGet(1); } fileName[0] = storageBaseDir + "/" + sectionName + "/" + new DecimalFormat("0000").format(iFileCount) + "-" + request.getTimeStamp().getTime() / 1000 + "-" + request.getExpiry() + ".rq"; fileName[1] = sectionName; if (need2RemoveSection) { // 删除最老的存储子目录 String oldestSectionName = sectionNames.removeFirst(); this.removeSection(oldestSectionName); } return fileName; } public void decrementSectionFileCount(String sectionName) { AtomicInteger fileCount = sectionFileCountMap.get(sectionName); if (null != fileCount) { fileCount.decrementAndGet(); } } private boolean removeSection(String sectionName) { boolean result = true; File dir = new File(storageBaseDir + "/" + sectionName); for (File file : dir.listFiles()) { result = result && file.delete(); } result = result && dir.delete(); return result; } private String getSectionName() { String sectionName; if (sectionNames.isEmpty()) { sectionName = this.makeNewSectionDir(); } else { sectionName = sectionNames.getLast(); } return sectionName; } private String makeNewSectionDir() { String sectionName; SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss"); sectionName = sdf.format(new Date()); File dir = new File(storageBaseDir + "/" + sectionName); if (dir.mkdir()) { sectionNames.addLast(sectionName); sectionFileCountMap.put(sectionName, new AtomicInteger(0)); } else { throw new RuntimeException( "Cannot create section dir " + sectionName); } return sectionName; } } } </string>,></string>,></string></string>
methodRequest 的 call 方法的调用者代码是运行在 ThreadPoolExecutor 所维护的工作者线程中,这就保证了 store 方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor 所维护的工作线程负责将请求消息序列化到磁盘文件中。
DiskbasedRequestPersistence 类的 store 方法中调用的 SectionBasedDiskStorage 类的 apply4Filename 方法包含了一些多线程同步控制代码(见清单 5)。这部分控制由于是封装在 DiskbasedRequestPersistence 的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence 的调用方代码无法知道该细节,这体现了 Active Object 模式对并发访问控制的封装。
小结
本篇介绍了 Active Object 模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对 Active Object 模式进行评价,并结合本文案例介绍实际运用 Active Object 模式时需要注意的一些事项。
感谢张龙对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。
评论