Hadoop 常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。从高层次角度来看,整个过程就是 Hadoop 接收输入文件、使用自定义转换(Map-Reduce 步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月 InfoQ 展示了怎样在第一个步骤中,使用 InputFormat 类来更好地对接收输入文件进行控制。而在本文中,我们将同大家一起探讨怎样自定义最后一个步骤——即怎样写入输出文件。OutputFormat 将 Map/Reduce 作业的输出结果转换为其他应用程序可读的方式,从而轻松实现与其他系统的互操作。为了展示 OutputFormts 的实用性,我们将用两个例子进行讨论:如何拆分作业结果到不同目录以及如何为提供快速键值查找的服务写入文件。
OutputFormats 是做什么的?
OutputFormt 接口决定了在哪里以及怎样持久化作业结果。Hadoop 为不同类型的格式提供了一系列的类和接口,实现自定义操作只要继承其中的某个类或接口即可。你可能已经熟悉了默认的 OutputFormat,也就是 TextOutputFormat,它是一种以行分隔,包含制表符界定的键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此带来的结果是运行时间更长且资源消耗更多。为了避免文本文件的弊端,Hadoop 提供了 SequenceFileOutputformat,它将对象表示成二进制形式而不再是文本文件,并将结果进行压缩。下面是 Hadoop 提供的类层次结构:
- FileOutputFormat(实现 OutputFormat 接口)—— 所有 OutputFormats 的基类
- MapFileOutputFormat —— 一种使用部分索引键的格式
- SequenceFileOutputFormat —— 二进制键值数据的压缩格式
- SequenceFileAsBinaryOutputFormat —— 原生二进制数据的压缩格式
- TextOutputFormat —— 以行分隔、包含制表符定界的键值对的文本文件格式
- MultipleOutputFormat —— 使用键值对参数写入文件的抽象类
- MultipleTextOutputFormat —— 输出多个以标准行分割、制表符定界格式的文件
- MultipleSequenceFileOutputFormat —— 输出多个压缩格式的文件
OutputFormat 提供了对 RecordWriter 的实现,从而指定如何序列化数据。 RecordWriter 类可以处理包含单个键值对的作业,并将结果写入到 OutputFormat 中准备好的位置。RecordWriter 的实现主要包括两个函数:“write”和“close”。“write”函数从 Map/Reduce 作业中取出键值对,并将其字节写入磁盘。LineRecordWriter 是默认使用的 RecordWriter,它是前面提到的 TextOutputFormat 的一部分。它写入的内容包括:
- 键 (key) 的字节 (由 getBytes() 函数返回)
- 一个用以定界的制表符
- 值 (value) 的字节(同样由 getBytes() 函数返回)
- 一个换行符
“close”函数会关闭 Hadoop 到输出文件的数据流。
我们已经讨论了输出数据的格式,下面我们关心的问题是数据存储在何处?同样,你或许看到过某个作业的输出结果会以多个“部分”文件的方式存储在输出目录中,如下:
|-- output-directory | |-- part-00000 | |-- part-00001 | |-- part-00002 | |-- part-00003 | |-- part-00004 '-- part-00005 {1}
默认情况下,当需要写入数据时,每个进程都会在输出目录创建自己的文件。数据由 reducers 在作业结束时写入(如果没有 reducers 会由 mapper 写入)。即使在本文后面提到的创建自定义输出目录时,我们仍会保持写入“部分”文件,这么做可以让多个进程同时写入同一个目录而互不干扰。
自定义 OutputFormat
从前面我们已经看到,OutputFormat 类的主要职责是决定数据的存储位置以及写入的方式。那么为什么要自定义这些行为呢?自定义数据位置的原因之一是为了将 Map/Reduce 作业输出分离到不同的目录。例如,假设需要处理一个包含世界范围内的搜索请求的日志文件,并希望计算出每个国家的搜索频度。你想要在不牵涉其他国家的前提下能够查看某个特定国家的结果。也许以后在你的数据管道中,会用不同的进程来处理不同的国家,或者想要把某个特定国家的结果复制一份到该国的数据中心去。使用默认的 OutputFormat 时,所有的数据都会存储在同一目录下,这样在不浏览的情况下是无从知晓“部分”文件的内容的。而通过使用自定义的 OutputFormat,你可以为每个国家创建一个子目录的布局,如下:
|-- output-directory | |-- France | | |-- part-00000 | | |-- part-00001 | | '-- part-00002 ... | | '-- Zimbabwe | |-- part-00000 | |-- part-00001 | '-- part-00002
其中每个部分文件都具有键值对(“搜索词汇”=> 频度)。现在只要简单地指定某个国家数据所在的路径,就可以只读取该国家的数据了。下面我们将看到怎样继承 MultipleTextOutputFormat 类,以获得所需的行为。
自定义 OutputFormat 还有一些其他的原因,以名为 ElephantDB 的项目为例, 它将数据以一种面向消费应用程序的“本地”形式进行存储。这个项目的设立是为了让 Map/Reduece 作业结果可以像分布式服务一样被查询。ElephantDB 写入的并不是文本文件,而是使用自定义的 OutputFormat 将结果写成 BerkeleyDB 文件,其中这些文件使用作业输出的键进行索引。之后使用某个服务加载 BerkeleyDB 文件,可以提供低延滞的任意键查找。类似的系统还有 HBase 和 Voldemort,它们可以存储 Hadoop 生成的键值数据。ElephantDB 重点关注的是怎样与 Hadoop 批量式更新进行简易紧密的集成。
多路输出
为了解决上面的搜索日志的问题,我们继承了 MultipleTextOutputFormat 类,并根据被写入的键值来选择输出目录。我们的 Map/Reduce 作业将会为搜索请求所在国家生成一个键,并为搜索词汇及该搜索的频度产生一个值。由于 MultipleTextOutputFormat 已经知道如何写入文本文件,因此并不需要为 OutputFormat 实现序列化功能。清单 1 实现了该类:
1 package oddjob.hadoop; 2 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; 6 7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> { 8 9 /** 10 * Use they key as part of the path for the final output file. 11 */ 12 @Override 13 protected String generateFileNameForKeyValue(Text key, Text value, String leaf) { 14 return new Path(key.toString(), leaf).toString(); 15 } 16 17 /** 18 * When actually writing the data, discard the key since it is already in 19 * the file path. 20 */ 21 @Override 22 protected Text generateActualKey(Text key, Text value) { 23 return null; 24 } 25 }
清单 1:MultipleTextOutputFormat 子类样例
MultipleTextOutputFormatByKey 类的 generateActualFileNameForKeyValue 方法指定了作业输出的存储位置(第 13 行)。对于每组由 Map/Reduce 作业生成的键值对,该类会把键加入到路径名称中作为输出。“leaf”参数就是我们之前看到的“part-0000”,它在每个 reducer 中都是独一无二的,这样可以允许不同进程同时写入到输出目录而互不影响。例如,由第一个 reducer 产生的键为“France”、值为“soccer 5000”的结果会被写入到“output-directory/France/part-00000”内的某个文件中。
要使用这个类,需确保 Hadoop 包含了这个自定义类的 jar,并使用完整的类名作为“-outputformat”的参数:
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \ -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \ -input search-logs \ -output search-frequency-by-country \ -mapper parse-logs.py \ -reducer count-searches.py
清单 1 是 oddjob 项目中某个类的 Java 实现。oddjob 是一个开源库,提供了多种 MultipleTextOutputFormat。虽然这个库面向的是 Hadoop 的流特性,但是它也可以用在产生文本键值输出的其他作业中。
为服务准备输出
在我们的下一个例子中,必须实现两个接口来自定义数据序列化以及文件存放的目录结构,以使结果可被 ElephantDB 服务加载。正如前面所讨论的,序列化部分会由 RecordWriter 的实现来处理。在 LineRecordWriter 类将字节流写入输出文件的同时,ElephantRecordWriter 还包含了专门的逻辑用来选择要写入的文件以及使用第三方库来格式化磁盘上的数据。
1 public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> { 2 3 FileSystem _fs; 4 Args _args; 5 Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>(); 6 Progressable _progressable; 7 LocalElephantManager _localManager; 8 9 int _numWritten = 0; 10 long _lastCheckpoint = System.currentTimeMillis(); 11 12 public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException { 13 _fs = Utils.getFS(args.outputDirHdfs, conf); 14 _args = args; 15 _progressable = progressable; 16 _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf)); 17 } 18 19 private String remoteUpdateDirForShard(int shard) { 20 if(_args.updateDirHdfs==null) return null; 21 else return _args.updateDirHdfs + "/" + shard; 22 } 23 24 public void write(IntWritable shard, ElephantRecordWritable record) throws IOException { 25 LocalPersistence lp = null; 26 LocalPersistenceFactory fact = _args.spec.getLPFactory(); 27 Map<String, Object> options = _args.persistenceOptions; 28 if(_lps.containsKey(shard.get())) { 29 lp = _lps.get(shard.get()); 30 } else { 31 String updateDir = remoteUpdateDirForShard(shard.get()); 32 String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir); 33 lp = fact.openPersistenceForAppend(localShard, options); 34 _lps.put(shard.get(), lp); 35 progress(); 36 } 37 38 _args.updater.updateElephant(lp, record.key, record.val); 39 40 _numWritten++; 41 if(_numWritten % 25000 == 0) { 42 long now = System.currentTimeMillis(); 43 long delta = now - _lastCheckpoint; 44 _lastCheckpoint = now; 45 LOG.info("Wrote last 25000 records in " + delta + " ms"); 46 _localManager.progress(); 47 } 48 } 49 50 public void close(Reporter reporter) throws IOException { 51 for(Integer shard: _lps.keySet()) { 52 String lpDir = _localManager.localTmpDir("" + shard); 53 LOG.info("Closing LP for shard " + shard + " at " + lpDir); 54 _lps.get(shard).close(); 55 LOG.info("Closed LP for shard " + shard + " at " + lpDir); 56 progress(); 57 String remoteDir = _args.outputDirHdfs + "/" + shard; 58 if(_fs.exists(new Path(remoteDir))) { 59 LOG.info("Deleting existing shard " + shard + " at " + remoteDir); 60 _fs.delete(new Path(remoteDir), true); 61 LOG.info("Deleted existing shard " + shard + " at " + remoteDir); 62 } 63 LOG.info("Copying " + lpDir + " to " + remoteDir); 64 _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir)); 65 LOG.info("Copied " + lpDir + " to " + remoteDir); 66 progress(); 67 } 68 _localManager.cleanup(); 69 } 70 71 private void progress() { 72 if(_progressable!=null) _progressable.progress(); 73 } 74 }
清单 2:从 ElephantDB 中摘录的某个 RecordWriter 子类
ElephantDB 的工作方式是通过跨越若干个 LocalPersistence 对象(BerkeleyDB 文件)来对数据进行分片(划分)。ElephantRecordWriter 类中的 write 函数拿到分片 ID,并检查该分片是否已经打开(第 28 行),如果没有则打开并创建一个新的本地文件(第 33 行)。第 38 行的 updateElephant 调用将作业输出的键值对写入到 BerkeleyDB 文件。
当关闭 ElephantRecordWriter 时,该类在第 64 行会复制 BerkeleyDB 文件到 HDFS 中,且可以随意选择是否覆盖旧文件。接下去的 progress 方法调用会通知 Hadoop 当前的 RecordWriter 正在按计划进行,这有点类似于真实 Map/Reduce 作业中的状态或计数器更新。
下一步是利用 ElephantRecordWriter 来实现 OutputFormat。要理解此清单中的代码,重点是了解 Hadoop JobConf 对象封装了什么。顾名思义,JobConf 对象包含了某项作业的全部设置,包括输入输出目录,作业名称以及 mapper 和 reducer 类。清单 3 展示了两个自定义类是如何共同工作的:
1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> { 2 public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class); 3 4 public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException { 5 return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable); 6 } 7 8 public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException { 9 Args args = (Args) Utils.getObject(conf, ARGS_CONF); 10 fs = Utils.getFS(args.outputDirHdfs, conf); 11 if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) { 12 throw new InvalidJobConfException("Speculative execution should be false"); 13 } 14 if(fs.exists(new Path(args.outputDirHdfs))) { 15 throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs); 16 } 17 if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) { 18 throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs); 19 } 20 } 21 }
清单 3:从 ElephantDB 中摘录的某个 OutputFormat 实现
正如前面所看到的,OutputFormat 有两个职责,分别是决定数据的存储位置以及数据写入的方式。ElephantOutputFormat 的数据存储位置是通过检查 JobConf 以及在第 14 和 17 行检查确保该位置是一个合法目标位置后来决定的。至于数据的写入方式,则是由 getRecordWriter 函数处理,它的返回结果是清单 2 中的 ElephantRecordWriter 对象。
从 Hadoop 的角度来看,当 Map/Reduce 作业结束并且每个 reducer 产生了键值对流的时候,这些类会派上用场。Hadoop 会以作业配置为参数调用 checkOutputSpecs。如果函数运行没有抛出异常,它会接下去调用 getRecordWriter 以返回可以写入流数据的对象。当所有的键值对都被写入后,Hadoop 会调用 writer 中的 close 函数,将数据提交到 HDFS 并结束该 reducer 的职责。
总结
OutputFormat 是 Hadoop 框架中的重要组成部分。它们通过为目标消费应用程序产生合适的输出来提供与其他系统和服务间的互操作。自定义作业输出位置可以简化并加速数据工作流;而自定义结果输出方式可以让其快速地工作于其他不同的环境下。虽然实现 OutputFormat 和覆写几个方法一样简单,但是它足够灵活可以支持全新的磁盘上的数据格式。
关于作者
Jim Blomo ( @jimblomo ) 热衷于开发强劲优雅的系统来操控数据。在 Yelp 公司中,他负责管理一个日益增长的数据挖掘团队,该团队使用 Hadoop,mrjob 以及 oddjob 处理 TB 级的数据。在加入 Yelp 公司前,他曾经为创业公司和 Amazon 构建基础设施。他喜爱美食,文化,以及同妻子一道漫步在旧金山湾区的户外。
查看英文原文: Exploring Hadoop OutputFormat
感谢侯伯薇对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。
评论