写点什么

跟着示例学 Oozie

  • 2011-08-26
  • 本文字数:8273 字

    阅读完需:约 27 分钟

在前一篇文章《 Oozie 简介》中,我们已经描述了 Oozie 工作流服务器,并且展示了一个非常简单的工作流示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。

在本文中,我们会描述一个更加复杂的例子,通过它我们可以讨论更多 Oozie 特性,并演示如何来使用它们。

定义过程

我们在此描述的工作流会实现汽车 GPS 探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的 HDFS 目录中 [1] ,其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的 24 个文件完成的。如果文件的数量是 24,那么获取过程就会启动。否则:

  • 当天什么都不做
  • 对前一天——最多到 7 天,发送剩下的内容到探测数据提供程序
  • 如果目录的存在时间已达到 7 天,那么就获取所有可用的探测数据文件。

过程的总体实现请见图 1

(点击可以查看大图)。

图1: 过程图

在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join 步骤就会把控制权交给end 状态。

子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。

Directory 子过程实现

以下代码负责实现的是 directory 子过程(代码 1)。

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>
<start to='getDirInfo' />
<!-- STEP ONE -->
<action name='getDirInfo'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GetDirInfo</main-class>
<arg>${inputDir}</arg>
<capture-output />
</java>
<ok to="makeIngestDecision" />
<error to="fail" />
</action>
<!-- STEP TWO -->
<decision name="makeIngestDecision">
<switch>
<!-- empty or doesn't exist -->
<case to="end">
${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
(wf:actionData('getDirInfo')['dir.age'] lt 1 and
wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
</case>
<!-- # of files >= 24 -->
<case to="ingest">
${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
wf:actionData('getDirInfo')['dir.age'] gt 6}
</case>
<default to="sendEmail"/>
</switch>
</decision>
<!--EMAIL-->
<action name="sendEmail">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.StandaloneMailer</main-class>
<arg>probedata2@navteq.com</arg>
<arg>gregory.titievsky@navteq.com</arg>
<arg>${inputDir}</arg>
<arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
<arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
</java>
<ok to="end" />
<error to="fail" />
</action>
<!--INGESTION -->
<action name="ingest">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}" />
</prepare>
<configuration>
<property>
<name>mapred.reduce.tasks</name>
<value>300</value>
</property>
</configuration>
<main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
<arg>-conf</arg>
<arg>action.xml</arg>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to=" archive-data" />
<error to="ingest-fail" />
</action>
<!—Archive Data -->
<action name="archive-data">
<fs>
<move source='${inputDir}' target='/probe/backup/${dirName}' />
<delete path = '${inputDir}' />
</fs>
<ok to="end" />
<error to="ingest-fail" />
</action>
<kill name="ingest-fail">
<message>Ingestion failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 1: Directory 子过程

这个子过程的 start 节点会触发自定义的 java 节点,这个节点会获得目录信息(代码 2)。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.GregorianCalendar;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetDirInfo {
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
String dirPath = args[0];
String propKey0 = "dir.num-files";
String propVal0 = "-1";
String propKey1 = "dir.age";
String propVal1 = "-1";
System.out.println("Directory path: '"+dirPath+"'");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path hadoopDir = new Path(dirPath);
if (fs.exists(hadoopDir)){
FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
int numFilesInDir = files.length;
propVal0 = Integer.toString(numFilesInDir);
long timePassed, daysPassedLong;
int daysPassed;
String dirName = hadoopDir.getName();
String[] dirNameArray = dirName.split("-");
if (dirNameArray.length == 3) {
int year = Integer.valueOf(dirNameArray[0]);
int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
int date = Integer.valueOf(dirNameArray[2]);
GregorianCalendar dirCreationDate = new GregorianCalendar(year,
month, date);
timePassed = (new GregorianCalendar()).getTimeInMillis()
- dirCreationDate.getTimeInMillis();
daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;;
propVal1 = Integer.toString(daysPassed);
}
}
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
props.setProperty(propKey0, propVal0);
props.setProperty(propKey1, propVal1);
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 2: 获得目录信息的节点

这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回 -1,否则,这两个值就会返回给子过程。

子过程的下一步是一个 switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于 24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:

  • 使用现存的 Map/reduce 程序 [2] 获取数据
  • 目录会备份在数据归档中,然后删除

对 action 节点的其它配置

获取动作向你展示了另外一些 Oozie 配置参数,包括: - Prepare——如果出现了 prepare 参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在 fs.default.name 文件系统中执行。

  • Configuration——如果出现了 configuration 元素,它其中就会包含针对 Map/Reduce 作业的 JobConf 属性。它不仅可以用于 map/reduce 动作, 而且还可以用于启动 map/reduce 作业的 java 动作。

如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个 java 主类实现的(代码 3)。

复制代码
package com.navteq.oozie;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
public class StandaloneMailer {
private static String _mServer = "imailchi.navtech.com";
private static Properties _props = null;
private StandaloneMailer(){}
public static void init(String mServer){
_mServer = mServer;
_props = new Properties();
_props.setProperty("mail.smtp.host", _mServer);
}
public static void SendMail(String subject, String message, String from, String to) throws Exception {
// create some properties and get the default Session
Session session = Session.getDefaultInstance(_props, null);
// create a message
Message msg = new MimeMessage(session);
// set the from and to address
InternetAddress addressFrom = new InternetAddress(from);
msg.setFrom(addressFrom);
String [] recipients = new String[] {to};
InternetAddress[] addressTo = new InternetAddress[recipients.length];
for (int i = 0; i < recipients.length; i++){
addressTo[i] = new InternetAddress(recipients[i]);
}
msg.setRecipients(Message.RecipientType.TO, addressTo);
// Setting the Subject and Content Type
msg.setSubject(subject);
msg.setContent(message, "text/plain");
Transport.send(msg);
}
public static void main (String[] args) throws Exception {
if (args.length ==5){
init(_mServer);
StringBuilder subject = new StringBuilder();
StringBuilder body = new StringBuilder();
subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append(" files.");
body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
SendMail(subject.toString(), body.toString(), args[0], args[1]);
}
else throw new Exception("Invalid number of parameters provided for email");
}
}

列表 3: 发送提醒邮件

这是使用了 javax.mail API 的简单实现,用于发送邮件。

主过程的实现

我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表 4) [3]

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'>
<start to='getDirs2Process' />
<!-- STEP ONE -->
<action name='getDirs2Process'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
<capture-output />
</java>
<ok to="forkSubWorkflows" />
<error to="fail" />
</action>
<fork name="forkSubWorkflows">
<path start="processDir0"/>
<path start="processDir1"/>
<path start="processDir2"/>
<path start="processDir3"/>
<path start="processDir4"/>
<path start="processDir5"/>
<path start="processDir6"/>
<path start="processDir7"/>
</fork>
<action name="processDir0">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>activeDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
….
<action name="processDir7">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="end"/>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 4: 数据获取主过程

这个过程首先会触发 java 节点,计算需要处理的目录列表(列表 5),然后对每个目录执行子过程,从而处理给定的目录。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; i<8; ++i)
{
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 5: 目录计算程序

结论

在这篇文章中,我们向你展示了一个更复杂的完整的工作流示例,它让我们可以演示更多的 Oozie 特性以及对它们的应用。在下一篇文章中,我们会讨论构建可重用的 Oozie 组件库,并使用自定义的节点扩展 Oozie。

致谢

非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们实现了大部分代码。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。

参考信息

1. Boris Lublinsky Mike Segel 《Oozie 简介》


[1] 目录的名称是搜集这条数据的日期。

[2] 这是已经存在的程序,对它的描述与本文无关。

[3] 在此省略了一些重复代码。

查看英文原文: Oozie by Example


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2011-08-26 00:0030783
用户头像

发布了 340 篇内容, 共 132.9 次阅读, 收获喜欢 13 次。

关注

评论

发布
暂无评论
发现更多内容

梦幻西游H5游戏超详细图文架设教程

echeverra

H5游戏

百分点大数据技术团队:低代码平台实践

百分点科技技术团队

【优化技术专题】「线程间的高性能消息框架」终极关注Disruptor的核心源码和Java8的@Contended伪共享指南

洛神灬殇

Disruptor 异步执行 高性能框架 10月月更

12种 vo2dto 方法,就 BeanUtils.copyProperties 压测最拉胯!【快双11了,别用错喽】

小傅哥

Java 小傅哥 MapStruct vo2dto JMapper

专场预告 | DTCC数据库技术大会云溪数据库专场

云计算

「 活动 」连续 3 天,企业容器应用实战营上海站来啦!

阿里巴巴云原生

阿里云 Kubernetes 容器 云原生 活动

GitHub标星过万!阿里内部流传的JDK源码剖析手册到底有多强?

程序员 jdk 面试 java

第 14 章 -《Linux 一学就会》- RAID磁盘阵列的原理与搭建

学神来啦

Linux linux运维 linux云计算

一加9 Pro怎么样?值得入手的全能旗舰

Geek_8a195c

SSH是什么?怎么组成?有哪些优势?

行云管家

SSH 服务器 SSH工具 服务器管理协议

浙江金华市正规等保测评机构有几家?在哪里?联系电话是多少?

行云管家

网络安全 等保 等保测评

再见收费的Navicat!操作所有数据库靠它就够了!

Java 数据库 架构 开源项目

成本直降50% | 阿里云发布云原生网关,开启下一代网关新进程

阿里巴巴中间件

阿里云 微服务 云原生 中间件 网关

校友录小程序开发笔记三十二:校友卡模块设计与实现

CC同学

仅需三天,受人追捧的华为内部Java优化笔记登顶Github热搜!

Java 架构 面试 程序人生 编程语言

2021年9月国产数据库大事记

墨天轮

数据库 华为云 国产数据库 达梦 人大金仓

每一个用到canvas的小伙伴都应该了解的fabric.js

荣顶

JavaScript 大前端 canvas 图形处理 画布

想提高运维效率,那就把MySQL数据库部署到Kubernetes 集群中

华为云开发者联盟

MySQL 运维 测试 MySQL数据库 Kubernetes 集群

为绿色而生:智慧矿山可视化数治监管

一只数据鲸鱼

数据可视化 智慧矿山 煤矿 矿山

设计电商秒杀系统

木云先森

架构训练营

Vue进阶(幺叁幺):父子组件传值实现数据深拷贝

No Silver Bullet

Vue 深拷贝 10月月更

设计模式如何提升 vivo 营销自动化业务扩展性 | 引擎篇01

vivo互联网技术

自动化 后端 设计模式 软件架构设计 java

“828页Java面试手册”在我手,何愁offer不到手!

Java 程序员 架构 面试 后端

云计算改变企业的传统思考方式

低代码小观

云计算 创新 企业 企业管理 传统观念

直播回顾|蒋烁淼:《下一代企业级应用软件论坛》可观测性主题精彩分享

观测云

可观测性

DCI架构是如何解决DDD战术建模缺点的?

华为云开发者联盟

领域驱动设计 对象 建模 对象编程 DCI架构

教你用Java7的Fork/Join框架开发高并发程序

华为云开发者联盟

Java 算法 线程 高并发 Fork/Join框架

低代码BPM平台

低代码小观

低代码 企业 企业管理 BPM 低代码平台

授人以渔,华为“开源雨林”计划致力推动开源生态发展

机器翻译是否能替代人工翻译?从前世今生说起

博文视点Broadview

双减来了!人工智能如何促进教育领域转型?

京东科技开发者

人工智能 大数据 AI 教育行业

跟着示例学Oozie_Java_Boris Lublinsky_InfoQ精选文章