写点什么

跟着示例学 Oozie

  • 2011-08-26
  • 本文字数:8273 字

    阅读完需:约 27 分钟

在前一篇文章《 Oozie 简介》中,我们已经描述了 Oozie 工作流服务器,并且展示了一个非常简单的工作流示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。

在本文中,我们会描述一个更加复杂的例子,通过它我们可以讨论更多 Oozie 特性,并演示如何来使用它们。

定义过程

我们在此描述的工作流会实现汽车 GPS 探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的 HDFS 目录中 [1] ,其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的 24 个文件完成的。如果文件的数量是 24,那么获取过程就会启动。否则:

  • 当天什么都不做
  • 对前一天——最多到 7 天,发送剩下的内容到探测数据提供程序
  • 如果目录的存在时间已达到 7 天,那么就获取所有可用的探测数据文件。

过程的总体实现请见图 1

(点击可以查看大图)。

图1: 过程图

在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join 步骤就会把控制权交给end 状态。

子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。

Directory 子过程实现

以下代码负责实现的是 directory 子过程(代码 1)。

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>
<start to='getDirInfo' />
<!-- STEP ONE -->
<action name='getDirInfo'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GetDirInfo</main-class>
<arg>${inputDir}</arg>
<capture-output />
</java>
<ok to="makeIngestDecision" />
<error to="fail" />
</action>
<!-- STEP TWO -->
<decision name="makeIngestDecision">
<switch>
<!-- empty or doesn't exist -->
<case to="end">
${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
(wf:actionData('getDirInfo')['dir.age'] lt 1 and
wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
</case>
<!-- # of files >= 24 -->
<case to="ingest">
${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
wf:actionData('getDirInfo')['dir.age'] gt 6}
</case>
<default to="sendEmail"/>
</switch>
</decision>
<!--EMAIL-->
<action name="sendEmail">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.StandaloneMailer</main-class>
<arg>probedata2@navteq.com</arg>
<arg>gregory.titievsky@navteq.com</arg>
<arg>${inputDir}</arg>
<arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
<arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
</java>
<ok to="end" />
<error to="fail" />
</action>
<!--INGESTION -->
<action name="ingest">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}" />
</prepare>
<configuration>
<property>
<name>mapred.reduce.tasks</name>
<value>300</value>
</property>
</configuration>
<main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
<arg>-conf</arg>
<arg>action.xml</arg>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to=" archive-data" />
<error to="ingest-fail" />
</action>
<!—Archive Data -->
<action name="archive-data">
<fs>
<move source='${inputDir}' target='/probe/backup/${dirName}' />
<delete path = '${inputDir}' />
</fs>
<ok to="end" />
<error to="ingest-fail" />
</action>
<kill name="ingest-fail">
<message>Ingestion failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 1: Directory 子过程

这个子过程的 start 节点会触发自定义的 java 节点,这个节点会获得目录信息(代码 2)。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.GregorianCalendar;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetDirInfo {
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
String dirPath = args[0];
String propKey0 = "dir.num-files";
String propVal0 = "-1";
String propKey1 = "dir.age";
String propVal1 = "-1";
System.out.println("Directory path: '"+dirPath+"'");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path hadoopDir = new Path(dirPath);
if (fs.exists(hadoopDir)){
FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
int numFilesInDir = files.length;
propVal0 = Integer.toString(numFilesInDir);
long timePassed, daysPassedLong;
int daysPassed;
String dirName = hadoopDir.getName();
String[] dirNameArray = dirName.split("-");
if (dirNameArray.length == 3) {
int year = Integer.valueOf(dirNameArray[0]);
int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
int date = Integer.valueOf(dirNameArray[2]);
GregorianCalendar dirCreationDate = new GregorianCalendar(year,
month, date);
timePassed = (new GregorianCalendar()).getTimeInMillis()
- dirCreationDate.getTimeInMillis();
daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;;
propVal1 = Integer.toString(daysPassed);
}
}
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
props.setProperty(propKey0, propVal0);
props.setProperty(propKey1, propVal1);
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 2: 获得目录信息的节点

这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回 -1,否则,这两个值就会返回给子过程。

子过程的下一步是一个 switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于 24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:

  • 使用现存的 Map/reduce 程序 [2] 获取数据
  • 目录会备份在数据归档中,然后删除

对 action 节点的其它配置

获取动作向你展示了另外一些 Oozie 配置参数,包括: - Prepare——如果出现了 prepare 参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在 fs.default.name 文件系统中执行。

  • Configuration——如果出现了 configuration 元素,它其中就会包含针对 Map/Reduce 作业的 JobConf 属性。它不仅可以用于 map/reduce 动作, 而且还可以用于启动 map/reduce 作业的 java 动作。

如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个 java 主类实现的(代码 3)。

复制代码
package com.navteq.oozie;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
public class StandaloneMailer {
private static String _mServer = "imailchi.navtech.com";
private static Properties _props = null;
private StandaloneMailer(){}
public static void init(String mServer){
_mServer = mServer;
_props = new Properties();
_props.setProperty("mail.smtp.host", _mServer);
}
public static void SendMail(String subject, String message, String from, String to) throws Exception {
// create some properties and get the default Session
Session session = Session.getDefaultInstance(_props, null);
// create a message
Message msg = new MimeMessage(session);
// set the from and to address
InternetAddress addressFrom = new InternetAddress(from);
msg.setFrom(addressFrom);
String [] recipients = new String[] {to};
InternetAddress[] addressTo = new InternetAddress[recipients.length];
for (int i = 0; i < recipients.length; i++){
addressTo[i] = new InternetAddress(recipients[i]);
}
msg.setRecipients(Message.RecipientType.TO, addressTo);
// Setting the Subject and Content Type
msg.setSubject(subject);
msg.setContent(message, "text/plain");
Transport.send(msg);
}
public static void main (String[] args) throws Exception {
if (args.length ==5){
init(_mServer);
StringBuilder subject = new StringBuilder();
StringBuilder body = new StringBuilder();
subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append(" files.");
body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
SendMail(subject.toString(), body.toString(), args[0], args[1]);
}
else throw new Exception("Invalid number of parameters provided for email");
}
}

列表 3: 发送提醒邮件

这是使用了 javax.mail API 的简单实现,用于发送邮件。

主过程的实现

我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表 4) [3]

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'>
<start to='getDirs2Process' />
<!-- STEP ONE -->
<action name='getDirs2Process'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
<capture-output />
</java>
<ok to="forkSubWorkflows" />
<error to="fail" />
</action>
<fork name="forkSubWorkflows">
<path start="processDir0"/>
<path start="processDir1"/>
<path start="processDir2"/>
<path start="processDir3"/>
<path start="processDir4"/>
<path start="processDir5"/>
<path start="processDir6"/>
<path start="processDir7"/>
</fork>
<action name="processDir0">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>activeDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
….
<action name="processDir7">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="end"/>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 4: 数据获取主过程

这个过程首先会触发 java 节点,计算需要处理的目录列表(列表 5),然后对每个目录执行子过程,从而处理给定的目录。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; i<8; ++i)
{
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 5: 目录计算程序

结论

在这篇文章中,我们向你展示了一个更复杂的完整的工作流示例,它让我们可以演示更多的 Oozie 特性以及对它们的应用。在下一篇文章中,我们会讨论构建可重用的 Oozie 组件库,并使用自定义的节点扩展 Oozie。

致谢

非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们实现了大部分代码。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。

参考信息

1. Boris Lublinsky Mike Segel 《Oozie 简介》


[1] 目录的名称是搜集这条数据的日期。

[2] 这是已经存在的程序,对它的描述与本文无关。

[3] 在此省略了一些重复代码。

查看英文原文: Oozie by Example


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2011-08-26 00:0030742
用户头像

发布了 340 篇内容, 共 131.7 次阅读, 收获喜欢 13 次。

关注

评论

发布
暂无评论
发现更多内容

启动!阿里巴巴编程之夏2022

阿里巴巴云原生

阿里云 云原生 编程之夏

Serverless 时代下微服务应用全托管解决方案

阿里巴巴云原生

阿里云 Serverless 微服务 云原生

SQL操作:WITH表达式及其应用

华为云开发者联盟

数据库 sql 后端 华为云

Redis 做缓存场景引发的问题

Ayue、

redis

数据库主键一定要自增吗?有哪些场景不建议自增?

CRMEB

vue2升级vue3:Vue2/3插槽——vue3的jsx组件插槽slot怎么处理

zhoulujun

typescript Vue3 slots tsx slot

文件I/O

fy

UNIX系统

昇腾科研创新使能计划赋能开发者  华为计算提供三大维度支持

Geek_2d6073

Nebula Graph入驻阿里云计算巢,助力企业打造云上超大规模图数据库

阿里云弹性计算

spark 分布式 云原生 图数据库 计算巢

有哪些新手程序员不知道的小技巧?

Jackpop

AWS CloudWatch

冯亮

云计算 监控 AWS

揭秘支撑百度搜索、Feed、小程序三大业务的MVVM框架设计思想,San 核心人员倾力打造

图灵教育

前端开发 好书推荐 框架设计

Dubbo3 官方文档贡献者征集令

阿里巴巴云原生

阿里云 开源 云原生 dubbo

小程序怎样关联微信小程序二维码,实现二码合一聚合

Geek_99967b

小程序 小程序容器

vue2升级vue3:Vue3时jsx组件绑定自定义的事件、v-model、sync修

zhoulujun

Vue3 sync tsx

力扣每日一练之双指针1Day8

京与旧铺

6月月更

pnpm 中无法使用 patch-package 打补丁

OpenHacker

前端 js

vue-axios(一)

小恺

6月月更

一行代码可以做什么?

Jackpop

从-1开始实现一个中间件

艾小仙

Java 中间件 springboot

Docker入坑篇

青柚1943

Docker DevOps 云原生 容器化

Redis HyperLogLog 是什么?这些场景使用让我枪出如龙一笑破苍穹

码哥字节

redis HyperLogLog Redis 数据结构

基于 K8s 的交付难题退退退!| 独家交付秘籍(第三回)

阿里巴巴云原生

阿里云 Kubernetes 云原生 应用交付平台

带你区分几种并行

华为云开发者联盟

后端 开发 华为云

拥抱开放,Serverless 时代的下一征程

阿里巴巴云原生

阿里云 Serverless 云原生 SAE

Vscode有什么好用的插件?

Jackpop

C语言数组与指针练习题(原题+解析+原码)

未见花闻

6月月更

秒云云原生信创全兼容解决方案再升级,助力信创产业加速落地

MIAOYUN

运维 云原生 信创 智能运维 信创云

技术实践 | 场景导向的音视频通话体验优化

融云 RongCloud

阿里云 ACK One、ACK 云原生 AI 套件新发布,解决算力时代下场景化需求

阿里巴巴云原生

阿里云 云原生 分布式云容器平台 ACK One ACK 云原生 AI 套件

vue2升级vue3:vue2 vue-i18n 升级到vue3搭配VueI18n v9

zhoulujun

国际化 i18n vue-i18n

跟着示例学Oozie_Java_Boris Lublinsky_InfoQ精选文章