写点什么

Hadoop 基本流程与应用开发

分布式计算开源框架 Hadoop 入门实践(三)

2008 年 8 月 13 日

── 分布式计算开源框架 Hadoop 入门实践(三)

Hadoop 基本流程

一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。

  1. 在分布式环境中客户端创建任务并提交。

  2. InputFormat 做 Map 前的预处理,主要负责以下工作:

  3. 验证输入的格式是否符合 JobConfig 的输入定义,这个在实现 Map 和构建 Conf 的时候就会知道,不定义可以是 Writable 的任意子类。

  4. 将 input 的文件切分为逻辑上的输入 InputSplit,其实这就是在上面提到的在分布式文件系统中 blocksize 是有大小限制的,因此大文件会被划分为多个 block。

  5. 通过 RecordReader 来再次处理 inputsplit 为一组 records,输出给 Map。(inputsplit 只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要 RecordReader 来实现,例如最简单的默认方式就是回车换行的切分)

  6. RecordReader 处理后的结果作为 Map 的输入,Map 执行定义的 Map 逻辑,输出处理后的 key 和 value 对应到临时中间文件。

  7. Combiner 可选择配置,主要作用是在每一个 Map 执行完分析以后,在本地优先作 Reduce 的工作,减少在 Reduce 过程中的数据传输量。

  8. Partitioner 可选择配置,主要作用是在多个 Reduce 的情况下,指定 Map 的结果由某一个 Reduce 处理,每一个 Reduce 都会有单独的输出文件。(后面的代码实例中有介绍使用场景)

  9. Reduce 执行具体的业务逻辑,并且将处理结果输出给 OutputFormat。

  10. OutputFormat 的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如 Config 中配置,最后输出 Reduce 汇总后的结果。

业务场景和代码范例

业务场景描述:可设定输入和输出路径(操作系统的路径非 HDFS 路径),根据访问日志分析某一个应用访问某一个 API 的总次数和总流量,统计后分别输出到两个文件中。这里仅仅为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。


测试代码类图

LogAnalysiser 就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断:

LogAnalysiser::MapClass

public static class MapClass extends MapReduceBase<br></br>        implements Mapper<LongWritable, Text, Text, LongWritable> <br></br>    {<br></br>        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)<br></br>                throws IOException<br></br>        {    <br></br>            String line = value.toString();// 没有配置 RecordReader,所以默认采用 line 的实现,key 就是行号,value 就是行内容 <br></br>            if (line == null || line.equals(""))<br></br>                return;<br></br>            String[] words = line.split(",");<br></br>            if (words == null || words.length < 8)<br></br>                return;<br></br>            String appid = words[1];<br></br>            String apiName = words[2];<br></br>            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));<br></br>            Text record = new Text();<br></br>            record.set(new StringBuffer("flow::").append(appid)<br></br>                            .append("::").append(apiName).toString());<br></br>            reporter.progress();<br></br>            output.collect(record, recbytes);// 输出流量的统计结果,通过 flow:: 作为前缀来标示。<br></br>            record.clear();<br></br>            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());<br></br>            output.collect(record, new LongWritable(1));// 输出次数的统计结果,通过 count:: 作为前缀来标示 <br></br>        }    <br></br>    }LogAnalysiser:: PartitionerClass

public static class PartitionerClass implements Partitioner<Text, LongWritable><br></br>    {<br></br>        public int getPartition(Text key, LongWritable value, int numPartitions)<br></br>        {<br></br>            if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的 Reduce<br></br>                if (key.toString().startsWith("flow::"))<br></br>                    return 0;<br></br>                else<br></br>                    return 1;<br></br>            else<br></br>                return 0;<br></br>        }<br></br>        public void configure(JobConf job){}    <br></br>}LogAnalysiser:: CombinerClass

参看 ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在 ReduceClass 中蓝色的行表示在 CombinerClass 中不存在。

LogAnalysiser:: ReduceClass

public static class ReduceClass extends MapReduceBase<br></br>        implements Reducer<Text, LongWritable,Text, LongWritable> <br></br>    {<br></br>        public void reduce(Text key, Iterator<LongWritable> values,<br></br>                OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException<br></br>        {<br></br>            Text newkey = new Text();<br></br>            newkey.set(key.toString().substring(key.toString().indexOf("::")+2));<br></br>            LongWritable result = new LongWritable();<br></br>            long tmp = 0;<br></br>            int counter = 0;<br></br>            while(values.hasNext())// 累加同一个 key 的统计结果 <br></br>            {<br></br>                tmp = tmp + values.next().get();<br></br>                <br></br>                counter = counter +1;// 担心处理太久,JobTracker 长时间没有收到报告会认为 TaskTracker 已经失效,因此定时报告一下 <br></br>                if (counter == 1000)<br></br>                {<br></br>                    counter = 0;<br></br>                    reporter.progress();<br></br>                }<br></br>            }<br></br>            result.set(tmp);<br></br>            output.collect(newkey, result);// 输出最后的汇总结果 <br></br>        }    <br></br>    }LogAnalysiser

public static void main(String[] args)<br></br> {<br></br> try<br></br> {<br></br> run(args);<br></br> } catch (Exception e)<br></br> {<br></br> e.printStackTrace();<br></br> }<br></br> }<br></br> public static void run(String[] args) throws Exception<br></br> {<br></br> if (args == null || args.length <2)<br></br> {<br></br> System.out.println("need inputpath and outputpath");<br></br> return;<br></br> }<br></br> String inputpath = args[0];<br></br> String outputpath = args[1];<br></br> String shortin = args[0];<br></br> String shortout = args[1];<br></br> if (shortin.indexOf(File.separator) >= 0)<br></br> shortin = shortin.substring(shortin.lastIndexOf(File.separator));<br></br> if (shortout.indexOf(File.separator) >= 0)<br></br> shortout = shortout.substring(shortout.lastIndexOf(File.separator));<br></br> SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");<br></br> shortout = new StringBuffer(shortout).append("-")<br></br> .append(formater.format(new Date())).toString();<p> if (!shortin.startsWith("/"))</p><br></br> shortin = "/" + shortin;<br></br> if (!shortout.startsWith("/"))<br></br> shortout = "/" + shortout;<br></br> shortin = "/user/root" + shortin;<br></br> shortout = "/user/root" + shortout; <br></br> File inputdir = new File(inputpath);<br></br> File outputdir = new File(outputpath);<br></br> if (!inputdir.exists() || !inputdir.isDirectory())<br></br> {<br></br> System.out.println("inputpath not exist or isn't dir!");<br></br> return;<br></br> }<br></br> if (!outputdir.exists())<br></br> {<br></br> new File(outputpath).mkdirs();<br></br> }<p> JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);// 构建 Config</p><br></br> FileSystem fileSys = FileSystem.get(conf);<br></br> fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));// 将本地文件系统的文件拷贝到 HDFS 中 <p> conf.setJobName("analysisjob");</p><br></br> conf.setOutputKeyClass(Text.class);// 输出的 key 类型,在 OutputFormat 会检查 <br></br> conf.setOutputValueClass(LongWritable.class); // 输出的 value 类型,在 OutputFormat 会检查 <br></br> conf.setMapperClass(MapClass.class);<br></br> conf.setCombinerClass(CombinerClass.class);<br></br> conf.setReducerClass(ReduceClass.class);<br></br> conf.setPartitionerClass(PartitionerClass.class);<br></br> conf.set("mapred.reduce.tasks", "2");// 强制需要有两个 Reduce 来分别处理流量和次数的统计 <br></br> FileInputFormat.setInputPaths(conf, shortin);//hdfs 中的输入路径 <br></br> FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs 中输出路径 <p> Date startTime = new Date();</p><br></br> System.out.println("Job started: " + startTime);<br></br> JobClient.runJob(conf);<br></br> Date end_time = new Date();<br></br> System.out.println("Job ended: " + end_time);<br></br> System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");<br></br> // 删除输入和输出的临时文件 <br></br> fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));<br></br> fileSys.delete(new Path(shortin),true);<br></br> fileSys.delete(new Path(shortout),true);<br></br> }以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务 Class 为一个可标示的命令,让 hadoop jar 可以执行。

public class ExampleDriver {<br></br>  public static void main(String argv[]){<br></br>    ProgramDriver pgd = new ProgramDriver();<br></br>    try {<br></br>      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");<br></br>      pgd.driver(argv);<br></br>    }<br></br>    catch(Throwable e){<br></br>      e.printStackTrace();<br></br>    }<br></br>  }<br></br>}将代码打成 jar,并且设置 jar 的 mainClass 为 ExampleDriver 这个类。在分布式环境启动以后执行如下语句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out在 /home/wenchu/test-in 中是需要分析的日志文件,执行后就会看见整个执行过程,包括了 Map 和 Reduce 的进度。执行完毕会在 /home/wenchu/test-out 下看到输出的内容。有两个文件:part-00000 和 part-00001 分别记录了统计后的结果。 如果需要看执行的具体情况,可以看在输出目录下的 _logs/history/xxxx_analysisjob,里面罗列了所有的 Map,Reduce 的创建情况以及执行情况。在运行期也可以通过浏览器来查看 Map,Reduce 的情况: http://MasterIP:50030/jobtracker.jsp

Hadoop 集群测试

首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。

文件复制数为 1,blocksize 5M

Slave 数 处理记录数 (万条) 执行时间(秒) 2 95 38 2 950 337 4 95 24 4 950 178 6 95 21 6 950 114Blocksize 5M

Slave 数 处理记录数 (万条) 执行时间(秒) 2(文件复制数为 1) 950 337 2(文件复制数为 3) 950 339 6(文件复制数为 1) 950 114 6(文件复制数为 3) 950 117文件复制数为 1

Slave 数 处理记录数 (万条) 执行时间(秒) 6(blocksize 5M) 95 21 6(blocksize 77M) 95 26 4(blocksize 5M) 950 178 4(blocksize 50M) 950 54 6(blocksize 5M) 950 114 6(blocksize 50M) 950 44 6(blocksize 77M) 950 74 测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点:

  1. 机器数对于性能还是有帮助的(等于没说 ^_^)。
  2. 文件复制数的增加只对安全性有帮助,但是对于性能没有太多帮助。而且现在采取的是将操作系统文件拷贝到 HDFS 中,所以备份多了,准备的时间很长。
  3. blocksize 对于性能影响很大,首先如果将 block 划分的太小,那么将会增加 job 的数量,同时也增加了协作的代价,降低了性能,但是配置的太大也会让 job 不能最大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。
  4. 最后就是除了这个表里面列出来的结果,应该去仔细看输出目录中的 _logs/history 中的 xxx_analysisjob 这个文件,里面记录了全部的执行过程以及读写情况。这个可以更加清楚地了解哪里可能会更加耗时。

随想

“云计算”热的烫手,就和 SAAS、Web2 及 SNS 等一样,往往都是在搞概念,只有真正踏踏实实的大型互联网公司,才会投入人力物力去研究符合自己的分布式计算。其实当你的数据量没有那么大的时候,这种分布式计算也就仅仅只是一个玩具而已,只有在真正解决问题的过程中,它深层次的问题才会被挖掘出来。

这三篇文章(分布式计算开源框架 Hadoop 介绍,Hadoop 中的集群配置和使用技巧)仅仅是为了给对分布式计算有兴趣的朋友抛个砖,要想真的掘到金子,那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制,在解决自己问题的同时,也能够贡献一些什么。

前几日看到有人跪求成为架构师的方式,看了有些可悲,有些可笑,其实有多少架构师知道什么叫做架构?架构师的职责是什么?与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。

相关阅读:

  1. 分布式计算开源框架 Hadoop 介绍――分布式计算开源框架 Hadoop 入门实践(一)
  2. Hadoop 中的集群配置和使用技巧――分布式计算开源框架 Hadoop 入门实践(二)

作者介绍:岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。个人 Blog 为: http://blog.csdn.net/cenwenchu79

志愿参与 InfoQ 中文站内容建设,请邮件至 editors@cn.infoq.com 。也欢迎大家到 InfoQ 中文站用户讨论组参与我们的线上讨论。

2008 年 8 月 13 日 01:5442095

评论

发布
暂无评论
发现更多内容

朱嘉明:全面认知区块链的科学特征

CECBC区块链专委会

区块链

初识Golang之安装运行篇

Kylin

golang新手 3月日更 21天挑战

利用区块链技术,打造绿色发展的中药材生态链

CECBC区块链专委会

中药材

Spark提交后都干了些什么?

小舰

大数据 spark Spark调优

备战一个月,四闯字节跳动,终圆大厂梦!(附面试经历+复习笔记)

Crud的程序员

Java 编程 程序员 架构

2021吊打面试官必备!居然真的有人把1000道对标阿里P7面试真题最优解收录成册,全网开源!

Java王路飞

Java 程序员 架构 面试 分布式

打击虚拟货币洗钱:中国破获比特币跨境洗钱案

CECBC区块链专委会

虚拟货币

MySQL自定义变量?学不废不收费

Java王路飞

Java MySQL 程序员 架构 面试

Hystrix 实战经验分享

Java王路飞

Java 程序员 架构 面试 微服务

JVM之调优及常见场景分析

Java王路飞

编程 程序员 面试 JVM 调优

你是否觉得上级的能力不如你?

石云升

心理学 28天写作 职场经验 管理经验 3月日更

10.4|PPT 教程|内容页之表格使用

青城

SpringBoot + Mybatis实现动态数据源切换

互联网架构师小马

实时数据流计算引擎Flink和Spark流计算对比

小舰

大数据 flink spark 流计算

Github上堪称最全的面试题库(Java岗)到底有多香

云流

Java 程序员 架构 面试

我是如何拿到5大银行offer

小舰

面试 银行 笔试 校园招聘

融合发展是区块链的未来 数字通证新模式具有划时代意义

CECBC区块链专委会

数字通证

我从外包辞职了,10000小时后,走进字节跳动拿了offer

云流

Java 编程 程序员 架构 面试

你真的懂Spring解决循环依赖吗?

云流

Java 架构 Spring Boot

阿里P9大牛匠心打造21版Java架构面试大全,跳槽涨薪稳了

神奇小汤圆

Java 编程 程序员 面试

并发编程:一次搞定单例模式

Java架构师迁哥

深入剖析数据库事务的隔离级别

小舰

数据库 事务隔离级别 数据库事务

C/C++Linux服务器开发完整学习路线(含免费学习资料下载地址)

Linux服务器开发

Linux C/C++ 后端开发 Linux服务器开发 Linux后台开发

Java 多线程 : 迟来的 Future

神奇小汤圆

Java 架构 线程

面面俱到!阿里分布式全栈系统设计实录Github仅上线6天星标已经高达68K!

程序员小毕

Java 架构 面试 分布式 高性能

冰河公开了其总结的一项重要的编程技能!

冰河

Java 正则表达式 程序员

使用 Arthas 排查SpringBoot诡异耗时的Bug

互联网架构师小马

大专生阿里/腾讯/京东面经分享:Java面试精选题+架构实战笔记(技术狂补)

比伯

Java 编程 架构 面试 计算机

SpringBoot配置文件数据加密自定义开发详解

程序员小毕

Java spring 程序员 面试 springboot

盘点一下数据库的误操作有哪些后悔药?

程序员小毕

Java MySQL 编程 程序员 面试

linux下七种文件类型

xiezhr

Linux linux操作 linux运维 linux 文件权限控制

演讲经验交流会|ArchSummit 上海站

演讲经验交流会|ArchSummit 上海站

Hadoop基本流程与应用开发-InfoQ