Hadoop 作为 MR 的开源实现,一直以动态运行解析文件格式并获得比 MPP 数据库快上几倍的装载速度为优势。不过,MPP 数据库社区也一直批评 Hadoop 由于文件格式并非为特定目的而建,因此序列化和反序列化的成本过高 [7]。本文介绍 Hadoop 目前已有的几种文件格式,分析其特点、开销及使用场景。希望加深读者对 Hadoop 文件格式及其影响性能的因素的理解。
Hadoop 中的文件格式
1 SequenceFile
SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以 <key,value> 的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值, 这样是为了避免 MR 在运行 map 阶段的排序过程。如果你用 Java API 编写 SequenceFile,并让 Hive 读取的话,请确保使用 value 字段存放数据,否则你需要自定义读取这种 SequenceFile 的 InputFormat class 和 OutputFormat class。
图 1:Sequencefile 文件结构
2 RCFile
RCFile 是 Hive 推出的一种专门面向列的数据格式。 它遵循“先按列划分,再垂直划分”的设计理念。当查询过程中,针对它并不关心的列时,它会在 IO 上跳过这些列。需要说明的是,RCFile 在 map 阶段从远端拷贝仍然是拷贝整个数据块,并且拷贝到本地目录后 RCFile 并不是真正直接跳过不需要的列,并跳到需要读取的列, 而是通过扫描每一个 row group 的头部定义来实现的,但是在整个 HDFS Block 级别的头部并没有定义每个列从哪个 row group 起始到哪个 row group 结束。所以在读取所有列的情况下,RCFile 的性能反而没有 SequenceFile 高。
图 2:RCFile 文件结构
3 Avro
Avro 是一种用于支持数据密集型的二进制文件格式。它的文件格式更为紧凑,若要读取大量数据时,Avro 能够提供更好的序列化和反序列化性能。并且 Avro 数据文件天生是带 Schema 定义的,所以它不需要开发者在 API 级别实现自己的 Writable 对象。最近多个 Hadoop 子项目都支持 Avro 数据格式,如 Pig 、Hive、Flume、Sqoop 和 Hcatalog。
图 3:Avro MR 文件格式
4. 文本格式
除上面提到的 3 种二进制格式之外,文本格式的数据也是 Hadoop 中经常碰到的。如 TextFile 、XML 和 JSON。 文本格式除了会占用更多磁盘资源外,对它的解析开销一般会比二进制格式高几十倍以上,尤其是 XML 和 JSON,它们的解析开销比 Textfile 还要大,因此强烈不建议在生产系统中使用这些格式进行储存。 如果需要输出这些格式,请在客户端做相应的转换操作。 文本格式经常会用于日志收集,数据库导入,Hive 默认配置也是使用文本格式,而且常常容易忘了压缩,所以请确保使用了正确的格式。另外文本格式的一个缺点是它不具备类型和模式,比如销售金额、利润这类数值数据或者日期时间类型的数据,如果使用文本格式保存,由于它们本身的字符串类型的长短不一,或者含有负数,导致 MR 没有办法排序,所以往往需要将它们预处理成含有模式的二进制格式,这又导致了不必要的预处理步骤的开销和储存资源的浪费。
5. 外部格式
Hadoop 实际上支持任意文件格式,只要能够实现对应的 RecordWriter 和 RecordReader 即可。其中数据库格式也是会经常储存在 Hadoop 中,比如 Hbase,Mysql,Cassandra,MongoDB。 这些格式一般是为了避免大量的数据移动和快速装载的需求而用的。他们的序列化和反序列化都是由这些数据库格式的客户端完成,并且文件的储存位置和数据布局 (Data Layout) 不由 Hadoop 控制,他们的文件切分也不是按 HDFS 的块大小(blocksize)进行切割。
文件存储大小比较与分析
我们选取一个 TPC-H 标准测试来说明不同的文件格式在存储上的开销。因为此数据是公开的,所以读者如果对此结果感兴趣,也可以对照后面的实验自行做一遍。Orders 表文本格式的原始大小为 1.62G。 我们将其装载进 Hadoop 并使用 Hive 将其转化成以上几种格式,在同一种 LZO 压缩模式下测试形成的文件的大小。
Orders_text1
1732690045
1.61G
非压缩
TextFile
Orders_tex2
772681211
736M
LZO 压缩
TextFile
Orders_seq1
1935513587
1.80G
非压缩
SequenceFile
Orders_seq2
822048201
783M
LZO 压缩
SequenceFile
Orders_rcfile1
1648746355
1.53G
非压缩
RCFile
Orders_rcfile2
686927221
655M
LZO 压缩
RCFile
Orders_avro_table1
1568359334
1.46G
非压缩
Avro
Orders_avro_table2
652962989
622M
LZO 压缩
Avro
表 1:不同格式文件大小对比
从上述实验结果可以看到,SequenceFile 无论在压缩和非压缩的情况下都比原始纯文本 TextFile 大,其中非压缩模式下大 11%, 压缩模式下大 6.4%。这跟 SequenceFile 的文件格式的定义有关: SequenceFile 在文件头中定义了其元数据,元数据的大小会根据压缩模式的不同略有不同。一般情况下,压缩都是选取 block 级别进行的,每一个 block 都包含 key 的长度和 value 的长度,另外每 4K 字节会有一个 sync-marker 的标记。对于 TextFile 文件格式来说不同列之间只需要用一个行间隔符来切分,所以 TextFile 文件格式比 SequenceFile 文件格式要小。但是 TextFile 文件格式不定义列的长度,所以它必须逐个字符判断每个字符是不是分隔符和行结束符。因此 TextFile 的反序列化开销会比其他二进制的文件格式高几十倍以上。
RCFile 文件格式同样也会保存每个列的每个字段的长度。但是它是连续储存在头部元数据块中,它储存实际数据值也是连续的。另外 RCFile 会每隔一定块大小重写一次头部的元数据块(称为 row group,由 hive.io.rcfile.record.buffer.size 控制,其默认大小为 4M),这种做法对于新出现的列是必须的,但是如果是重复的列则不需要。RCFile 本来应该会比 SequenceFile 文件大,但是 RCFile 在定义头部时对于字段长度使用了 Run Length Encoding 进行压缩,所以 RCFile 比 SequenceFile 又小一些。Run length Encoding 针对固定长度的数据格式有非常高的压缩效率,比如 Integer、Double 和 Long 等占固定长度的数据类型。在此提一个特例——Hive 0.8 引入的 TimeStamp 时间类型,如果其格式不包括毫秒,可表示为”YYYY-MM-DD HH:MM:SS”,那么就是固定长度占 8 个字节。如果带毫秒,则表示为”YYYY-MM-DD HH:MM:SS.fffffffff”,后面毫秒的部分则是可变的。
Avro 文件格式也按 group 进行划分。但是它会在头部定义整个数据的模式(Schema), 而不像 RCFile 那样每隔一个 row group 就定义列的类型,并且重复多次。另外,Avro 在使用部分类型的时候会使用更小的数据类型,比如 Short 或者 Byte 类型,所以 Avro 的数据块比 RCFile 的文件格式块更小。
序列化与反序列化开销分析
我们可以使用 Java 的 profile 工具来查看 Hadoop 运行时任务的 CPU 和内存开销。以下是在 Hive 命令行中的设置:
hive>set mapred.task.profile=true; hive>set mapred.task.profile.params =-agentlib:hprof=cpu=samples,heap=sites, depth=6,force=n,thread=y,verbose=n,file=%s
当 map task 运行结束后,它产生的日志会写在 $logs/userlogs/job- 文件夹下。当然,你也可以直接在 JobTracker 的 Web 界面的 logs 或 jobtracker.jsp 页面找到日志。
我们运行一个简单的 SQL 语句来观察 RCFile 格式在序列化和反序列化上的开销:
hive> select O_CUSTKEY,O_ORDERSTATUS from orders_rc2 where O_ORDERSTATUS='P';
其中的 O_CUSTKEY 列为 integer 类型,O_ORDERSTATUS 为 String 类型。在日志输出的最后会包含内存和 CPU 的消耗。
下表是一次 CPU 的开销:
rank
self
accum
count
trace
method
20
0.48%
79.64%
65
315554
org.apache.hadoop.hive.ql.io.RCFile$Reader.getCurrentRow
28
0.24%
82.07%
32
315292
org.apache.hadoop.hive.serde2.columnar.ColumnarStruct.init
55
0.10%
85.98%
14
315788
org.apache.hadoop.hive.ql.io.RCFileRecordReader.getPos
56
0.10%
86.08%
14
315797
org.apache.hadoop.hive.ql.io.RCFileRecordReader.next
表 2:一次 CPU 的开销
其中第五列可以对照上面的 Track 信息查看到底调用了哪些函数。比如 CPU 消耗排名 20 的函数对应 Track:
TRACE 315554: (thread=200001) org.apache.hadoop.hive.ql.io.RCFile$Reader.getCurrentRow(RCFile.java:1434) org.apache.hadoop.hive.ql.io.RCFileRecordReader.next(RCFileRecordReader.java:88) org.apache.hadoop.hive.ql.io.RCFileRecordReader.next(RCFileRecordReader.java:39) org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:98) org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:42) org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:67)
其中,比较明显的是 RCFile,它为了构造行而消耗了不必要的数组移动开销。其主要是因为 RCFile 为了还原行,需要构造 RowContainer,顺序读取一行构造 RowContainer,然后给其中对应的列进行赋值,因为 RCFile 早期为了兼容 SequenceFile 所以可以合并两个 block,又由于 RCFile 不知道列在哪个 row group 结束,所以必须维持数组的当前位置,类似如下格式定义:
Array<RowContainer extends List<Object>>
而此数据格式可以改为面向列的序列化和反序列化方式。如:
Map<array<col1Type>,array<col2Type>,array<col3Type>....>
这种方式的反序列化会避免不必要的数组移动,当然前提是我们必须知道列在哪个 row group 开始到哪个 row group 结束。这种方式会提高整体反序列化过程的效率。
关于 Hadoop 文件格式的思考
1 高效压缩
Hadoop 目前尚未出现针对数据特性的高效编码(Encoding)和解码 (Decoding) 数据格式。尤其是支持 Run Length Encoding、Bitmap 这些极为高效算法的数据格式。HIVE-2065 讨论过使用更加高效的压缩形式,但是对于如何选取列的顺序没有结论。关于列顺序选择可以看 Daniel Lemire 的一篇论文 《Reordering Columns for Smaller Indexes》[1]。作者同时也是 Hive 0.8 中引入的 bitmap 压缩算法基础库的作者。该论文的结论是:当某个表需要选取多个列进行压缩时,需要根据列的选择性 (selectivity) 进行升序排列,即唯一值越少的列排得越靠前。 事实上这个结论也是 Vertica 多年来使用的数据格式。其他跟压缩有关的还有 HIVE-2604 和 HIVE-2600。
2 基于列和块的序列化和反序列化
不论排序后的结果是不是真的需要,目前 Hadoop 的整体框架都需要不断根据数据 key 进行排序。除了上面提到的基于列的排序,序列化和反序列化之外,Hadoop 的文件格式应该支持某种基于块(Block) 级别的排序和序列化及反序列化方式,只有当数据满足需要时才进行这些操作。来自 Google Tenzing 论文中曾将它作为 MR 的优化手段提到过。
“Block Shuffle:正常来说,MR 在 Shuffle 的时候使用基于行的编码和解码。为了逐个处理每一行, 数据必须先排序。然而,当排序不是必要的时候这种方式并不高效, 我们在基于行的 shuffle 基础上实现了一种基于 block 的 shuffle 方式,每一次处理大概 1M 的压缩 block,通过把整个 block 当成一行,我们能够避免 MR 框架上的基于行的序列化和反序列化消耗,这种方式比基于行的 shuffle 快上 3 倍以上。”
3 数据过滤(Skip List)
除常见的分区和索引之外,使用排序之后的块(Block)间隔也是常见列数据库中使用的过滤数据的方法。Google Tenzing 同样描述了一种叫做 ColumnIO 的数据格式,ColumnIO 在头部定义该 Block 的最大值和最小值,在进行数据判断的时候,如果当前 Block 的头部信息里面描述的范围中不包含当前需要处理的内容,则会直接跳过该块。Hive 社区里曾讨论过如何跳过不需要的块 ,可是因为没有排序所以一直没有较好的实现方式。包括 RCFile 格式,Hive 的 index 机制里面目前还没有一个高效的根据头部元数据就可以跳过块的实现方式。
4 延迟物化
真正好的列数据库,都应该可以支持直接在压缩数据之上不需要通过解压和排序就能够直接操作块。通过这种方式可以极大的降低 MR 框架或者行式数据库中先解压,再反序列化,然后再排序所带来的开销。Google Tenzing 里面描述的 Block Shuffle 也属于延迟物化的一种。更好的延迟物化可以直接在压缩数据上进行操作,并且可以做内部循环, 此方面在论文《Integrating Compression and Execution in Column-Oriented Database System》[5] 的 5.2 章节有描述。 不过考虑到它跟 UDF 集成也有关系,所以,它会不会将文件接口变得过于复杂也是一件有争议的事情。
5 与 Hadoop 框架集成
无论文本亦或是二进制格式,都只是最终的储存格式。Hadoop 运行时产生的中间数据却没有办法控制。包括一个 MR Job 在 map 和 reduce 之间产生的数据或者 DAG Job 上游 reduce 和下游 map 之间的数据,尤其是中间格式并不是列格式,这会产生不必要的 IO 和 CPU 开销。比如 map 阶段产生的 spill,reduce 阶段需要先 copy 再 sort-merge。如果这种中间格式也是面向列的,然后将一个大块切成若干小块,并在头部加上每个小块的最大最小值索引,就可以避免大量 sort-mege 操作中解压—反序列化—排序—合并(Merge)的开销,从而缩短任务的运行时间。
其他文件格式
Hadoop 社区也曾有对其他文件格式的研究。比如,IBM 研究过面向列的数据格式并发表论文《Column-Oriented Storage Techniques for MapReduce》[4],其中特别提到 IBM 的 CIF(Column InputFormat) 文件格式在序列化和反序列化的 IO 消耗上比 RCFile 的消耗要小 20 倍。里面提到的将列分散在不同的 HDFS Block 块上的实现方式 RCFile 也有考虑过,但是最后因为重组行的消耗可能会因分散在远程机器上产生的延迟而最终放弃了这种实现。此外,最近 Avro 也在实现一种面向列的数据格式,不过目前 Hive 与 Avro 集成尚未全部完成。有兴趣的读者可以关注 avro-806 和 hive-895。
总结
Hadoop 可以与各种系统兼容的前提是 Hadoop MR 框架本身能够支持多种数据格式的读写。但如果要提升其性能,Hadoop 需要一种高效的面向列的基于整个 MR 框架集成的数据格式。尤其是高效压缩,块重组(block shuffle),数据过滤(skip list)等高级功能,它们是列数据库相比 MR 框架在文件格式上有优势的地方。相信随着社区的发展以及 Hadoop 的逐步成熟,未来会有更高效且统一的数据格式出现。
参考资料
[1] 压缩列顺序选择 http://lemire.me/en/ Reordering Columns for Smaller Indexes 论文地址
[2]Hive 与 Avro 集成 https://issues.apache.org/jira/browse/HIVE-895
[3]Google 的 Tenzing 论文 http://research.google.com/pubs/DistributedSystemsandParallelComputing.html
Tenzing A SQL Implementation On The MapReduce Framework
[4]IBM Column-Oriented Storage Techniques for MapReduce http://pages.cs.wisc.edu/~jignesh/publ/colMR.pdf
[5]Integrating compression and execution in column-oriented database systems http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf
[6]Avro 项目主页 http://avro.apache.org/
[7]MapReduce and Parallel DBMSs: Friends or Foes , repetitive record parsing 小节 http://cacm.acm.org/magazines/2010/1/55743-mapreduce-and-parallel-dbmss-friends-or-foes/fulltext
作者简介
江志伟,关注分析型 MPP 数据库和 Hadoop,建有个人博客 http://www.gemini5201314.net/ , 五月份 Hadoop Definitive Guide 3rd 要出了,如果找到有兴趣合作翻译的朋友,可能会翻译这本经典书籍。
感谢马国耀对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。
评论