在前面的两篇文章中 [ 1 , 2 ],我们描述了 Oozie 工作流服务器,并且展示了几个工作流的示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。
在本文中,我们会向你展示 Oozie 的可扩展性,并说明它是如何支持我们实现自定义的、协同工作的语言扩展。
为什么需要自定义节点(Custom Node)?
正如我们在文章 [1] 中所说明的,Oozie 之所以与众不同,是因为它提供了一种“最小化”的工作流语言,其中只包含少数几种控制和动作节点。尽管其中的一种动作节点是 java 动作节点,它让我们可以从 Oozie 工作流调用任意一个带有 main 方法的 java 类,但这种方法并非总是最佳的。原因之一就在于,java 动作是在 Hadoop 簇集中作为 map-reduce 作业执行的,并且只带有唯一的 Mapper 任务。一方面,这带来了很多好处:
- 它拥有内建的可伸缩性以及对 map/reduce 框架的灾难恢复支持,这样我们就不必这些特性构建到 Oozie 中。
- 外部执行机制,这让 Oozie 引擎变得更轻量级,从而可以支持更多并发运行的过程。
另一方面,这种方法也有一些缺点:
- 它把每个 java 节点都作为 mapper 任务启动,这会导致在 Hadoop 簇集中启动新的 JVM 而产生额外的开销。
- 在外部执行 java 类导致了额外的网络传输,用于与 Oozie 服务器同步这些执行结果。
- 从 java 节点传递参数成了非常耗费资源的操作。
尽管如此,在运行时间相对较长(几分钟甚至几小时)的 map/reduce 或者 Pig 作业中,好处会大大超过负载的缺点,但是,在简单的 java 节点中(参见 [2]),我们就需要注意外部执行所导致的开销了。所以,使用自定义动作节点的原因之一,就是为了支持在 Oozie 的执行上下文中直接执行轻量级的 java 类 [1] 。
使用自定义动作的另一个原因是为了提高工作流的语义和可读性。由于 Oozie 是一种支持基于 Hadoop 处理组件的工作流引擎,所以它的语法完全是以 Hadoop 执行为中心的——Hadoop 文件系统、map/reduce、Pig 等等。这种语法能够很好地符合 Hadoop 开发者的习惯,但是并没有涉及到太多关于给定动作的功能信息。我们可以为动作本身制定与业务相关的命名转换规则,但这只是特别用来解决问题的——对动作的命名只反映了给定过程的语法,既不是总体上的主题领域,也不能解决动作参数的问题,那些问题仍然只能由开发者来解决。
幸运的是,Oozie 支持非常棒的扩展机制——自定义动作节点 [3],它让我们可以很容易地解决这两个问题。自定义的动作节点让我们可以使用附加的动作(动词)来扩展 Oozie 的语言。Oozie 的动作节点可以是同步的,也可以是异步的。
- 同步节点——它在 Oozie 内部执行,在继续执行之前会等待这些节点完成动作。这些节点是为轻量级的任务所用的,像自定义计算,文件系统的移动、创建目录、删除等等。
- 异步节点——它是由 Oozie 启动的,但是在 Oozie 引擎的外部执行,它会监控正在执行的动作,直到完成。这是通过动作的回调或者 Oozie 针对动作状态的 polling 操作完成的。
实现 Oozie 自定义动作处理程序
在这个例子中,我们会对独立的邮件程序进行转换,并展现到自定义的 email 动作中 [2](代码 1)。
package com.navteq.assetmgmt.oozie.custom; import java.util.Properties; import java.util.StringTokenizer; import javax.mail.Message; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import org.apache.oozie.ErrorCode; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.ActionExecutorException.ErrorType; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace; public class EmailActionExecutor extends ActionExecutor { private static final String NODENAME = "eMail"; private static final String SUCCEEDED = "OK"; private static final String FAILED = "FAIL"; private static final String KILLED = "KILLED"; private static final String DEFAULMAILSERVER = "imailchi.navtech.com"; private static final String EMAILSERVER = "emailServer"; private static final String SUBJECT = "emailSubject"; private static final String MESSAGE = "emailBody"; private static final String FROM = "emailFrom"; private static final String TO = "emailTo"; public EmailActionExecutor() { super(NODENAME); } @Override public void check(Context context, WorkflowAction action) throws ActionExecutorException { // Should not be called for synch operation throw new UnsupportedOperationException(); } @Override public void end(Context context, WorkflowAction action)throws ActionExecutorException { String externalStatus = action.getExternalStatus(); WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK : WorkflowAction.Status.ERROR; context.setEndData(status, getActionSignal(status)); } @Override public boolean isCompleted(String arg0) { return true; } @Override public void kill(Context context, WorkflowAction action) throws ActionExecutorException { context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null); } @Override public void start(Context context, WorkflowAction action) throws ActionExecutorException { // Get parameters from Node configuration try{ Element actionXml = XmlUtils.parseXml(action.getConf()); Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1"); String server = actionXml.getChildTextTrim(EMAILSERVER, ns); String subject = actionXml.getChildTextTrim(SUBJECT, ns); String message = actionXml.getChildTextTrim(MESSAGE, ns); String from = actionXml.getChildTextTrim(FROM, ns); String to = actionXml.getChildTextTrim(TO, ns); // Check if all parameters are there if(server == null) server = DEFAULMAILSERVER; if((message == null) || (from == null) || (to == null)) throw new ActionExecutorException(ErrorType.FAILED, ErrorCode.E0000.toString(), "Not all parameters are defined"); // Execute action synchronously SendMail(server, subject, message, from, to); context.setExecutionData(SUCCEEDED, null); } catch(Exception e){ context.setExecutionData(FAILED, null); throw new ActionExecutorException(ErrorType.FAILED, ErrorCode.E0000.toString(), e.getMessage()); } } // Sending an email public void SendMail(String server, String subject, String message, String from, String to) throws Exception { // create some properties and get the default Session Properties props = new Properties(); props.setProperty("mail.smtp.host", server); Session session = Session.getDefaultInstance(props, null); // create a message Message msg = new MimeMessage(session); // set the from and to address InternetAddress addressFrom = new InternetAddress(from); msg.setFrom(addressFrom); // To is a comma separated list StringTokenizer st = new StringTokenizer(to, ","); String [] recipients = new String[st.countTokens()]; int rc = 0; while(st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++){ addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // Setting the Subject and Content Type msg.setSubject(subject); msg.setContent(message, "text/plain"); Transport.send(msg); } }
代码 1: Email 自定义动作
这个实现对 ActionExecutor [2] 类(由 Oozie 提供)进行了扩展,并重写了一些必要的方法。因为邮件的发送过程是一种非常快速的操作,所以我们决定将其实现为同步的动作处理程序,那意味着它会在 Oozie 的执行上下文中执行。
我们的实现(代码 1)遵循了 Oozie 文档并实现了所有必需的方法:
- 对于所有自定义动作处理程序都需要没有参数的构造函数。这个构造函数会注册动作处理程序的名称(使用动作名称来调用父类),我们会在工作流 XML 中使用它。
- 我们可以使用 InitActionType [3] 方法来注册执行动作时可能发生的异常,以及它们的类型和错误信息,并为执行程序本身执行初始化操作。
- Start 方法是用来启动动作的执行操作的。因为我们实现的是同步动作,所以整个动作都会在此执行。这个方法是由 Oozie 调用的,它有两个参数:Context 和 WorkflowAction。Context 参数提供了对 Oozie 工作流执行上下文的访问,其中主要包含了工作流的变量,并提供了对其进行操作的非常简单的 API(set、get) [4] 。而 WorkflowAction 则提供了 Oozie 对当前动作的定义。
- 还有 Check 方法,Oozie 会使用它来检查动作的状态。它不能作为同步动作来调用。
- Kill 方法,可以用来中断运行的作业或者动作。
- End 方法,可以用于所有清理动作,或者用于可能在完成动作之后所要做的处理。它还需要设置执行的结果。
部署并使用 Oozie 自定义动作处理程序
实现了自定义的动作执行方式之后,我们需要为新的 email 动作定义 XML 模式 [5] (代码 2)。
<?<span color="#008080">xml</span> <span color="#800080">version</span>=<i><span color="#0000ff">"1.0"</span></i> <span color="#800080">encoding</span>=<i><span color="#0000ff">"UTF-8"</span></i>?> <<span color="#008080">xs:schema</span> <span color="#800080">xmlns:xs</span>=<span color="#0000ff"><i>"http://www.w3.org/2001/XMLSchema"</i> </span> <span color="#800080">xmlns:email</span>=<i><span color="#0000ff">"uri:custom:email-action:0.1"</span></i> <span color="#800080">elementFormDefault</span>=<i><span color="#0000ff">"qualified"</span></i> <span color="#800080">targetNamespace</span>=<i><span color="#0000ff">"uri:custom:email-action:0.1"</span></i>> <<span color="#008080">xs:complexType</span> <span color="#800080">name</span>=<i><span color="#0000ff">"EMAIL"</span></i>> <<span color="#008080">xs:sequence</span>> <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailServer"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> <span color="#800080">minOccurs</span>=<i><span color="#0000ff">"0"</span></i> <span color="#800080">maxOccurs</span>=<i><span color="#0000ff">"1"</span></i> /> <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailSubject"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> /> <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailFrom"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> /> <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailTo"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> /> <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailBody"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> /> </<span color="#008080">xs:sequence</span>> </<span color="#008080">xs:complexType</span>> <p><<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"eMail"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"email:EMAIL"</span></i>></<span color="#008080">xs:element</span>><br></br></<span color="#008080">xs:schema</span>></p>
代码 2: 为 email 组件所用的 XML schema
自定义动作节点和 XML schema 文件都需要打包在单独的 jar 文件中,比方说 emailAction.Jar。我们可以使用 Oozie 的 oozie-setup.sh 脚本执行下面的命令,从而把这个(以及其他所有)jar 文件添加到 Oozie 的 war 文件中。
$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar (See Adding Jars to Oozie)
代码 3: 部署命令
向 Oozie 添加 Jar 文件
你要知道,Cloudera 推荐的 oozie-setup.sh 命令行会重新构建你的 war 文件,并且, 如果你使用网页来监控作业,那么就会丢失 java 的脚本扩展。在测试方面,我们难以同时包含 -extjs 和 -jars 选项。作为权宜之计,我们会把 jar 文件复制到 ${CATALINA_BASE}/webapps/oozie/WEB-INF/lib 中,其中 ${CATALINA_BASE}代表的是 /var/lib/oozie/oozie-server。请注意,这里存在一定的风险,如果其他人重新构建了 war 文件,那么你就会丢失这些扩展,并且他们以后需要手动添加。对于测试来说,我们建议复制 jar 文件,然而,对于生产环境,我们建议把 jar 添加到 war 文件中。
现在我们需要把关于自定义执行器的信息注册到 Oozie 的运行时中。这是通过扩展 oozie-site.xml 完成的 [6] 。我们可以通过在 Oozie 配置文件 oozie-site.xml 中添加或者修改“oozie.service.ActionService.executor.ext.classes” [7] 来注册自定义动作本身(代码 4)。
…………………………………… <property> <name>oozie.service.ActionService.executor.ext.classes</name> <value>com.navteq.assetmgmt.oozie.custom. EmailActionExecutor </value> </property> ……………………………………
代码 4: 自定义执行配置
为新动作(代码 2)所用的 XML schema 应该添加到 oozie-site.xml 中,位于属性“oozie.service.WorkflowSchemaService.ext.schemas” [8] 之下(代码 5)。
……………………………………… <property> <name>oozie.service.SchemaService.wf.ext.schemas</name> <value> emailAction.xsd</value> </property> …………………………………
代码 5: 自定义模式配置
最后,Tomcat 启动之后,我们就可以在工作流中使用自定义的动作节点了。
为了测试我们的实现,我们创建了简单的工作流(代码 6),它会使用我们的执行器来发送 email。
<!-- Copyright (c) 2011 NAVTEQ! Inc. All rights reserved. Test email <u>Oozie</u> Script --> <span color="#008080"><workflow-app <span color="#800080">xmlns</span><span color="#000000">=</span><i><span color="#0000ff">'uri:oozie:workflow:0.1'</span></i> <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'emailTester'</span></i>></span> <span color="#008080"><start <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">'simpleEmail'</span></i>/></span> <span color="#008080"><action <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'simpleEmail'</span></i>></span> <span color="#008080"><eMail <span color="#000000">xlmns</span>=“ uri:custom:email-action:0.1”></span> <span color="#008080"><emailSubject><span color="#000000">test</span></emailSubject></span> <span color="#008080"><emailFrom><span color="#000000">mike.segel<u>@<mycompany>.com</u></span></emailFrom></span> <span color="#008080"><emailTo><span color="#000000">boris.lublinsky<u>@<mycompany>.com</u></span></emailTo></span> <span color="#008080"><emailMessage><span color="#000000">This is a test message, if you can see this, Mikey did something right! :)</span></emailMessage></span> <span color="#008080"></eMail></span> <span color="#008080"><error <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">"fail"</span></i>/></span> <span color="#008080"><ok <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">"end"</span></i>/></span> <span color="#008080"></action></span> <span color="#008080"><kill <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">"fail"</span></i>></span> <span color="#008080"><message><span color="#000000"><u>Workflow</u> failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</span></message></span> <span color="#008080"></kill></span> <span color="#008080"><end <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'end'</span></i>/></span> <span color="#008080"></workflow-app></span>
代码 6: 使用 email 自定义动作执行器的简单工作流
结论
在这篇文章中,我们展示了如何通过创建自定义的动作执行器来扩展 Oozie。这样做让我们可以定义和实现部门或者企业专用的 Oozie 语言(领域专用语言),其中具有部门或者企业的功能。这样的领域专用语言能够简化特定部门或者企业的构建过程,并提高代码的可读性。
尽管 Oozie 相对还不够成熟,但是它已经为处理包含多个 map/reduce 作业、并且能够向总体的业务过程添加非 map-reduce 作业的过程提供了基本的框架。随着越来越多的用户使用 Oozie 并提供反馈,我们相信它有足够的潜力,可以成为 Hadoop 环境中强大的集成部分。
对于 Oozie 来说,还有很多我们没有在这三篇文章中包含的内容。我们只是期望它们可以引起你对 Oozie 工作流引擎的足够兴趣,并且能够成为进一步研究 Oozie 的不错的起点。
参考信息
- Boris Lublinsky, Mike Segel. Introduction to Oozie.
- Boris Lublinsky, Mike Segel. Oozie by Example
- Oozie custom Action Nodes
- Oozie source code
关于作者
Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。
Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。
[1] 这种类的例子可能是各种计数器操作、简单的计算等等。
[2] 所有 Oozie 动作执行器都是 Oozie 分发程序的一部分,它们都是通过扩展这个类实现的。
[3] 在我们的实现中,我们使用的是默认的实现,这样就是为什么没有在代码中展现的原因。你可以查看一下 Oozie 的源代码 [4],就可以知道在现存的 Oozie 动作处理程序中是如何实现这个方法的。
[4] 配置自定义执行器有两种方式——工作流变量和 / 或动作配置。在我们的例子中展示的是后者,但是在实际情况中,实际上总是二者的组合。
[5] 确保不仅要定义复杂的类型,还要对元素进行定义。那是 Oozie 所期望的。
[6] 通常在 Oozie 的分发包中叫做 oozie-default.xml。
[7] 对于多个执行器,类名应该以逗号分隔。
[8] 对于多次执行,我们可以为多种模式使用逗号分隔的列表。
查看英文原文: Extending Oozie
给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。
评论