写点什么

大数据查询——HBase 读写设计与实践

  • 2017-12-19
  • 本文字数:6519 字

    阅读完需:约 21 分钟

背景介绍

本项目主要解决 check 和 opinion2 张历史数据表(历史数据是指当业务发生过程中的完整中间流程和结果数据)的在线查询。原实现基于 Oracle 提供存储查询服务,随着数据量的不断增加,在写入和读取过程中面临性能问题,且历史数据仅供业务查询参考,并不影响实际流程,从系统结构上来说,放在业务链条上游比较重。本项目将其置于下游数据处理 Hadoop 分布式平台来实现此需求。下面列一些具体的需求指标:

  1. 数据量:目前 check 表的累计数据量为 5000w+ 行,11GB;opinion 表的累计数据量为 3 亿 +,约 100GB。每日增量约为每张表 50 万 + 行,只做 insert,不做 update。
  2. 查询要求:check 表的主键为 id(Oracle 全局 id),查询键为 check_id,一个 check_id 对应多条记录,所以需返回对应记录的 list; opinion 表的主键也是 id,查询键是 bussiness_no 和 buss_type,同理返回 list。单笔查询返回 List 大小约 50 条以下,查询频率为 100 笔 / 天左右,查询响应时间 2s。

技术选型

从数据量及查询要求来看,分布式平台上具备大数据量存储,且提供实时查询能力的组件首选 HBase。根据需求做了初步的调研和评估后,大致确定 HBase 作为主要存储组件。将需求拆解为写入和读取 HBase 两部分。

读取 HBase 相对来说方案比较确定,基本根据需求设计 RowKey,然后根据 HBase 提供的丰富 API(get,scan 等)来读取数据,满足性能要求即可。

写入 HBase 的方法大致有以下几种:

  1. Java 调用 HBase 原生 API,HTable.add(List(Put))。
  2. MapReduce 作业,使用 TableOutputFormat 作为输出。
  3. Bulk Load,先将数据按照 HBase 的内部数据格式生成持久化的 HFile 文件,然后复制到合适的位置并通知 RegionServer ,即完成海量数据的入库。其中生成 Hfile 这一步可以选择 MapReduce 或 Spark。

本文采用第 3 种方式,Spark + Bulk Load 写入 HBase。该方法相对其他 2 种方式有以下优势:

  1. BulkLoad 不会写 WAL,也不会产生 flush 以及 split。
  2. 如果我们大量调用 PUT 接口插入数据,可能会导致大量的 GC 操作。除了影响性能之外,严重时甚至可能会对 HBase 节点的稳定性造成影响,采用 BulkLoad 无此顾虑。
  3. 过程中没有大量的接口调用消耗性能。
  4. 可以利用 Spark 强大的计算能力。

图示如下:

设计

环境信息

复制代码
Hadoop 2.5-2.7
HBase 0.98.6
Spark 2.0.0-2.1.1
Sqoop 1.4.6

表设计

本段的重点在于讨论 HBase 表的设计,其中 RowKey 是最重要的部分。为了方便说明问题,我们先来看看数据格式。以下以 check 举例,opinion 同理。

check 表(原表字段有 18 个,为方便描述,本文截选 5 个字段示意)

如上图所示,主键为 id,32 位字母和数字随机组成,业务查询字段 check_id 为不定长字段(不超过 32 位),字母和数字组成,同一 check_id 可能对应多条记录,其他为相关业务字段。众所周知,HBase 是基于 RowKey 提供查询,且要求 RowKey 是唯一的。RowKey 的设计主要考虑的是数据将怎样被访问。初步来看,我们有 2 种设计方法。

  1. 拆成 2 张表,一张表 id 作为 RowKey,列为 check 表对应的各列;另一张表为索引表,RowKey 为 check_id,每一列对应一个 id。查询时,先找到 check_id 对应的 id list,然后根据 id 找到对应的记录。均为 HBase 的 get 操作。
  2. 将本需求可看成是一个范围查询,而不是单条查询。将 check_id 作为 RowKey 的前缀,后面跟 id。查询时设置 Scan 的 startRow 和 stopRow,找到对应的记录 list。

第一种方法优点是表结构简单,RowKey 容易设计,缺点为 1)数据写入时,一行原始数据需要写入到 2 张表,且索引表写入前需要先扫描该 RowKey 是否存在,如果存在,则加入一列,否则新建一行,2)读取的时候,即便是采用 List, 也至少需要读取 2 次表。第二种设计方法,RowKey 设计较为复杂,但是写入和读取都是一次性的。综合考虑,我们采用第二种设计方法。

RowKey 设计

热点问题

HBase 中的行是以 RowKey 的字典序排序的,其热点问题通常发生在大量的客户端直接访问集群的一个或极少数节点。默认情况下,在开始建表时,表只会有一个 region,并随着 region 增大而拆分成更多的 region,这些 region 才能分布在多个 regionserver 上从而使负载均分。对于我们的业务需求,存量数据已经较大,因此有必要在一开始就将 HBase 的负载均摊到每个 regionserver,即做 pre-split。常见的防治热点的方法为加盐,hash 散列,自增部分(如时间戳)翻转等。

RowKey 设计

Step1:确定预分区数目,创建 HBase Table

不同的业务场景及数据特点确定数目的方式不一样,我个人认为应该综合考虑数据量大小和集群大小等因素。比如 check 表大小约为 11G,测试集群大小为 10 台机器,hbase.hregion.max.filesize=3G(当 region 的大小超过这个数时,将拆分为 2 个),所以初始化时尽量使得一个 region 的大小为 1~2G(不会一上来就 split),region 数据分到 11G/2G=6 个,但为了充分利用集群资源,本文中 check 表划分为 10 个分区。如果数据量为 100G,且不断增长,集群情况不变,则 region 数目增大到 100G/2G=50 个左右较合适。Hbase check 表建表语句如下:

复制代码
create 'tinawang:check',
{ NAME => 'f', COMPRESSION => 'SNAPPY',DATA_BLOCK_ENCODING => 'FAST_DIFF',BLOOMFILTER=>'ROW'},
{SPLITS => [ '1','2','3', '4','5','6','7','8','9']}

其中,Column Family =‘f’,越短越好。

COMPRESSION => ‘SNAPPY’,HBase 支持 3 种压缩 LZO, GZIP and Snappy。GZIP 压缩率高,但是耗 CPU。后两者差不多,Snappy 稍微胜出一点,cpu 消耗的比 GZIP 少。一般在 IO 和 CPU 均衡下,选择 Snappy。

DATA_BLOCK_ENCODING => ‘FAST_DIFF’,本案例中 RowKey 较为接近,通过以下命令查看 key 长度相对 value 较长。

./hbase org.apache.hadoop.hbase.io.hfile.HFile -m -f /apps/hbase/data/data/tinawang/check/a661f0f95598662a53b3d8b1ae469fdf/f/a5fefc880f87492d908672e1634f2eed_SeqId_2_

Step2:RowKey 组成

Salt

让数据均衡的分布到各个 Region 上,结合 pre-split,我们对查询键即 check 表的 check_id 求 hashcode 值,然后 modulus(numRegions) 作为前缀,注意补齐数据。

StringUtils.leftPad(Integer.toString(Math.abs(check_id.hashCode() % numRegion)),1,’0’)说明:如果数据量达上百 G 以上,则 numRegions 自然到 2 位数,则 salt 也为 2 位。

Hash 散列

因为 check_id 本身是不定长的字符数字串,为使数据散列化,方便 RowKey 查询和比较,我们对 check_id 采用 SHA1 散列化,并使之 32 位定长化。

MD5Hash.getMD5AsHex(Bytes.toBytes(check_id))唯一性

以上 salt+hash 作为 RowKey 前缀,加上 check 表的主键 id 来保障 RowKey 唯一性。综上,check 表的 RowKey 设计如下:(check_id=A208849559)

为增强可读性,中间还可以加上自定义的分割符,如’+’,’|’等。

7+7c9498b4a83974da56b252122b9752bf+56B63AB98C2E00B4E053C501380709AD

以上设计能保证对每次查询而言,其 salt+hash 前缀值是确定的,并且落在同一个 region 中。需要说明的是 HBase 中 check 表的各列同数据源 Oracle 中 check 表的各列存储。

WEB 查询设计

RowKey 设计与查询息息相关,查询方式决定 RowKey 设计,反之基于以上 RowKey 设计,查询时通过设置 Scan 的 [startRow,stopRow], 即可完成扫描。以查询 check_id=A208849559 为例,根据 RowKey 的设计原则,对其进行 salt+hash 计算,得前缀。

复制代码
startRow = 7+7c9498b4a83974da56b252122b9752bf
stopRow = 7+7c9498b4a83974da56b252122b9752bg

代码实现关键流程

Spark write to HBase

Step0: prepare work

因为是从上游系统承接的业务数据,存量数据采用 sqoop 抽到 hdfs;增量数据每日以文件的形式从 ftp 站点获取。因为业务数据字段中包含一些换行符,且 sqoop1.4.6 目前只支持单字节,所以本文选择’0x01’作为列分隔符,’0x10’作为行分隔符。

Step1: Spark read hdfs text file

SparkContext.textfile() 默认行分隔符为”\n”,此处我们用“0x10”,需要在 Configuration 中配置。应用配置,我们调用 newAPIHadoopFile 方法来读取 hdfs 文件,返回 JavaPairRDD,其中 LongWritable 和 Text 分别为 Hadoop 中的 Long 类型和 String 类型(所有 Hadoop 数据类型和 java 的数据类型都很相像,除了它们是针对网络序列化而做的特殊优化)。我们需要的数据文件放在 pairRDD 的 value 中,即 Text 指代。为后续处理方便,可将 JavaPairRDD 转换为 JavaRDD< String >。

Step2: Transfer and sort RDD

① 将 avaRDD< String> 转换成 JavaPairRDD<tuple2,String>,其中参数依次表示为,RowKey,col,value。做这样转换是因为 HBase 的基本原理是基于 RowKey 排序的,并且当采用 bulk load 方式将数据写入多个预分区(region)时,要求 Spark 各 partition 的数据是有序的,RowKey,column family(cf),col name 均需要有序。在本案例中因为只有一个列簇,所以将 RowKey 和 col name 组织出来为 Tuple2 格式的 key。请注意原本数据库中的一行记录(n 个字段),此时会被拆成 n 行。

② 基于 JavaPairRDD<tuple2,String> 进行 RowKey,col 的二次排序。如果不做排序,会报以下异常:

java.io.IOException: Added a key notlexically larger than previous key③ 将数据组织成 HFile 要求的 JavaPairRDDhfileRDD。

Step3:create hfile and bulk load to HBase

①主要调用 saveAsNewAPIHadoopFile 方法:

复制代码
hfileRdd.saveAsNewAPIHadoopFile(hfilePath,ImmutableBytesWritable.class,
KeyValue.class,HFileOutputFormat2.class,config);

② hfilebulk load to HBase

复制代码
final Job job = Job.getInstance();
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat2.configureIncrementalLoad(job,htable);
LoadIncrementalHFiles bulkLoader = newLoadIncrementalHFiles(config);
bulkLoader.doBulkLoad(newPath(hfilePath),htable);

注:如果集群开启了 kerberos,step4 需要放置在 ugi.doAs()方法中,在进行如下验证后实现

复制代码
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(keyUser,keytabPath);
UserGroupInformation.setLoginUser(ugi);

访问 HBase 集群的 60010 端口 web,可以看到 region 分布情况。

Read from HBase

本文基于 spring boot 框架来开发 web 端访问 HBase 内数据。

use connection pool(使用连接池)

创建连接是一个比较重的操作,在实际 HBase 工程中,我们引入连接池来共享 zk 连接,meta 信息缓存,region server 和 master 的连接。

复制代码
HConnection connection = HConnectionManager.createConnection(config);
HTableInterface table = connection.getTable("table1");
try {
// Use the table as needed, for a single operation and a single thread
} finally {
table.close();
}

也可以通过以下方法,覆盖默认线程池。

复制代码
HConnection createConnection(org.apache.hadoop.conf.Configuration conf,ExecutorService pool);

process query

Step1: 根据查询条件,确定 RowKey 前缀

根据 3.3 RowKey 设计介绍,HBase 的写和读都遵循该设计规则。此处我们采用相同的方法,将 web 调用方传入的查询条件,转化成对应的 RowKey 前缀。例如,查询 check 表传递过来的 check_id=A208849559,生成前缀 7+7c9498b4a83974da56b252122b9752bf。

Step2:确定 scan 范围

A208849559 对应的查询结果数据即在 RowKey 前缀为 7+7c9498b4a83974da56b252122b9752bf 对应的 RowKey 及 value 中。

复制代码
scan.setStartRow(Bytes.toBytes(rowkey_pre)); //scan, 7+7c9498b4a83974da56b252122b9752bf
byte[] stopRow = Bytes.toBytes(rowkey_pre);
stopRow[stopRow.length-1]++;
scan.setStopRow(stopRow);// 7+7c9498b4a83974da56b252122b9752bg

Step3:查询结果组成返回对象

遍历 ResultScanner 对象,将每一行对应的数据封装成 table entity,组成 list 返回。

测试

从原始数据中随机抓取 1000 个 check_id,用于模拟测试,连续发起 3 次请求数为 2000(200 个线程并发,循环 10 次),平均响应时间为 51ms,错误率为 0。

如上图,经历 N 次累计测试后,各个 region 上的 Requests 数较为接近,符合负载均衡设计之初。

踩坑记录

1、kerberos 认证问题

如果集群开启了安全认证,那么在进行 Spark 提交作业以及访问 HBase 时,均需要进行 kerberos 认证。

本文采用 yarn cluster 模式,像提交普通作业一样,可能会报以下错误。

复制代码
ERROR StartApp: job failure,
java.lang.NullPointerException
at com.tinawang.spark.hbase.utils.HbaseKerberos.<init>(HbaseKerberos.java:18)
at com.tinawang.spark.hbase.job.SparkWriteHbaseJob.run(SparkWriteHbaseJob.java:60)

定位到 HbaseKerberos.java:18,代码如下:

this.keytabPath = (Thread.currentThread().getContextClassLoader().getResource(prop.getProperty("hbase.keytab"))).getPath();这是因为 executor 在进行 HBase 连接时,需要重新认证,通过 --keytab 上传的 tina.keytab 并未被 HBase 认证程序块获取到,所以认证的 keytab 文件需要另外通过 --files 上传。示意如下

复制代码
--keytab /path/tina.keytab \
--principal tina@GNUHPC.ORG \
--files "/path/tina.keytab.hbase"

其中 tina.keytab.hbase 是将 tina.keytab 复制并重命名而得。因为 Spark 不允许同一个文件重复上传。

2、序列化

复制代码
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
...
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@24a16d8c)
- field (class: com.tinawang.spark.hbase.processor.SparkReadFileRDD, name: sc, type: class org.apache.spark.api.java.JavaSparkContext)
...

解决方法一

如果 sc 作为类的成员变量,在方法中被引用,则加 transient 关键字,使其不被序列化。

private transient JavaSparkContext sc;解决方法二

将 sc 作为方法参数传递,同时使涉及 RDD 操作的类 implements Serializable。 代码中采用第二种方法。详见代码。

3、批量请求测试

Exception in thread "http-nio-8091-Acceptor-0" java.lang.NoClassDefFoundError: org/apache/tomcat/util/ExceptionUtils或者

Exception in thread "http-nio-8091-exec-34" java.lang.NoClassDefFoundError: ch/qos/logback/classic/spi/ThrowableProxy查看下面 issue 以及一次排查问题的过程,可能是 open file 超过限制。

https://github.com/spring-projects/spring-boot/issues/1106

http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg

使用 ulimit-a 查看每个用户默认打开的文件数为 1024。

在系统文件 /etc/security/limits.conf 中修改这个数量限制,在文件中加入以下内容, 即可解决问题。

  • soft nofile 65536
  • hard nofile 65536

作者介绍

汪婷,中国民生银行大数据开发工程师,专注于 Spark 大规模数据处理和 Hbase 系统设计。

参考文献

http://hbase.apache.org/book.html#perf.writing

http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/ http://hbasefly.com/2016/03/23/hbase_writer/ https://github.com/spring-projects/spring-boot/issues/1106 http://mp.weixin.qq.com/s/34GVlaYDOdY1OQ9eZs-iXg

2017-12-19 17:1411956

评论

发布
暂无评论
发现更多内容

人工智能 | Spark在分布式造数工具中的应用:构建大规模测试数据的新境界

测吧(北京)科技有限公司

测试

构建卓越的人工智能产品:全方位的质量保障与测试

测吧(北京)科技有限公司

测试

Docker镜像构建:技术深度解析与实践

树上有只程序猿

Docker 镜像

人工智能 | 优化模型训练的利器——训练集、验证集和测试集

测吧(北京)科技有限公司

测试

人工智能 | 精准数据划分:提升模型性能的关键一步

测吧(北京)科技有限公司

测试

人工智能发展脉络:从专家系统到机器学习的历史演进

测吧(北京)科技有限公司

测试

人工智能 | 数据闭环构建技巧:确保模型稳定性和数据质量

测吧(北京)科技有限公司

测试

2024年API安全趋势预测

互联网工科生

API API 安全

又一次了,该认真考虑“混合多云”了!

京东科技开发者

云计算 云服务 混合多云

人工智能与智能化测试Workshop

测吧(北京)科技有限公司

测试

掌握这些,轻松管理BusyBox:如何交叉编译和集成BusyBox

EquatorCoco

嵌入式 嵌入式应用 busybox

石原子科技荣登「2024 中国企业服务云图」,引领数据价值在线化革命,助力企业省心省钱更安全

StoneDB

MySQL 数据库 HTAP StoneDB

人工智能产品测试的挑战与应对策略

测吧(北京)科技有限公司

测试

高效微调大模型的新方法

百度开发者中心

nlp 大模型 #人工智能

诚邀报名|谭中意邀您共论“大模型应用开发之道”

开放原子开源基金会

Java 开源 程序员 开发者 算法

人工智能 | 优化模型性能的关键一步——深入理解训练集、验证集和测试集

测吧(北京)科技有限公司

测试

Go语言很难吗?为什么 Go 岗位这么少?

伤感汤姆布利柏

Go 后端 低代码 Go 面试题 面经 后端 大厂

融云 CEO 董晗获评甲子光年「2023 中国数字经济创新人物」

融云 RongCloud

互联网 通信 数字经济 wicc 光年20

如何使用京东商品详情 API 获取用户评价最多的商品详情?

技术冰糖葫芦

API 开发

人工智能 | 精细解读人工智能评估指标——深入了解模型的强项与弱项

测吧(北京)科技有限公司

测试

人工智能 | 分布式造数工具中的Spark应用实践:快速生成大规模测试数据

测吧(北京)科技有限公司

测试

大模型训练的得力助手

百度开发者中心

大模型 #人工智能 LLM

在线教育如何通过小程序打造业务新引擎

Geek_2305a8

基于DotNetty实现一个接口自动发布工具 - 通信实现

EquatorCoco

git Netty WPF

从 Elasticsearch 到 SelectDB,观测云实现日志存储与分析的 10 倍性价比提升

SelectDB

数据库 大数据 数据仓库 数据分析 apache doris

人工智能 | 数据与特征:解析模型如何依赖信息解决实际问题

测吧(北京)科技有限公司

测试

StoneDB-8.0-V2.2.0 企业版正式发布!性能优化,稳定性提升,持续公测中!

StoneDB

MySQL 数据库 HTAP StoneDB

一起来看看,Abaqus2024版本会有哪些“惊喜”?

思茂信息

abaqus abaqus软件 abaqus有限元仿真 有限元分析 有限元仿真

人工智能 | 深入理解评估指标——优化模型性能的关键

测吧(北京)科技有限公司

测试

人工智能 | 自学习:数据科学的新潮流

测吧(北京)科技有限公司

测试

工程师都喜欢的一款自动生成网格的仿真软件——Hyperworks到底好不好用?

智造软件

CAE CAE软件 hyperworks

大数据查询——HBase读写设计与实践_大数据_汪婷_InfoQ精选文章