写点什么

使用 Spark Streaming + Kudu + Impala 构建一个预测引擎

  • 2016-06-01
  • 本文字数:7227 字

    阅读完需:约 24 分钟

随着用户使用天数的增加,不管你的业务是扩大还是缩减了,为什么你的大数据中心架构保持线性增长的趋势?很明显需要一个稳定的基本架构来保障你的业务线。当你的客户处在休眠期,或者你的业务处在淡季,你增加的计算资源就处在浪费阶段;相对应地,当你的业务在旺季期,或者每周一每个人对上周的数据进行查询分析,有多少次你忒想拥有额外的计算资源。

根据需求水平动态分配资源 VS 固定的资源分配方式,似乎不太好实现。幸运的是,借助于现今强大的开源技术,可以很轻松的实现你所愿。在这篇文章中,我将给出一个解决例子,基于流式 API 数据来演示如何预测资源需求变化来调整资源分配。

我们旨在用流式回归模型预测接下来十分钟的海量事件数据,并与传统批处理的方法预测的结果进行对比。这个预测结果可用来动态规划计算机资源,或者业务优化。传统的批处理方法预测采用 Impala 和 Spark 两种方法,动态预测使用 Spark Streaming。

任何预测的起点是基于海量历史数据和实时更新的数据来预测未来的数据业务。流式 API 提供稳定的流失 RSVP 数据,用来预测未来一段时间 RSVP 数据。

动态资源分配预测架构图

这个例子的数据通过流式 API 进入 Kafka,然后使用 Spark Streaming 从 Kafka 加载数据到 Kudu。Kafka 允许数据同时进入两个独立的 Spark Streaming 作业:一个用来进行特征工程;一个用来使用 MLlib 进行流式预测。预测的结果存储在 Kudu 中,我们也可以使用 Impala 或者 Spark SQL 进行交互式查询,见图 1。

图 1

你可能急切想知道我的技术选型,下面是一些技术概要:

  • Kafka:Kafka 可抽象数据输入,支持扩展,并耦合 Spark Streaming 框架。Kafka 拥有每秒处理百万事件的扩展能力,并能和其他各项技术集成,比如,Spark Streaming。
  • Spark Streaming:Spark Streaming 能够处理复杂的流式事件,并且采用 Scala 编程仅需简单的几行代码即可,也支持 Java、Python 或者 R 语言。Spark Streaming 提供和 Kafka、MLlib(Spark 的机器学习库)的集成。 Apache Kudu:Kudu 支持事件的增量插入,它旨在提供一种基于 HDFS(HDFS 优势在于大数据存储下的快速扫描能力)和 HBase(HBase 优势是基于主键的快速插入/查询)之间超存储层。本项目可以采用 HBase 或者 Cassandra,但 Kudu 为数据分析提供了快速的扫描能力、列式存储架构。
  • Impala:使用 Impala 可很容易的即席查询。它提供一个查询引擎直接查询加载到 Kudu 上的数据,并能理解生成模型。作为可选的方案可使用 Spark SQL,但这里为了比较使用 MADlib 库训练的回归模型和使用 Saprk MLlib 训练的模型,故用 Impala。

构建实例

现在解释下架构的选择,详细细节如下:

首先,粗略浏览一下流式数据源。通过 Kafka 来监测文件,tail 文件变化发送到 Kafka,部分代码见 Github 。下面给出 RSVP 内容样例:

复制代码
{"response":"yes","member":{"member_name":"Richard
Williamson","photo":"http:\/\/photos3.meetupstatic.com\/photos\/member\/d\/a\/4\/0\/thu
mb_231595872.jpeg","member_id":29193652},"visibility":"public","event":
{"time":1424223000000,"event_url":"http:\/\/www.meetup.com\/Big-Data-
Science\/events\/217322312\/","event_id":"fbtgdlytdbbc","event_name":"Big Data Science
@Strata Conference,
2015"},"guests":0,"mtime":1424020205391,"rsvp_id":1536654666,"group":{"group_name":"Big
Data
Science","group_state":"CA","group_city":"Fremont","group_lat":37.52,"group_urlname":"Big-
Data-Science","group_id":3168962,"group_country":"us","group_topics":
[{"urlkey":"data-visualization","topic_name":"Data Visualization"},{"urlkey":"data-
mining","topic_name":"Data Mining"},{"urlkey":"businessintell","topic_name":"Business
Intelligence"},{"urlkey":"mapreduce","topic_name":"MapReduce"},
{"urlkey":"hadoop","topic_name":"Hadoop"},{"urlkey":"opensource","topic_name":"Open
Source"},{"urlkey":"r-project-for-statistical-computing","topic_name":"R Project for Statistical
Computing"},{"urlkey":"predictive-analytics","topic_name":"Predictive Analytics"},
{"urlkey":"cloud-computing","topic_name":"Cloud Computing"},{"urlkey":"big-
data","topic_name":"Big Data"},{"urlkey":"data-science","topic_name":"Data Science"},
{"urlkey":"data-analytics","topic_name":"Data Analytics"},
{"urlkey":"hbase","topic_name":"HBase"},
{"urlkey":"hive","topic_name":"Hive"}],"group_lon":-121.93},"venue":
{"lon":-121.889122,"venue_name":"San Jose Convention Center, Room
210AE","venue_id":21805972,"lat":37.330341}}

一旦 Kafka 运行起来,数据从 Kafka 经过 Spark Streaming 进入 Kudu,代码见这里

流式作业在 Kudu 上初始化一个表,接着运行 Spark Streaming 加载数据到数据表。你可以创建一个 Impala 外部表,并指向 Kudu 上存储的数据。

复制代码
CREATE EXTERNAL TABLE `kudu_meetup_rsvps` (
`event_id` STRING,
`member_id` INT,
`rsvp_id` INT,
`event_name` STRING,
`event_url` STRING,
`TIME` BIGINT,
`guests` INT,
`member_name` STRING,
`facebook_identifier` STRING,
`linkedin_identifier` STRING,
`twitter_identifier` STRING,
`photo` STRING,
`mtime` BIGINT,
`response` STRING,
`lat` DOUBLE,
`lon` DOUBLE,
`venue_id` INT,
`venue_name` STRING,
`visibility` STRING
)
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'kudu_meetup_rsvps',
'kudu.master_addresses' = 'quickstart.cloudera:7051',
'kudu.key_columns' = 'event_id, member_id, rsvp_id'
);

紧接着用 Impala 表查询获得小时 RSVP 数据:

复制代码
create
table rsvps_by_hour as
select from_unixtime(cast(mtime/1000 as bigint), "yyyy-MM-dd") as mdate
,cast(from_unixtime(cast(mtime/1000 as bigint), "HH") as int) as mhour
,count(*) as rsvp_cnt
from kudu_meetup_rsvps
group
by 1,2

有了 RSVP 数据后可以画随时间的变化图,见图 2:

图 2

接着可以进行特征工程,为了后续可以直接用 Impala 建立预测模型:

复制代码
create
table rsvps_by_hr_training as
select
case when mhour=0 then 1 else 0 end as hr0
,case when mhour=1 then 1 else 0 end as hr1
,case when mhour=2 then 1 else 0 end as hr2
,case when mhour=3 then 1 else 0 end as hr3
,case when mhour=4 then 1 else 0 end as hr4
,case when mhour=5 then 1 else 0 end as hr5
,case when mhour=6 then 1 else 0 end as hr6
,case when mhour=7 then 1 else 0 end as hr7
,case when mhour=8 then 1 else 0 end as hr8
,case when mhour=9 then 1 else 0 end as hr9
,case when mhour=10 then 1 else 0 end as hr10
,case when mhour=11 then 1 else 0 end as hr11
,case when mhour=12 then 1 else 0 end as hr12
,case when mhour=13 then 1 else 0 end as hr13
,case when mhour=14 then 1 else 0 end as hr14
,case when mhour=15 then 1 else 0 end as hr15
,case when mhour=16 then 1 else 0 end as hr16
,case when mhour=17 then 1 else 0 end as hr17
,case when mhour=18 then 1 else 0 end as hr18
,case when mhour=19 then 1 else 0 end as hr19
,case when mhour=20 then 1 else 0 end as hr20
,case when mhour=21 then 1 else 0 end as hr21
,case when mhour=22 then 1 else 0 end as hr22
,case when mhour=23 then 1 else 0 end as hr23
,case when mdate in ("2015-02-14","2015-02-15") then 1 else 0 end as weekend_day
,mdate
,mhour
,rsvp_cnt
from rsvps_by_hour;

在 Impala 上安装 MADlib ,这样就可以直接在 Impala 上构建回归模型。

采用 MADlib 训练回归模型的第一步:

复制代码
select printarray(linr(toarray(hr0,hr1,hr2,hr3,hr4,hr5,hr6,hr7,hr8,hr9,hr10,hr11,hr12,hr13,hr14, hr15,hr16,hr17,hr18,hr19,hr20,hr21,hr22,hr23,weekend_day), rsvp_cnt))
from rsvps_by_hr_training;

下面展示回归系数。你可看到前面的 24 个系数显示了一天的按小时趋势,在晚上很少的人在线;最后一个系数是周末,如果是周末的话,系数是负值。

复制代码
Feature Coefficient
hr0 8037.43
hr1 7883.93
hr2 7007.68
hr3 6851.91
hr4 6307.91
hr5 5468.24
hr6 4792.58
hr7 4336.91
hr8 4330.24
hr9 4360.91
hr10 4373.24
hr11 4711.58
hr12 5649.91
hr13 6752.24
hr14 8056.24
hr15 9042.58
hr16 9761.37
hr17 10205.9
hr18 10365.6
hr19 10048.6
hr20 9946.12
hr21 9538.87
hr22 9984.37
hr23 9115.12
weekend_day -2323.73

通过上述系数进行预测:

复制代码
select mdate,
mhour,
cast(linrpredict(toarray(8037.43, 7883.93, 7007.68, 6851.91,
6307.91, 5468.24, 4792.58, 4336.91, 4330.24, 4360.91, 4373.24, 4711.58, 5649.91,
6752.24, 8056.24, 9042.58, 9761.37, 10205.9, 10365.6, 10048.6, 9946.12,
9538.87, 9984.37, 9115.12, -2323.73), toarray(hr0, hr1, hr2, hr3, hr4,
hr5, hr6, hr7, hr8, hr9, hr10, hr11, hr12, hr13, hr14, hr15, hr16, hr17,
hr18, hr19, hr20, hr21, hr22, hr23, weekend_day)) as int) as rsvp_cnt_pred,
rsvp_cnt
from rsvps_by_hr_testing

图 3 按小时对比预测数据和 RSVP 真实值,由于数据有限,只列出两天的预测。

图 3

使用 Spark MLlib 训练模型

下面使用 Spark MLlib 建立类似的模型,在海量数据下这种方式更优吸引力。

首先,Spark 加载 JSON 文件并使用 Spark SQL 注册为一张表。你也可以直接从 Kudu 加载数据,但此列子直接用 Spark 读取 JSON 文件。

复制代码
val path = "/home/demo/meetupstream1M.json"
val meetup = sqlContext.read.json(path)
meetup.registerTempTable("meetup")

你可以使用 Spark SQL 运行一个类似在前面 Impala 中使用的查询语句来获取小时的 RSVP 数据:

复制代码
val meetup2 = sqlContext.sql("
select from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd') as dy,
case when from_unixtime(cast(mtime/1000 as bigint),'yyyy-MM-dd') in ('2015-02-14','2015-02-15') then 1 else 0 end as weekend_day,
from_unixtime(cast(mtime/1000 as bigint), 'HH') as hr,
count(*) as rsvp_cnt
from meetup
where from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd') >= '2015-10-30'
group
by from_unixtime(cast(mtime/1000 as bigint), 'yyyy-MM-dd'),
from_unixtime(cast(mtime/1000 as bigint), 'HH')")

接下来,创建特征向量。你可以参照前面类的方法做特征工程,但这里介绍一个 Andrew Ray 的简便方法,使用一句话即可实现特征向量:

复制代码
val meetup3 = meetup2.groupBy("dy","weekend_day","hr","rsvp_cnt").pivot("hr").count().orderBy("dy")
现在有了这些数据,可以训练回归模型了:
import org.apache.spark.mllib.regression.RidgeRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val trainingData = meetup3.map { row =>
val features = Array[Double](1.0,row(1).toString().toDouble,row(4).toString().toDouble,
row(5).toString().toDouble,row(6).toString().toDouble,
row(7).toString().toDouble,row(8).toString().toDouble,
row(9).toString().toDouble,row(10).toString().toDouble,
row(11).toString().toDouble,row(12).toString().toDouble,
row(13).toString().toDouble,row(14).toString().toDouble,
row(15).toString().toDouble,row(16).toString().toDouble,
row(17).toString().toDouble,row(18).toString().toDouble,
row(19).toString().toDouble,row(20).toString().toDouble,
row(21).toString().toDouble,row(22).toString().toDouble,
row(23).toString().toDouble,row(24).toString().toDouble,
row(25).toString().toDouble,row(26).toString().toDouble,
row(27).toString().toDouble)
LabeledPoint(row(3).toString().toDouble, Vectors.dense(features))
}
trainingData.cache()
val model = new RidgeRegressionWithSGD().run(trainingData)

得到一个新的数据集评分,

复制代码
val scores = meetup3.map { row =>
val features = Vectors.dense(Array[Double](1.0,row(1).toString().toDouble,
row(4).toString().toDouble,row(5).toString().toDouble,
row(6).toString().toDouble,row(7).toString().toDouble,
row(8).toString().toDouble,row(9).toString().toDouble,
row(10).toString().toDouble,row(11).toString().toDouble,
row(12).toString().toDouble,row(13).toString().toDouble,
row(14).toString().toDouble,row(15).toString().toDouble,
row(16).toString().toDouble,row(17).toString().toDouble,
row(18).toString().toDouble,row(19).toString().toDouble,
row(20).toString().toDouble,row(21).toString().toDouble,
row(22).toString().toDouble,row(23).toString().toDouble,
row(24).toString().toDouble,row(25).toString().toDouble,
row(26).toString().toDouble,row(27).toString().toDouble))
(row(0),row(2),row(3), model.predict(features))
}
scores.foreach(println)

图 4 描述 Spark 模型结果和真实 RSVP 数据的对比。

图 4

使用 Spark Streaming 建立回归模型

前面的两个例子展示了我们如何基于批处理数据构建模型和即席查询,现在开始建立一个 Spark Streaming 回归模型。使用流式的方法建立模型使得我们可以更频繁的更新模型,获取最新的数据,预测也更准确。

这里可能和批处理的方法稍有不同。为了展示使用流式回归模型,这里简单的使用每分钟的 RSVP 数据(替代前面批量预测中按小时处理)来生成连续的流数据来预测接下来的十分钟内的数据。

首先,使用 Kafka 来输入数据,代码见这里。这部分代码简单的设置 Kafka 为输入源,设置 topic、broker list 和 Spark Streaming 作为输入参数,它可以连接 Kafka 并获取数据。

复制代码
def loadDataFromKafka(topics: String,
brokerList: String,
ssc: StreamingContext): DStream[String] = {
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]
("metadata.broker.list" -> brokerList)
val messages = KafkaUtils.createDirectStream[String,
String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
messages.map(_._2)
}
val dstream = loadDataFromKafka(topics, brokerList, ssc)
对 DStream 进行 transform 操作获得 RSVP 值:
val stream = dstream.transform { rdd =>
val parsed1 = sqlContext.read.json(rdd)
parsed1.registerTempTable("parsed1")
val parsed2 = sqlContext.sql("
select m,
cnt,
mtime
from (select (round(mtime/60000)-(" + current_time + "/60000 ))/1000.0 as m,
count(*) as cnt,
round(mtime/60000) as mtime
from (select distinct * from parsed1) a
group
by (round(mtime/60000)-(" + current_time + "/60000 ))/1000.0,
round(mtime/60000) ) aa
where cnt > 20
")
parsed2.rdd
}
stream.print()

转换数据结构来训练模型:一个数据流为训练数据,actl_stream;另一个数据流用来预测,pred_stream。预测数据流为当前训练数据流时刻的下一个 10 分钟时间间隔。

复制代码
val actl_stream = stream.map(x =>
LabeledPoint(x(1).toString.toDouble,Vectors.dense(Array(1.0,x(0).toString.toDouble))) ).cache()
actl_stream.print()
val pred_stream = stream.map(x =>
LabeledPoint((x(2).toString.toDouble+10)*60000,Vectors.dense(Array(1.0,x(0).toString.toDouble))) )
pred_stream.print()

用时间间隔的数据作为特征训练流式模型,这里的场景非常简单,只是为了说明问题。实际的产品模型需要结合前面讨论的按天和周末的模型来提高预测的准确性。

复制代码
val numFeatures = 2
val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures)
model.trainOn(actl_stream)
最后,应用预测模型对下一个时间间隔的数据进行预测:
val rslt_stream = model.predictOnValues(pred_stream.map(lp => (lp.label, lp.features)))
rslt_stream.print()

图 5 为流式模型预测的结果。

图 5

如你所见,假如我们利用最近十分钟的 RSVP 数据,可以更好的预测接下来的十分钟左右的数据。将来为了更好的预测需要考虑增加更多的特征来提高模型的健壮性。预测的结果流式的写入 Kudu,使用 API 可以很容易的使用这些预测数据来自动的分配资源。


感谢杜小芳对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们。

2016-06-01 17:3614812
用户头像

发布了 43 篇内容, 共 26.8 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

openEuler 资源利用率提升之道02:典型应用下的效果

openEuler

开源 数据 cpu 操作系统 openEuler

借问变量何处存,牧童笑称用指针,Go lang1.18入门精炼教程,由白丁入鸿儒,go lang类型指针(Pointer)的使用EP05

刘悦的技术博客

入门 教程 Go web 教程分享 入门介绍

开源一夏 | jQuery对于链和捕获的实战研究

恒山其若陋兮

开源 8月月更

SRv6故障管理

穿过生命散发芬芳

8月月更 SRv6

Kubernetes web网站无法访问

CTO技术共享

开源 签约计划第三季 8月月更

【高并发】别闹了,要实现亿级流量下的分布式限流,这些算法你必须掌握!!

冰河

并发编程 多线程 高并发 协程 异步编程

MySQL权限管理

武师叔

8月月更

Kubernetes与OpenStack

CTO技术共享

开源 OpenStack 签约计划第三季 8月月更

Kubernetes 实现 CI/CD 发布流程

CTO技术共享

开源 CI/CD 签约计划第三季 8月月更

Kubernetes 资源核心原理

CTO技术共享

开源 签约计划第三季 8月月更

Unity Metaverse(四)、接入环信IM SDK 实现用户登录注册

CoderZ

Unity 登录验证 环信im 8月月更

中断系统结构及中断控制详解

timerring

8月月更

每日一R「01」跟着大佬学 Rust

Samson

8月月更

抖音开启“818发现好物节”:电商平台造节活动何时休

石头IT视角

微服务架构的核心关键点

阿泽🧸

微服务架构 8月月更

培训预告 | 企业应用现代化实用教程——DevOps方法论及最佳实践篇 8月11日上线

York

DevOps 云原生 团队建设 降本增效 应用现代化

头脑风暴:打家劫舍2

HelloWorld杰少

算法 LeetCode 动态规划 8月月更

直播 | 服务餐饮商户年交易额超 7000 亿,哗啦啦如何用 StarRocks 搞定实时报表

StarRocks

数据库

舔狗至高境界,学会这个技巧让你从舔狗升华到海王【Python趣味爬虫】

Geek_ac6fb9

后端

Java 多行字符串

HoneyMoose

Sass.vs.Less | 简介之基础语法

Jason199

SASS 8月月更

经验分享|低成本快节奏搭建企业知识管理系统的方法

Baklib

分门别类输入输出,Go lang1.18入门精炼教程,由白丁入鸿儒,go lang基本数据类型和输入输出EP03

刘悦的技术博客

golang 编程 教程 教程分享 golang 面试

超人飞来!Flutter 实现满屏的力量感动画!

岛上码农

flutter ios 移动端开发 安卓开发 8月月更

兼容并蓄广纳百川,Go lang1.18入门精炼教程,由白丁入鸿儒,go lang复合容器类型的声明和使用EP04

刘悦的技术博客

golang go doc 教程 教程分享 golang 面试

低代码实现探索(四十七)低的不止前端,还有后端

零道云-混合式低代码平台

Kubernetes 企业如何落地

CTO技术共享

开源 签约计划第三季 8月月更

文档管理系统对于企业来说有哪些作用?

Baklib

Spring Cloud Stream 消息发送

周杰伦本人

8月月更

RocketMQ 详解系列

牧小农

RocketMQ

MySQL 原理与优化,Group By 优化 技巧

老崔说架构

使用Spark Streaming + Kudu + Impala构建一个预测引擎_语言 & 开发_侠天_InfoQ精选文章