对时间跨距很长(小时,天,周)的活动进行编排是一个常见的设计问题。尽管在技术上 BMP 引擎就是专门为调用长运行的活动而设计的,然而由于它们在注入和抽取流程状态方面的能力限制,长运行的活动经常被实现成独立流程 1 ,所以,回调的实现以及如何通知 BPM 服务器总是一个不小的设计问题。在这篇文章中我们将展示一种使用 JBoss/jBPM 解决这类问题的方法。
整体实现方案
整体实现方案(如图 1 所示)是非常直截了当的。BPM 服务器发起一个长运行活动的执行,并“等待”它的结束。
图 1 基本实施架构
任何使用过基于 WS-BPEL 的 BPM 实现的人可能立刻会说——这个问题很简单——只需要使用 send/receive 活动在业务流程和活动之间进行同步,问题就可以解决了。然而,jBPM 提供的标准节点中不包含 receive 节点。不过它提供了丰富的客户端 API,可以通过它们实现所需的功能 2 。这些 API 为我们的实现提供了两个重要能力——设置流程变量和“强制”流程继续执行。
将内部 API 向外暴露的一种通用方法是使用中间件。比如,jBPM 和 ESB 的集成就是通过 ESB 的消息机制将一些现有的 jBPM API 有效地暴露成 ESB 服务。另一种常用的方法是使用 Web/REST 3 。我们决定使用 JBoss JAX-RS 的实现——RestEasy 4 将所需的客户端 API 暴露成 REST 服务。
另一个困难是很多现有的长运行的活动总是被实现成自包含的流程,它们只提供很少的 API 甚至没有。因此,并不总是能用监听器(listener)或者回调(callback)的实现方式来编排它们。我们采用一种更为通用的方式:创建集成脚本(integration script),由它负责调用长活动以及客户端的完成通知(completion notifier),它还与 REST 服务器交互(如图 2 所示)。由于引入了集成脚本,我们可以在不做任何修改的情况下就能够使用到现有实现(用于编排)的所有功能(参数传递,“回调”处理器)。
图 2 整体实现架构
基于整体实现架构(图 2),必须要开发的组件有以下几个:jBPM REST 服务器,完成通知器(Completion notifier),集成脚本和实际流程。下面将对它们进行分别介绍。
jBPM REST 服务器
正如上文提到的,jBPM REST 服务器的实现有好几款,但它们中的大多数都偏向对引擎的查询操作,并且常用于创建 GUI 或简单的 jBPM 客户端。
这里我们需要一个这样的 REST 服务器,它能够设置流程变量,还可以向 jBPM 服务器发出让其继续执行的信号。为了更好地理解我们将要实现的的功能,首先请看一下 jBPM 执行对象的结构图(图 3)。
一个 jBPM 引擎可能会同时部署一个或多个流程定义;而且一个流程定义可能有多个实例同时运行 5 ;一个流程实现在执行过程中可能使用一个或多个令牌(token)。
图 3 jBPM 执行对象的结构图
对于几乎任何业务流程引擎的实现,流程定义和流程实例的概念都是通用的,而令牌(token)的概念可能是 jBPM 独有的。令牌是 jBPM 对执行线程的抽象。引擎实现的本身不支持线程,而是使用令牌将一个执行过程在逻辑上划分成几段。每一个流程实例都是从一个令牌开始的,这个令牌叫根令牌(root token),并且在需要时启动其他令牌。例如,jPDL 的 Fork 节点启动多个执行令牌,而当执行通过对应的 Join 节点后这些令牌才结束(图 4)。
图 4 jBPM 的令牌管理
令牌在 jBPM 中不仅用于划分执行,还用于流程变量。变量在 jBPM 中不直接与流程实例关联,而是与令牌关联的。事实上流程实例的变量存在于它的根令牌中。父令牌的变量在子令牌中可以被访问,反之不成立;变量在兄弟令牌之间互相不可见。
基于此我们的 REST 服务器实现提供了以下方法(列表 1) 6
@GET @Path(<span color="#0000ff">"instance/{id}/variables/xml"</span>) @Produces(<span color="#0000ff">"application/xml"</span>) <span color="#800080"><b>public</b></span> ParameterRefWrapper getInstanceVariablesXML( @PathParam(<span color="#0000ff">"id"</span>) String instanceId ) …………………… @GET @Path(<span color="#0000ff">"instance/{id}/variables/json"/</span>) @Produces("application/json") @Mapped <span color="#800080"><b>public</b></span> ParameterRefWrapper getInstanceVariablesJSON( @PathParam(<span color="#0000ff">"id"</span>) String instanceId ) …………………… @POST @Path(<span color="#0000ff">"instance/{id}/parameters"</span>) <span color="#800080"><b>public</b></span> Response setInstanceParameters( @PathParam(<span color="#0000ff">"id"</span>) String id, @QueryParam("parameter") List<parameterref> params)<br></br> ……………………<br></br> @POST<br></br> @Path(<span color="#0000ff">"instance/{id}/signal"</span>)<br></br><span color="#800080"><b>public</b></span> Response signalProcess(<br></br> @PathParam(<span color="#0000ff">"id"</span>) String id<br></br> )<br></br> ……………………<br></br> @GET<br></br> @Path(<span color="#0000ff">"token/{id}/variables/xml"</span>)<br></br> @Produces(<span color="#0000ff">"application/xml"</span>)<br></br><span color="#800080"><b>public</b></span> ParameterRefWrapper getTokenParametersXML(<br></br> @PathParam(<span color="#0000ff">"id"</span>)String tokenId<br></br> )<br></br> ……………………<br></br> @GET<br></br> @Path(<span color="#0000ff">"token/{id}/variables/json"</span>)<br></br> @Produces("application/json")<br></br> @Mapped<br></br><span color="#800080"><b>public</b></span> ParameterRefWrapper getTokenParametersJSON(<br></br> @PathParam(<span color="#0000ff">"id"</span>) String tokenId<br></br> )<br></br> ……………………<br></br> @POST<br></br> @Path(<span color="#0000ff">"token/{id}/parameters"</span>)<br></br><span color="#800080"><b>public</b></span> Response setTokenParameters(<br></br> @PathParam(<span color="#0000ff">"id"</span>) String id,<br></br> @QueryParam("parameter") List<span color="#0000ff"><parameterref></parameterref></span> params)<br></br> ……………………<br></br> @POST<br></br> @Path(<span color="#0000ff">"token/{id}/signal"</span>)<br></br><span color="#800080"><b>public</b></span> Response signalToken(<br></br> @PathParam(<span color="#0000ff">"id"</span>)String id<br></br> )<br></br> ……………………</parameterref>
列表 1 jBPM REST API
业务流程实现
图 5 展示了一个最简单的 jBPM 流程调用长运行活动的场景。
图 5 简单 jBPM 流程
该流程包含 4 个步骤:
- 开始节点
- 一个负责调用集成脚本的节点(上图中的 Starting pipeline 节点)
- 一个状态节点(state,上图中的 Pipeline completed 节点),它是一个同步点,回调处理器调用它设置(长运行活动的)执行结果并继续流程的执行
- 结束节点
这个流程最重要的部分是 Starting pipeline 节点使用的 action 处理器(译者注:图中齿轮轮形状的图标所指的)。action 的一种实现是基于 ProcessBuilder 的 Java 类,流程通过它调用外部流程,本文示例中的外部流程是上文中提到的集成脚本。使用 ProcessBuilder 类调用外部流程有两种方式:同步和异步。同步调用也有两种实现方式:1)显式,调用 process.waitFor() 方法;2)隐式,读取流程执行结果。这本文的示例中,当外部流程执行时,在内存中(译者注:流程实例所在的内存)保存着一个调用者线程。如果上述两种方式都不适用的话,则用异步调用。异步调用时,调用者可以在被调用的长运行的流程完成之前结束 7 。我们使用异步方式来调用长运行的流程。
如上文所述,REST 服务器需要通过流程实例 ID 或令牌 ID 来确定与哪个流程实例或令牌请求连接。流程实例 ID 和令牌 ID 都可以被传到集成脚本,而集成脚本可以通过这些参数来调用回调处理器。
由于我们基于 jBPM 服务器的数据库的内容实现了 REST 服务器,从一个 action 处理器直接调用集成脚本可能导致潜在的竞争条件(race condition)。如果在 Starting pipeline 节点的结果提交到数据库之前,回调处理器(后文会对此详述)就去连接 REST 服务器,则 REST 服务器的调用结果可能会越出同步点。为了避免这种潜在的竞争条件,我们引入了一个简单的线程池。本文的示例中,action 处理器不直接调用集成脚本,而是创建一个 runnable 对象,将这个对象提交给线程池,在调用集成脚本之前,这个 runnable 对象休眠一小会儿,等待 jBPM 服务器将 Starting pipeline 节点的执行结果提交给数据库。
一旦 Starting pipeline 节点完成其执行,流程状态转变成 Pipeline completed 状态,在这里等待回调处理器的通知——长运行的活动已经完成,流程可以继续执行。
回调处理器
我们基于 jBPM 服务器用来设置参数以及流程继续的 REST API 的用法实现回调(callback)类 8 。 基本实现可以通过一小段代码片段完成,如列表 2 所示
URLString = baseURL + <span color="#0000ff">"token/"</span> + tokenId + <span color="#0000ff">"/parameters?parameter="</span> + …………….. postURL = <span color="#800080"><b>new</b></span> URL(URLString); connection = (HttpURLConnection) postURL.openConnection(); connection.setRequestMethod(<span color="#0000ff">"POST"</span>); connection.disconnect();
列表 2 向 REST 服务器发送 POST 请求
在该实现中,HTTP POST 的实现与普通实现有一点点不同。我们没有定义编码的(encoded)URL 作为内容向输出流写入参数,而使用一个带有真实查询串的 URL。之所以可以这么做是因为 RestEasy 的 Servlet 实现了一个服务,而不是 doPost 和 doGet 方法。因此,所有的请求(POST/GET /PUT) 都被发送到这个方法,然后由 RestEasy 的实现去处理并正确分析请求 URL。
错误处理
如果不出意外,图 5 所示的简单流程可以正确运行。然而,在真实世界里,错误总是会发生,因此这个简单实现不得不在错误处理方面有所加强。
对长运行活动的调用可能会产生两种额外的错误类型:
- 异常的脚本——脚本执行的异常情况会阻碍回调处理器的执行,这样,调用长运行活动的流程就会被阻塞。
- 长运行活动执行过程中的错误。因为这些错误是在流程之外发生的,所以它们必须要显式地向流程报告结果。
异常的脚本
解决异常的脚本问题的有效机制是超时。jBPM 支持超时处理,这样,当某些执行的时间超出了预订的时间间隔时,可以改变流程正常的执行过程。计时器(timer)的执行可以调用一个合适的纠正动作(action),也可以将流程状态转向一个指定的节点。我们对异常脚本进行错误处理的实现是基于计时器的状态转换功能。通常的做法是转向一个人工任务,这样对于任何特定的情况都可以通过人的介入去执行相应的纠正动作(action)。这种活动的最简单例子(图 6 所示)就是一个节点,它只是将执行迁移到 Starting pipeline 节点。这里的 Timing out 节点是被配置在 Running pipeline 这个节点上的计时器调用的,当计时器触发时,它通过 timeOut 迁移(transition)调用 timing out 节点。
图 6 带有超时机制的 jBPM 流程
至此,图 6 中所实现的流程实现还需要一点修改就能满足我们的基本实现。首先,timing out 节点可以决定继续等待,或则采取一个纠正动作,不过这两种情况的最终结果都是控制权转移到 Starting pipeline 节点上;另外,这个 timing out 节点还必须要知晓脚本是否正在运行(继续等待动作)还是不在运行(启动脚本)。该逻辑的最简单实现方式是引入流程变量(见列表 3):
String state = (String)executionContext.getVariable(processName);
<span color="#800080"><b>if</b></span>(state != <span color="#800080"><b>null</b></span>){ <span color="#800080"><b>if</b></span>("completed".equals(state)){ executionContext.leaveNode(<span color="#0000ff">completeTransition</span>); <span color="#800080"><b>return</b></span>; } <span color="#800080"><b>else</b></span>{ executionContext.leaveNode(<span color="#0000ff">waitTransition</span>); <span color="#800080"><b>return</b></span>; } } executionContext.setVariable(<span color="#0000ff">processName</span>, "started"); …………
列表 3 执行路径选择
其次,在本例中,通过流程实例 / 令牌的的“盲”信令(signaling)来完成执行的做法是不成立的。原因有以下两个方面:
- 因为本例中的 Running Pipeline 状态有多个迁移(transition),REST API 应该具备选择特定的迁移(transition)作为流程或令牌的信令结果的能力。
- 当回调处理器被调用时,图 6 中的例子无法保证流程正处于 Running Pipeline 状态。譬如,它可能正处于 Timing out 节点。这就意味着 REST API 应该具备只针对特定流程状态发出信令的能力。
我们可以对列表 1 中的 REST API 做些修改让它支持流程实例或令牌信令的上面所描述的两个参数(见列表 4)
@POST @Path(<span color="#0000ff">"instance/{id}/signal"</span>) <span color="#800080"><b>public</b></span> Response signalProcess( @PathParam(<span color="#0000ff">"id"</span>) String id, @QueryParam(<span color="#0000ff">"transition"</span>) String transition. @QueryParam(<span color="#0000ff">"state"</span>) String state ) …………………… @POST @Path("token/{id}/signal") <span color="#800080"><b>public</b></span> Response signalToken( @PathParam(<span color="#0000ff">"id"</span>)String id, @QueryParam(<span color="#0000ff">"transition"</span>) String transition, @QueryParam(<span color="#0000ff">"state"</span>) String state ) ……………………
列表 4 扩展的 REST API
由于可以使用 jBPM 的 API 获得特定流程实例的当前节点,并根据要求触发相应的迁移,因此扩展实现是相当直截了当的。
脚本执行结果
支持脚本执行结果的实现也是相当直观的。通过简单地再引入一些流程变量就可以实现,这些变量可以通过回调处理器对其进行设置。
流程组件化
一个非常重要的例子是长运行活动本身就是一个 jBMP 流程。jPDL 的目前版本(jPDL-3)没有显式提供多流程之间协作的支持。所以,流程设计者经常把所需的功能都实现到一个大流程块中。这样的做法和创建很大的 Java 类一样存在一些弊端,如可读性和管理上的问题,以及重用的限制等。将流程分解成多个流程 9 ,并在运行时协作的做法在一定程度上可以减轻上述弊端。
JBoss SOA 平台所提供的 jBPM/ESB 运行多流程之间的协作 10 ,它通过将这些流程包装成 JBoss ESB 服务,并使用 jBMP ESB 节点去调用它们。尽管这种方式没有问题,但需要在整体方案中引入 JBoss ESB。如果 ESB 仅仅用于流程调用协作的话,理由难免有点牵强。一个更轻量的做法是通过程序的方式(使用一个流程节点)启动从属流程,然后使用 jBPM REST 服务器和回调处理器(上文描述的)实现主从流程间的协作。
总结
JBoss jBPM 的简单性和扩展性使得它在实现 jBPM 本身不具备的附加功能时非常简单,这在很大程度上开拓了基于 jBPM 的解决方案的应用范围。
致谢
非常感谢 NAVTEQ 的同事们,特别是 Catalin Capota,他参与了实现方案的讨论和原型设计。
查看英文原文: Orchestrating Long Running Activities with JBoss / JBPM 。
感谢胡键对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。
评论