速来报名!AICon北京站鸿蒙专场~ 了解详情
写点什么

揭秘 InputFormat:掌控 Map Reduce 任务执行的利器

  • 2012-01-09
  • 本文字数:7553 字

    阅读完需:约 25 分钟

随着越来越多的公司采用 Hadoop,它所处理的问题类型也变得愈发多元化。随着 Hadoop 适用场景数量的不断膨胀,控制好怎样执行以及何处执行 map 任务显得至关重要。实现这种控制的方法之一就是自定义 **InputFormat** 实现。

InputFormat 类是 Hadoop Map Reduce 框架中的基础类之一。该类主要用来定义两件事情:

  • 数据分割 (Data splits)
  • 记录读取器 (Record reader)

数据分割 是 Hadoop Map Reduce 框架中的基础概念之一,它定义了单个 Map 任务的大小及其可能的执行服务器信息。记录读取器 主要负责从输入文件实际读取数据并将它们(以键值对的形式)提交给 mapper。尽管有不少文章介绍过怎样实现自定义的 _ 记录读取器 _(例如,参考文章 [1]),但是关于如何进行分割(split)的介绍却相当粗略。这里我们将会解释什么是分割,并介绍怎样实现自定义分割来完成特定任务。

剖析分割

任何分割操作的实现都继承自 Apache 抽象基类——InputSplit,它定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而 _ 分割位置 _ 是分割所在的机器结点名称组成的列表,其中待分割的数据都会于本地存在。分割位置可以方便调度器决定在哪个机器上执行此次分割。简化后的 [1] 作业跟踪器 (job tracker) 工作流程如下:

  • 接受来自某个任务跟踪器 (task tracker) 的心跳通信,得到该位置 map 的可用情况。
  • 为队列等候中的分割任务找到可用的“本地”结点。
  • 向任务跟踪器提交分割请求以待执行。

数据局部性 (Locality) 的相关程度因存储机制和整体的执行策略的不同而不同。例如,在 Hadoop 分布式文件系统 (HDFS) 中,分割通常对应一个物理数据块大小以及该数据块物理定位所在的一系列机器(其中机器总数由复制因子定义)的位置。这就是 **FileInputFormat** 计算分割的过程。

而 HBase 的实现则采用了另外一套方法。在 HBase 中,分割 对应于一系列属于某个表区域 (table region) 的表键 (table keys),而 _ 位置 _ 则为正在运行区域服务器的机器。

计算密集型应用

Map Reduce 应用中有一类特殊的应用叫做计算密集型应用(Compute-Intensive application)。这类应用的特点在于 Mapper.map() 函数执行的时间要远远长于数据访问的时间,且至少要差一个数量级。从技术角度来说,虽然这类应用仍然可以使用“标准”输入格式的实现,但是它会带来数据存放结点过少而集群内剩余结点没能充分利用的问题(见图 1)。

(点击图片进行放大)

图1:数据局部性情况下的结点使用图

图1 中显示了针对计算密集型应用,使用“标准”数据局部性导致的结点使用率上的巨大差异——有些结点(红色标注)被过度使用,而其他结点(黄色和浅绿色标注)则使用不足。由此可见,在针对计算密集型应用时,需要重新思考对“局部性”概念的认识。在这种情况下,“局部性”意味着所有可用结点之间map 任务的均匀分布——即最大化地使用集群机器的计算能力。

使用自定义InputFormat 改变“局部性”

假定源数据以文件序列的形式存在,那么一个简单的** ComputeIntensiveSequenceFileInputFormat** 类(见清单 1)便可以实现将分割生成的结果均匀地分布在集群中的所有服务器上。

复制代码
package com.navteq.fr.mapReduce.InputFormat;
<p>import java.io.IOException;<br></br>import java.util.ArrayList;<br></br>import java.util.Collection;<br></br>import java.util.List;<br></br>import java.util.StringTokenizer;</p><p>import org.apache.hadoop.fs.FileStatus;<br></br>import org.apache.hadoop.fs.Path;<br></br>import org.apache.hadoop.mapred.ClusterStatus;<br></br>import org.apache.hadoop.mapred.JobClient;<br></br>import org.apache.hadoop.mapred.JobConf;<br></br>import org.apache.hadoop.mapreduce.InputSplit;<br></br>import org.apache.hadoop.mapreduce.JobContext;<br></br>import org.apache.hadoop.mapreduce.lib.input.FileSplit;<br></br>import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;</p><p>public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {</p><p>          private static final double SPLIT_SLOP = 1.1; // 10% slop<br></br>          static final String NUM_INPUT_FILES = "mapreduce.input.num.files";</p><p>          /**<br></br>          * Generate the list of files and make them into FileSplits.<br></br>          */ <br></br>          @Override<br></br>          public List<InputSplit> getSplits(JobContext job) throws IOException {<br></br>           </p><p>          long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));<br></br>          long maxSize = getMaxSplitSize(job);</p><p>          // get servers in the cluster<br></br>          String[] servers = getActiveServersList(job);<br></br>          if(servers == null)<br></br>                     return null;<br></br>          // generate splits<br></br>          List<InputSplit> splits = new ArrayList<InputSplit>();<br></br>          List<FileStatus>files = listStatus(job);<br></br>          int currentServer = 0;<br></br>          for (FileStatus file: files) {<br></br>                    Path path = file.getPath();<br></br>                    long length = file.getLen();<br></br>                    if ((length != 0) && isSplitable(job, path)) { <br></br>                              long blockSize = file.getBlockSize();<br></br>                              long splitSize = computeSplitSize(blockSize, minSize, maxSize);</p><p>                              long bytesRemaining = length;<br></br>                              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {<br></br>                                        splits.add(new FileSplit(path, length-bytesRemaining, splitSize, <br></br>                                                                  new String[] {servers[currentServer]}));<br></br>                              currentServer = getNextServer(currentServer, servers.length);<br></br>                              bytesRemaining -= splitSize;<br></br>                    }</p><p>                    if (bytesRemaining != 0) {<br></br>                              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, <br></br>                                                new String[] {servers[currentServer]}));<br></br>                              currentServer = getNextServer(currentServer, servers.length);<br></br>                    }<br></br>          } else if (length != 0) {<br></br>                    splits.add(new FileSplit(path, 0, length, <br></br>                                     new String[] {servers[currentServer]}));<br></br>                    currentServer = getNextServer(currentServer, servers.length);<br></br>          } else { <br></br>                    //Create empty hosts array for zero length files<br></br>                    splits.add(new FileSplit(path, 0, length, new String[0]));<br></br>          }<br></br>      }</p><p>      // Save the number of input files in the job-conf<br></br>      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());</p><p>      return splits;<br></br>   }</p><p>   private String[] getActiveServersList(JobContext context){</p><p>      String [] servers = null;<br></br>       try {<br></br>                JobClient jc = new JobClient((JobConf)context.getConfiguration()); <br></br>                ClusterStatus status = jc.getClusterStatus(true);<br></br>                Collection<String> atc = status.getActiveTrackerNames();<br></br>                servers = new String[atc.size()];<br></br>                int s = 0;<br></br>                for(String serverInfo : atc){<br></br>                         StringTokenizer st = new StringTokenizer(serverInfo, ":");<br></br>                         String trackerName = st.nextToken();<br></br>                         StringTokenizer st1 = new StringTokenizer(trackerName, "_");<br></br>                         st1.nextToken();<br></br>                         servers[s++] = st1.nextToken();<br></br>                }<br></br>      }catch (IOException e) {<br></br>                e.printStackTrace();<br></br>      }</p><p>      return servers;</p><p>  }</p><p>  private static int getNextServer(int current, int max){</p><p>      current++;<br></br>      if(current >= max)<br></br>                current = 0;<br></br>      return current;<br></br>  }<br></br>}</p>

清单 1:ComputeIntensiveSequenceFileInputFormat 类

该类继承自 SequenceFileInputFormat 并重写了 **getSplits()方法,虽然计算分割的过程与FileInputFormat** 完全一样,但是它为分割指定了“局部性”,以便从集群中找出可用的服务器。getSplits() 利用到了以下两个方法:

  • getActiveServersList() 方法,返回集群中当前可用的服务器(名称)组成的数组。
  • getNextServer() 方法,返回服务器数组中下一个服务器索引,且当服务器数组中元素全部用尽时,返回数组头部重新开始。

虽然上述实现(见清单 1)将 map 任务的执行均匀地分布在了集群中的所有服务器上,但是它却完全忽略了数据的实际位置。稍微好点的 **getSplits** 方法实现(见清单 2)可以试图将两种策略结合在一起:既尽量多地放置针对数据的本地作业,且保持剩余作业在集群上的良好平衡。 [2]

<span color="#000000"><b>public</b> List<<span><span color="#000000">InputSplit</span></span>> getSplits(JobContext job) <b>throws</b> IOException { </span><p><span color="#000000">         // get splits<br></br>        List<<span>InputSplit</span>> originalSplits = <b>super</b>.getSplits(job);</span></p><p><span color="#000000">         // Get active servers<br></br>        String[] servers = getActiveServersList(job);<br></br>        <b>if</b>(servers == <b>null</b>)<br></br>                <b>return</b> <b>null</b>;<br></br>        // reassign splits to active servers<br></br>        List<<span>InputSplit</span>> splits = <b>new</b> ArrayList<<span>InputSplit</span>>(originalSplits.size());<br></br>        <b>int</b> numSplits = originalSplits.size();<br></br>        <b>int</b> currentServer = 0;<br></br>        <b>for</b>(<b>int</b> i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer, <br></br>                                                                           servers.length)){<br></br>                String server = servers[currentServer]; // Current server<br></br>                <b>boolean</b> replaced = <b>false</b>;<br></br>                // For every remaining split<br></br>                <b>for</b>(<span>InputSplit</span> split : originalSplits){<br></br>                        FileSplit fs = (FileSplit)split;<br></br>                        // For every split location<br></br>                        <b>for</b>(String l : fs.getLocations()){<br></br>                                 // If this split is local to the server<br></br>                                 <b>if</b>(l.equals(server)){<br></br>                                           // Fix split location<br></br>                                           splits.add(<b>new</b> FileSplit(fs.getPath(), fs.getStart(),<br></br>                                                            fs.getLength(), <b>new</b> String[] {server}));<br></br>                                           originalSplits.remove(split);<br></br>                                           replaced = <b>true</b>;<br></br>                                           <b>break</b>;<br></br>                                 }<br></br>                        }<br></br>                        <b>if</b>(replaced)<br></br>                                 <b>break</b>;<br></br>                }<br></br>                // If no local splits are found for this server<br></br>                <b>if</b>(!replaced){<br></br>                        // Assign first available split to it <br></br>                        FileSplit fs = (FileSplit)splits.get(0);<br></br>                        splits.add(<b>new</b> FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), <br></br>                                                                             <b>new</b> String[] {server}));<br></br>                        originalSplits.remove(0);<br></br>                }<br></br>        }<br></br>        <b>return</b> splits;<br></br>}</span></p>清单 2:优化过的 getSplits 方法

在此实现中,我们首先使用父类 (FileInputSplit) 来得到包含位置计算在内的分割以确保数据局部性。然后我们计算出可用的服务器列表,并为每一个存在的服务器尝试分配与其同处本地的分割。

延迟公平调度

虽然清单 1 和清单 2 中的代码都正确地计算出了分割位置,但当我们试图在 Hadoop 集群上运行代码时,就会发现结果与服务器之间产生均匀分布相去甚远。参考文章 [2] 中很好的描述了我们观察到的这个问题,并为该问题描述了一种解决方案——即延迟公平调度。

假设已经设置好了公平调度程序,那么下面的程序段应当加入到 **mapred-site.xml** 文件中以激活某个延迟调度程序 [3] :

复制代码
<property>
        <name>mapred.fairscheduler.locality.delay</name>
        <value>360000000</value>
<property>

适当借助延迟公平调度程序,作业执行将可以利用整个集群(见图 2)。此外,根据我们的实验,这种情况下的执行时间相比“数据局部性”的做法要节省约 30%。

(点击图片进行放大)

图2:执行局部性情况下的结点使用图

其他注意事项

用于测试的计算作业共使用了96 个split 和mapper 任务。测试集群拥有19 个数据结点,其中每个数据结点拥有8 个mapper 槽,因此该集群共有152 个可用槽。当该作业运行时,它并没有充分利用集群中的所有槽。

Ganglia 的两份截图都展示了我们所使用的测试集群,其中前三个结点为控制结点,而第四个结点为边缘结点,主要用来启动作业。图中展示了中央处理器 / 机器的负载情况。在图 1 中,有一些结点被过度使用(红色显示),而集群中的其他结点则未得到充分利用。在图 2 中,虽然我们得到了更加平衡的分布,然而集群仍然未被充分利用。用于测试的作业也可以运行多线程,这么做会增加中央处理器的负载,但同时也会降低在每次 **Mapper.map()** 迭代上的整体时间花费。正如图 3 所示,通过增加线程数量,我们可以更好地利用集群资源,并进一步减少完成作业所花费的时间。通过改变作业区域性,我们可以在不牺牲性能的情况下更好地利用群集处理远程作业数据。

(点击图片进行放大)

图3:使用多线程Map 作业的执行区域性情况下的结点使用图

即使机器中央处理器处于高负荷状态,它仍然可以允许其他磁盘I/O 密集型作业运行在开放槽中,要注意的是,这么做会带来些许的性能下降。

自定义分割

本文中提到的方法对大文件非常适用,但是对于小文件而言,并没有足够的分割来让其使用集群中的多台机器。一种可行的方法是使用更小的分割块,但是这么做会给集群命名结点带来更多的负担(内存需求方面)。一种更好的做法是修改清单1 中的代码,以使用自定义的块大小(而不是文件块大小)。这种方法可以计算出所需的分割块数量,而不用理会实际的文件大小。

总结

在这篇文章中,我们已经展示了如何利用自定义的** InputFormat**s 来更紧密地控制 Map Reduce 中的 map 任务在可用服务器间的分布。这种控制对于一类特殊应用——计算密集型应用非常重要,控制过程将 Hadoop Map Reduce 做为通用的并行执行框架使用。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理、处理以及 SOA 定义架构愿景,并且实施各种 NAVTEQ 的项目。他还是 InfoQ 的 SOA 编辑,OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者并经常发表演讲,他最新的一本书叫做《Applied SOA》。

Michael Segel在过去二十多年里一直不断与客户合作,帮助他们发现并解决业务上的问题。Michael 做过许多不同类型的工作,也在不同的行业圈摸打滚爬过。他是一位独立顾问,并且总是期望能够解决所有具有挑战性的问题。Michael 拥有俄亥俄州立大学的软件工程学位。

参考

1. Boris Lublinsky, Mike Segel. Custom XML Records Reader.

2. Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, Scott Shenker, Ion Stoica. Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling.


[1] 这是一个简化后的解释。真实的调度算法要复杂得多;考虑更多的参数而不仅仅是分割的位置。

[2] 虽然我们将其列作一个选项,但是如果花费在 Mapper.map() 方法上的时间高与远程访问数据时间的一个或多个数量级时,清单 1 中的代码将不会有任何性能上的提升。但尽管如此,它也许会带来网络利用率上的略微提高。

[3] 请注意这里的延迟以毫秒为单位,在改变该值之后需要重新启动作业跟踪器。

查看英文原文: Uncovering mysteries of InputFormat: Providing better control for your Map Reduce execution.

2012-01-09 13:004240
用户头像

发布了 125 篇内容, 共 37.6 次阅读, 收获喜欢 5 次。

关注

评论

发布
暂无评论
发现更多内容

工作中养成的工作习惯与给老板的汇报

松子(李博源)

大数据 个人成长 高效 高效率 工作总结

一体化实时HTAP数据库StoneDB,如何替换MySQL并实现近百倍分析性能的提升

StoneDB

云原生 #数据库 HTAP 大数据 开源 #开源

GQM 概述:构建研发效能度量体系的根本方法

思码逸研发效能

研发效能 创新方法 效能度量

小白 0-1 学习 app 开发,从配置到 helloword

YonBuilder低代码开发平台

跨平台 安卓 低代码开发 多端开发

构建工业软件开源工具链,2022 开放原子全球开源峰会开源工业软件论坛即将开幕

kk-OSC

开源 开放原子全球开源峰会 开源工业软件

音视频通话前的网络及设备检测该如何操作?

ZEGO即构

音视频开发 通话检测

新书上市 | 20年行业实践,一线工程师的必读之作

图灵教育

软件设计

【7.1-7.8】写作社区精彩技术博文回顾

InfoQ写作社区官方

优质创作周报

东方甄选品控翻车,如何通过智能协同的供应链建设建开启可持续商业模式?

数商云

数字化转型 供应链 企业数字化

关于TCP与UDP你应该知道的

是乃德也是Ned

7月月更

云脉芯联加入龙蜥社区,共建网络“芯”生态

OpenAnolis小助手

开源 芯片 龙蜥社区 CLA 云脉芯联

帮助文档——助客户快速了解您的产品如何使用

Baklib

细数下,FinClip 6月都干了啥

FinClip

ACM MM 2022 | 腾讯优图11篇论文入选,含盲超分辨率算法等研究方向

科技热闻

工程师世界的《原则》,Quora创始人豆瓣9.2分神作!

博文视点Broadview

观测云产品更新|新增查看器显示列多种快捷操作;新增 Pipeline 一键获取样本测试数据;新增场景自定义查看器文本分析模式等

观测云

共建开源人才生态,2022 开放原子全球开源峰会聚焦 “产学研用”

kk-OSC

开源 数字化 产学研用 开放原子全球开源峰会

SpEL快速上手及实践

转转技术团队

Java spring 后端

走进天太|加速智能生产力落地 让机器人随处可见

科技之家

一文搞懂Python上下文管理器

曲鸟

Python 7月月更 上下文管理器

搭建帮助中心,推动SaaS企业发展

Baklib

SaaS 客户服务 帮助中心 文档管理

内部排序——交换排序

乔乔

7月月更

升哲科技入选《中国企业家》2022年度“新锐100”企业

SENSORO

Flink 引擎在快手的深度优化与生产实践

Apache Flink

大数据 flink 编程 流计算 实时计算

公有云计费套路多?这里有一份破招详解

焱融科技

【计算讲谈社】第五讲|不止能上路,更能做好服务:自动驾驶产品规模化的问题定义

大咖说

人工智能 自动驾驶 阿里云 科技

中文拼写纠错:怎样改善模型对 multi-typo 的纠正效果?

澜舟孟子开源社区

人工智能 自然语言处理 nlp 文本生成 文本纠错

【LeetCode】单词替换Java题解

Albert

LeetCode 7月月更

这么强?!Erda MySQL Migrator:持续集成的数据库版本控制

尔达Erda

数据库 程序员 开发者 云原生 MySQL 运维

2022 开放原子全球开源峰会 OpenAnolis 分论坛携干货来袭!

kk-OSC

centos 开源 龙蜥操作系统 开放原子全球开源峰会 OpenAnolis

wallys/DR8072V01/IPQ8072A networking SBC supports dual 10GbE, WiFi 6

wallys-wifi6

揭秘InputFormat:掌控Map Reduce任务执行的利器_DevOps & 平台工程_Boris Lublinsky_InfoQ精选文章