写点什么

跟着示例学 Oozie

  • 2011-08-26
  • 本文字数:8273 字

    阅读完需:约 27 分钟

在前一篇文章《 Oozie 简介》中,我们已经描述了 Oozie 工作流服务器,并且展示了一个非常简单的工作流示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。

在本文中,我们会描述一个更加复杂的例子,通过它我们可以讨论更多 Oozie 特性,并演示如何来使用它们。

定义过程

我们在此描述的工作流会实现汽车 GPS 探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的 HDFS 目录中 [1] ,其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的 24 个文件完成的。如果文件的数量是 24,那么获取过程就会启动。否则:

  • 当天什么都不做
  • 对前一天——最多到 7 天,发送剩下的内容到探测数据提供程序
  • 如果目录的存在时间已达到 7 天,那么就获取所有可用的探测数据文件。

过程的总体实现请见图 1

(点击可以查看大图)。

图1: 过程图

在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join 步骤就会把控制权交给end 状态。

子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。

Directory 子过程实现

以下代码负责实现的是 directory 子过程(代码 1)。

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>
<start to='getDirInfo' />
<!-- STEP ONE -->
<action name='getDirInfo'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GetDirInfo</main-class>
<arg>${inputDir}</arg>
<capture-output />
</java>
<ok to="makeIngestDecision" />
<error to="fail" />
</action>
<!-- STEP TWO -->
<decision name="makeIngestDecision">
<switch>
<!-- empty or doesn't exist -->
<case to="end">
${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
(wf:actionData('getDirInfo')['dir.age'] lt 1 and
wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
</case>
<!-- # of files >= 24 -->
<case to="ingest">
${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
wf:actionData('getDirInfo')['dir.age'] gt 6}
</case>
<default to="sendEmail"/>
</switch>
</decision>
<!--EMAIL-->
<action name="sendEmail">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.StandaloneMailer</main-class>
<arg>probedata2@navteq.com</arg>
<arg>gregory.titievsky@navteq.com</arg>
<arg>${inputDir}</arg>
<arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
<arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
</java>
<ok to="end" />
<error to="fail" />
</action>
<!--INGESTION -->
<action name="ingest">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}" />
</prepare>
<configuration>
<property>
<name>mapred.reduce.tasks</name>
<value>300</value>
</property>
</configuration>
<main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
<arg>-conf</arg>
<arg>action.xml</arg>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to=" archive-data" />
<error to="ingest-fail" />
</action>
<!—Archive Data -->
<action name="archive-data">
<fs>
<move source='${inputDir}' target='/probe/backup/${dirName}' />
<delete path = '${inputDir}' />
</fs>
<ok to="end" />
<error to="ingest-fail" />
</action>
<kill name="ingest-fail">
<message>Ingestion failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 1: Directory 子过程

这个子过程的 start 节点会触发自定义的 java 节点,这个节点会获得目录信息(代码 2)。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.GregorianCalendar;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetDirInfo {
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
String dirPath = args[0];
String propKey0 = "dir.num-files";
String propVal0 = "-1";
String propKey1 = "dir.age";
String propVal1 = "-1";
System.out.println("Directory path: '"+dirPath+"'");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path hadoopDir = new Path(dirPath);
if (fs.exists(hadoopDir)){
FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
int numFilesInDir = files.length;
propVal0 = Integer.toString(numFilesInDir);
long timePassed, daysPassedLong;
int daysPassed;
String dirName = hadoopDir.getName();
String[] dirNameArray = dirName.split("-");
if (dirNameArray.length == 3) {
int year = Integer.valueOf(dirNameArray[0]);
int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
int date = Integer.valueOf(dirNameArray[2]);
GregorianCalendar dirCreationDate = new GregorianCalendar(year,
month, date);
timePassed = (new GregorianCalendar()).getTimeInMillis()
- dirCreationDate.getTimeInMillis();
daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;;
propVal1 = Integer.toString(daysPassed);
}
}
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
props.setProperty(propKey0, propVal0);
props.setProperty(propKey1, propVal1);
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 2: 获得目录信息的节点

这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回 -1,否则,这两个值就会返回给子过程。

子过程的下一步是一个 switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于 24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:

  • 使用现存的 Map/reduce 程序 [2] 获取数据
  • 目录会备份在数据归档中,然后删除

对 action 节点的其它配置

获取动作向你展示了另外一些 Oozie 配置参数,包括: - Prepare——如果出现了 prepare 参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在 fs.default.name 文件系统中执行。

  • Configuration——如果出现了 configuration 元素,它其中就会包含针对 Map/Reduce 作业的 JobConf 属性。它不仅可以用于 map/reduce 动作, 而且还可以用于启动 map/reduce 作业的 java 动作。

如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个 java 主类实现的(代码 3)。

复制代码
package com.navteq.oozie;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
public class StandaloneMailer {
private static String _mServer = "imailchi.navtech.com";
private static Properties _props = null;
private StandaloneMailer(){}
public static void init(String mServer){
_mServer = mServer;
_props = new Properties();
_props.setProperty("mail.smtp.host", _mServer);
}
public static void SendMail(String subject, String message, String from, String to) throws Exception {
// create some properties and get the default Session
Session session = Session.getDefaultInstance(_props, null);
// create a message
Message msg = new MimeMessage(session);
// set the from and to address
InternetAddress addressFrom = new InternetAddress(from);
msg.setFrom(addressFrom);
String [] recipients = new String[] {to};
InternetAddress[] addressTo = new InternetAddress[recipients.length];
for (int i = 0; i < recipients.length; i++){
addressTo[i] = new InternetAddress(recipients[i]);
}
msg.setRecipients(Message.RecipientType.TO, addressTo);
// Setting the Subject and Content Type
msg.setSubject(subject);
msg.setContent(message, "text/plain");
Transport.send(msg);
}
public static void main (String[] args) throws Exception {
if (args.length ==5){
init(_mServer);
StringBuilder subject = new StringBuilder();
StringBuilder body = new StringBuilder();
subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append(" files.");
body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
SendMail(subject.toString(), body.toString(), args[0], args[1]);
}
else throw new Exception("Invalid number of parameters provided for email");
}
}

列表 3: 发送提醒邮件

这是使用了 javax.mail API 的简单实现,用于发送邮件。

主过程的实现

我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表 4) [3]

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'>
<start to='getDirs2Process' />
<!-- STEP ONE -->
<action name='getDirs2Process'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
<capture-output />
</java>
<ok to="forkSubWorkflows" />
<error to="fail" />
</action>
<fork name="forkSubWorkflows">
<path start="processDir0"/>
<path start="processDir1"/>
<path start="processDir2"/>
<path start="processDir3"/>
<path start="processDir4"/>
<path start="processDir5"/>
<path start="processDir6"/>
<path start="processDir7"/>
</fork>
<action name="processDir0">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>activeDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
….
<action name="processDir7">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="end"/>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 4: 数据获取主过程

这个过程首先会触发 java 节点,计算需要处理的目录列表(列表 5),然后对每个目录执行子过程,从而处理给定的目录。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; i<8; ++i)
{
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 5: 目录计算程序

结论

在这篇文章中,我们向你展示了一个更复杂的完整的工作流示例,它让我们可以演示更多的 Oozie 特性以及对它们的应用。在下一篇文章中,我们会讨论构建可重用的 Oozie 组件库,并使用自定义的节点扩展 Oozie。

致谢

非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们实现了大部分代码。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。

参考信息

1. Boris Lublinsky Mike Segel 《Oozie 简介》


[1] 目录的名称是搜集这条数据的日期。

[2] 这是已经存在的程序,对它的描述与本文无关。

[3] 在此省略了一些重复代码。

查看英文原文: Oozie by Example


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2011-08-26 00:0030640
用户头像

发布了 340 篇内容, 共 129.4 次阅读, 收获喜欢 13 次。

关注

评论

发布
暂无评论
发现更多内容

逆袭之路,普通二本的八年开发码农如何进阿里拿年薪百万

小谈

Java 面试

高承实:区块链在新基建中的作用和未来发展

CECBC

新基建 政策扶持 技术特征 链上数据 产业场景

ARTS Week6

时之虫

ARTS 打卡计划

分布式缓存

Axe

测试开发工程师修炼手册—测试技能大盘点

Zoe

测试工程师产出

理解 Mysql 索引底层原理只需这一篇就够了

小谈

MySQL 数据结构 面试 Spring Cloud Spring Boot

Week5命题作业

星河寒水

极客大学架构师训练营

面试细节: i = i++和 i = ++i

Java小咖秀

面试 JVM 经验分享

架构师训练营第4周总结

aoeiuvzcs

Go: 字符串和转换优化

陈思敏捷

string 字符串 Go 语言

一个简单的技术选型心得

i风语

Java 架构

Redis系列(五):你要的Redis集群搭建来了,实践与否你自己选!

z小赵

Java redis 分布式 高并发

Android架构组件-App架构指南,你还不收藏嘛

小吴选手

架构 架构师 架构总结 架构要素 P7架构师

18个Java8日期处理的实践,太有用了建议收藏

码哥小胖

MySQL SQL语法 sql查询

简直了!顶级架构师分享心得,如何在项目中兼容多种数据库

犬来八荒

Java MySQL 数据库 面试

锦囊篇|一文摸懂SharedPreferences和MMKV(二)

ClericYi

游戏夜读 | 互动剧的黎明到了?

game1night

我是如何解决邮件焦虑的

vinkyqy

效率 职场 邮件

为什么我建议你读一读历史?

Phoenix

历史 中国历史

阿里技术官:这样带你学Spring全家桶,其实没你想的那么难

小吴选手

spring Spring Cloud Spring Boot

太牛 了!快码住!GitHub上标星75k!超牛的《Java面试突击版》

犬来八荒

Java git Linux 面试

向女朋友解释乐观锁与悲观锁的小妙招!

小闫

spring 面试 Spring Cloud 乐观锁 悲观锁

深入理解编译优化之循环展开和粗化锁

程序那些事

JIT 编译优化 循环展开 粗化锁

java架构-一些设计上的基本常识

猿灯塔

Java

第四周

仪轩

六月我在工作中蜕变,勤奋小人打架终于赢了

程序员小跃

效率工具 加班 沟通 复盘

五分钟让你搞懂Nginx负载均衡原理及四种负载均衡算法

架构大数据双料架构师

马匹、马镫、马车,和华为的数据基础设施革新

脑极体

程序员的晚餐 | 7 月 3 日 好久没做饭

清远

美食

猿灯塔:疫情冲击,去体验远程面试被怼10分钟,今年Java开发找工作真难

猿灯塔

Redis分布式锁课堂开课了!

小闫

redis Spring Cloud Redis项目

跟着示例学Oozie_Java_Boris Lublinsky_InfoQ精选文章