在前一篇文章《 Oozie 简介》中,我们已经描述了 Oozie 工作流服务器,并且展示了一个非常简单的工作流示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。
在本文中,我们会描述一个更加复杂的例子,通过它我们可以讨论更多 Oozie 特性,并演示如何来使用它们。
定义过程
我们在此描述的工作流会实现汽车 GPS 探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的 HDFS 目录中 [1] ,其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的 24 个文件完成的。如果文件的数量是 24,那么获取过程就会启动。否则:
- 当天什么都不做
- 对前一天——最多到 7 天,发送剩下的内容到探测数据提供程序
- 如果目录的存在时间已达到 7 天,那么就获取所有可用的探测数据文件。
过程的总体实现请见图 1
(点击可以查看大图)。
图1: 过程图
在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join 步骤就会把控制权交给end 状态。
子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。
Directory 子过程实现
以下代码负责实现的是 directory 子过程(代码 1)。
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'> <start to='getDirInfo' /> <!-- STEP ONE --> <action name='getDirInfo'> <!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist, otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist, otherwise returns age of dir in days --> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <main-class>com.navteq.oozie.GetDirInfo</main-class> <arg>${inputDir}</arg> <capture-output /> </java> <ok to="makeIngestDecision" /> <error to="fail" /> </action> <!-- STEP TWO --> <decision name="makeIngestDecision"> <switch> <!-- empty or doesn't exist --> <case to="end"> ${wf:actionData('getDirInfo')['dir.num-files'] lt 0 || (wf:actionData('getDirInfo')['dir.age'] lt 1 and wf:actionData('getDirInfo')['dir.num-files'] lt 24)} </case> <!-- # of files >= 24 --> <case to="ingest"> ${wf:actionData('getDirInfo')['dir.num-files'] gt 23 || wf:actionData('getDirInfo')['dir.age'] gt 6} </case> <default to="sendEmail"/> </switch> </decision> <!--EMAIL--> <action name="sendEmail"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <main-class>com.navteq.oozie.StandaloneMailer</main-class> <arg>probedata2@navteq.com</arg> <arg>gregory.titievsky@navteq.com</arg> <arg>${inputDir}</arg> <arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg> <arg>${wf:actionData('getDirInfo')['dir.age']}</arg> </java> <ok to="end" /> <error to="fail" /> </action> <!--INGESTION --> <action name="ingest"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${outputDir}" /> </prepare> <configuration> <property> <name>mapred.reduce.tasks</name> <value>300</value> </property> </configuration> <main-class>com.navteq.probedata.drivers.ProbeIngest</main-class> <arg>-conf</arg> <arg>action.xml</arg> <arg>${inputDir}</arg> <arg>${outputDir}</arg> </java> <ok to=" archive-data" /> <error to="ingest-fail" /> </action> <!—Archive Data --> <action name="archive-data"> <fs> <move source='${inputDir}' target='/probe/backup/${dirName}' /> <delete path = '${inputDir}' /> </fs> <ok to="end" /> <error to="ingest-fail" /> </action> <kill name="ingest-fail"> <message>Ingestion failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <kill name="fail"> <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name='end' /> </workflow-app>
代码 1: Directory 子过程
这个子过程的 start 节点会触发自定义的 java 节点,这个节点会获得目录信息(代码 2)。
package com.navteq.oozie; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; import java.util.GregorianCalendar; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class GetDirInfo { private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties"; public static void main(String[] args) throws Exception { String dirPath = args[0]; String propKey0 = "dir.num-files"; String propVal0 = "-1"; String propKey1 = "dir.age"; String propVal1 = "-1"; System.out.println("Directory path: '"+dirPath+"'"); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path hadoopDir = new Path(dirPath); if (fs.exists(hadoopDir)){ FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir); int numFilesInDir = files.length; propVal0 = Integer.toString(numFilesInDir); long timePassed, daysPassedLong; int daysPassed; String dirName = hadoopDir.getName(); String[] dirNameArray = dirName.split("-"); if (dirNameArray.length == 3) { int year = Integer.valueOf(dirNameArray[0]); int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based int date = Integer.valueOf(dirNameArray[2]); GregorianCalendar dirCreationDate = new GregorianCalendar(year, month, date); timePassed = (new GregorianCalendar()).getTimeInMillis() - dirCreationDate.getTimeInMillis(); daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;; propVal1 = Integer.toString(daysPassed); } } String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES); if (oozieProp != null) { File propFile = new File(oozieProp); Properties props = new Properties(); props.setProperty(propKey0, propVal0); props.setProperty(propKey1, propVal1); OutputStream os = new FileOutputStream(propFile); props.store(os, ""); os.close(); } else throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined"); } }
代码 2: 获得目录信息的节点
这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回 -1,否则,这两个值就会返回给子过程。
子过程的下一步是一个 switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于 24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:
- 使用现存的 Map/reduce 程序 [2] 获取数据
- 目录会备份在数据归档中,然后删除
对 action 节点的其它配置
获取动作向你展示了另外一些 Oozie 配置参数,包括: - Prepare——如果出现了 prepare 参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在 fs.default.name 文件系统中执行。
- Configuration——如果出现了 configuration 元素,它其中就会包含针对 Map/Reduce 作业的 JobConf 属性。它不仅可以用于 map/reduce 动作, 而且还可以用于启动 map/reduce 作业的 java 动作。
如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个 java 主类实现的(代码 3)。
package com.navteq.oozie; import java.util.Properties; import javax.mail.Message; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; public class StandaloneMailer { private static String _mServer = "imailchi.navtech.com"; private static Properties _props = null; private StandaloneMailer(){} public static void init(String mServer){ _mServer = mServer; _props = new Properties(); _props.setProperty("mail.smtp.host", _mServer); } public static void SendMail(String subject, String message, String from, String to) throws Exception { // create some properties and get the default Session Session session = Session.getDefaultInstance(_props, null); // create a message Message msg = new MimeMessage(session); // set the from and to address InternetAddress addressFrom = new InternetAddress(from); msg.setFrom(addressFrom); String [] recipients = new String[] {to}; InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++){ addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // Setting the Subject and Content Type msg.setSubject(subject); msg.setContent(message, "text/plain"); Transport.send(msg); } public static void main (String[] args) throws Exception { if (args.length ==5){ init(_mServer); StringBuilder subject = new StringBuilder(); StringBuilder body = new StringBuilder(); subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append(" files."); body.append("Directory ").append(args[2]).append(" is ").append(args[4]). append(" days old and contains only ").append(args[3]).append(" files instead of 24."); SendMail(subject.toString(), body.toString(), args[0], args[1]); } else throw new Exception("Invalid number of parameters provided for email"); } }
列表 3: 发送提醒邮件
这是使用了 javax.mail API 的简单实现,用于发送邮件。
主过程的实现
我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表 4) [3] 。
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'> <start to='getDirs2Process' /> <!-- STEP ONE --> <action name='getDirs2Process'> <!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist, otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist, otherwise returns age of dir in days --> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <main-class>com.navteq.oozie.GenerateLookupDirs</main-class> <capture-output /> </java> <ok to="forkSubWorkflows" /> <error to="fail" /> </action> <fork name="forkSubWorkflows"> <path start="processDir0"/> <path start="processDir1"/> <path start="processDir2"/> <path start="processDir3"/> <path start="processDir4"/> <path start="processDir5"/> <path start="processDir6"/> <path start="processDir7"/> </fork> <action name="processDir0"> <sub-workflow> <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path> <configuration> <property> <name>inputDir</name> <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value> </property> <property> <name>outputDir</name> <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value> </property> <property> <name>jobTracker</name> <value>${jobTracker}</value> </property> <property> <name>nameNode</name> <value>${nameNode}</value> </property> <property> <name>activeDir</name> <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value> </property> <property> <name>dirName</name> <value>${wf:actionData('getDirs2Process')['dir0']}</value> </property> </configuration> </sub-workflow> <ok to="joining"/> <error to="fail"/> </action> …. <action name="processDir7"> <sub-workflow> <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path> <configuration> <property> <name>inputDir</name> <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value> </property> <property> <name>outputDir</name> <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value> </property> <property> <name>dirName</name> <value>${wf:actionData('getDirs2Process')['dir7']}</value> </property> </configuration> </sub-workflow> <ok to="joining"/> <error to="fail"/> </action> <join name="joining" to="end"/> <kill name="fail"> <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name='end' /> </workflow-app>
代码 4: 数据获取主过程
这个过程首先会触发 java 节点,计算需要处理的目录列表(列表 5),然后对每个目录执行子过程,从而处理给定的目录。
package com.navteq.oozie; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; import java.util.Calendar; import java.util.GregorianCalendar; import java.util.Properties; public class GenerateLookupDirs { public static final long dayMillis = 1000 * 60 * 60 * 24; private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties"; public static void main(String[] args) throws Exception { Calendar curDate = new GregorianCalendar(); int year, month, date; String propKey, propVal; String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES); if (oozieProp != null) { File propFile = new File(oozieProp); Properties props = new Properties(); for (int i = 0; i<8; ++i) { year = curDate.get(Calendar.YEAR); month = curDate.get(Calendar.MONTH) + 1; date = curDate.get(Calendar.DATE); propKey = "dir"+i; propVal = year + "-" + (month < 10 ? "0" + month : month) + "-" + (date < 10 ? "0" + date : date); props.setProperty(propKey, propVal); curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis); } OutputStream os = new FileOutputStream(propFile); props.store(os, ""); os.close(); } else throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined"); } }
代码 5: 目录计算程序
结论
在这篇文章中,我们向你展示了一个更复杂的完整的工作流示例,它让我们可以演示更多的 Oozie 特性以及对它们的应用。在下一篇文章中,我们会讨论构建可重用的 Oozie 组件库,并使用自定义的节点扩展 Oozie。
致谢
非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们实现了大部分代码。
关于作者
Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。
Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。
参考信息
1. Boris Lublinsky , Mike Segel 《Oozie 简介》
[1] 目录的名称是搜集这条数据的日期。
[2] 这是已经存在的程序,对它的描述与本文无关。
[3] 在此省略了一些重复代码。
查看英文原文: Oozie by Example
给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。
评论