写点什么

用 Apache Spark 进行大数据处理——第三部分:Spark 流

  • 2016-03-02
  • 本文字数:7677 字

    阅读完需:约 25 分钟

介绍

在“用 Apache Spark 进行大数据处理”系列的前两篇文章中,我们看到了 Apache Spark 框架是什么 (第一部分) 还有如何使用 Spark SQL 库访问数据的 SQL 接口 (第二部分)。
这些方案是基于批处理模式下静态信息处理的,比如作为一个按小时或天运行的任务。但若是在数据驱动的业务决策场景下,当需要飞快地分析实时数据流以执行分析并创建决策支持时,又该如何呢?
使用流式数据处理,一旦数据到达计算就会被实时完成,而非作为批处理任务。实时数据处理与分析正在变为大多数组织的大数据战略中至关重要的一个组件。 在本文中,我们将会学习到如何使用 Apache Spark 中一个被称为 Spark 流的库进行实时数据分析。
我们将会看到一个网络服务器日志分析用例,该用例会向我们展示 Spark 流是如何帮助我们对持续产生的数据流进行分析的。

流数据分析

流数据基本上是一组连续的数据记录,它们通常产生于诸如传感器、服务器流量与在线搜索等数据源。常见的流数据的例子有网站上的用户行为、监控数据、服务器日志与其他事件数据。
流数据处理应用会有助于现场面板、实时在线推荐与即时诈骗检测。
如果我们正在构建一个实时收集、处理与分析流数据的应用,我们需要按照与批处理数据应用不同的设计视角进行考虑。
下面列出了三种不同的流数据处理框架:

在本文中我们将专注于 Spark 流。

Spark 流

Spark 流是核心 Spark API 的扩展。Spark 流使得基于实时数据流构建容错性处理变得更加简单。
下面的图 1 展示了 Spark 流是如何融入到整个 Apache Spark 生态系统中。

(点击放大图像)

图1. 具有Spark 流库的Spark 生态系统

Spark 流工作的方式是将数据流按照预先定义的间隔 (N 秒) 划分为批 (称微批次) 然后将每批数据视为一个弹性分布式数据集 (Resilient Distributed Datasets,RDDs)。随后我们就可以使用诸如 map、reduce、reduceByKey、join 和 window 这样的操作来处理这些 RDDs。这些 RDD 操作的结果会以批的形式返回。通常我们会将这些结果保存到数据存储中以供未来分析并生成报表与面板,或是发送基于事件的预警。
为 Spark 流决定时间间隔是很重要的,这需要基于你的用例与数据处理要求。如果值 N 太低,那么在分析阶段微批次就没有足够的数据以给出有意义的结果。
与 Spark 流相比,其他流处理框架是基于每个事件而非一个微批次来处理数据流的。用微批次的方法,我们可以在同一应用下使用 Spark 流 API 来应用其他 Spark 库 (比如核心、机器学习等)。
流数据可以来源于许多不同的数据源。下面列出一些这样的数据源:

使用诸如 Apache Spark 这种大数据处理框架的另外一个优势就是我们可以在同一系统中组合批处理与流处理。我们也可以在数据流上应用 Spark 的机器学习与图处理算法。在本系列的后续文章当中,我们将会讨论被称为 MLlib GraphX 的机器学习与图处理库。
Spark 流结构如下图 2 所示。

(点击放大图像)

图2.Spark 流如何工作

Spark 流用例

Spark 流正在变为实现实时数据处理与分析方案的首选平台,这些实时数据往往来源于物联网 (Internet of Things,IoT) 和传感器。它被用于各种用例与商业应用。
下面是一些最有趣的 Spark 流用例

  • Uber ,车驾共享服务背后的公司,在他们的持续流式 ETL 管道中使用了 Spark 流以每天从其移动用户处收集 TB 级的事件数据来进行实时遥测分析。
  • Pinterest ,可视化书签工具背后的公司,使用 Spark 流、MemSQL 与 Apache Kafka 技术以实时地深入了解他们全球的用户是怎样使用 Pins 的。
  • Netflix 使用 Kafka 与 Spark 流来构建一个实时在线电影推荐与数据监控解决方案,该方案每天要处理来自于不同数据源的数十亿条事件。

Spark 流其他现实世界的样例还包括:

  • 供应链分析
  • 实时安全情报操作以寻找威胁
  • 广告竞价平台
  • 实时视频分析,以帮助观看者实现个性化与互动体验

让我们看一下 Spark 流的架构与 API 方法。若要编写 Spark 流程序,我们需要知晓两个组件:DStream 与流上下文。

DStream

Dstream (离散流,Discretized Stream,的缩写) 是 Spark 流中最基本的抽象,它描述了一个持续的数据流。DStream 既可以从诸如 Kafka、Flume 与 Kinesis 这样的数据源中创建,也可以对其他 DStream 实施操作。在内部,一个 DStream 被描述为一个 RDD 对象的序列。
与 RDDs 上的转换与动作操作类似,DStream 支持以下操作

  • map
  • flatMap
  • filter
  • count
  • reduce
  • countByValue
  • reduceByKey
  • join
  • updateStateByKey

流上下文

与 Spark 中的 Spark 上下文 (SparkContext) 相似,流上下文 (StreamingContext) 是所有流功能的主入口。
流上下文拥有内置方法可以将流数据接收到 Spark 流程序中。
使用该上下文,我们可以创建一个描述基于 TCP 数据源的流数据的 DStream,可以用主机名与端口号指定 TCP 数据源。比如,如果我们使用像 netcat 这样的工具来测试 Spark 流程序的话,我们将会从运行 netcat 的机器 (比如 localhost) 的 9999 端口上接收到数据流。
当代码被执行,在启动时,Spark 流仅是设置将要执行的计算,此时还没有进行实时处理。在所有的转换都被设置完毕后,为了启动处理,我们最终会调用 start() 方法来启动计算,还有 awaitTermination() 方法来等待计算终结。

Spark 流 API

Spark 流附带了若干个用于处理数据流的 API 方法。有类似于 RDD 的操作,比如 map、flatMap、filter、count、reduce、groupByKey、reduceByKey、sortByKey 和 join。它也提供了其他基于 window 与 stateful 操作的处理流数据的 API。包括 window、countByWindow、reduceByWindow、countByValueAndWindow、reduceByKeyAndWindow 和 updateStateByKey。
Spark 流库当前支持 Scala、Java 和 Python 编程语言。这里是每个语言对应的 Spark 流 API 链接:

Spark 编程的步骤

在我们讨论样例应用之前,先来看看 Spark 流编程中与众不同的步骤:

  • Spark 流上下文被用于处理实时数据流。因此,第一步就是用两个参数初始化流上下文对象,Spark 上下文和切片间隔时间。切片间隔设置了流中我们处理输入数据的更新窗口。一旦上下文被初始化,就无法再向已经存在的上下文中定义或添加新的计算。并且,在同一时间只有一个流上下文对象可以被激活。
  • 当 Spark 流上下文被定义后,我们通过创建输入 DStreams 来指定输入数据源。在我们的样例应用中,输入数据源是一个使用了 Apache Kafka 分布式数据库和消息系统的日志消息生成器。日志生成器程序创建随机日志消息以模拟网络服务器的运行时环境,作为各种网络应用服务用户而产生的流量,日志消息被持续不断地生成。
  • 使用 map 和 reduce 这样的 Spark 流变换 API 为 DStreams 定义计算。
  • 当流计算逻辑被定义好后,我们可以使用先前创建的流上下文对象中的 start 方法来开始接收并处理数据。
  • 最终,我们使用流上下文对象的 awaitTermination 方法等待流数据处理完毕并停止它。

样例应用

在本文中我们讨论的样例应用是一个服务器日志处理与分析程序。它可以被用于对服务器日志进行实时监控并执行基于这些日志的数据分析。这些日志消息被认为是时序数据,也就是由在一个指定时间间隔内所捕捉到的连续度量的数据点组成的序列。
时序数据的例子包括传感器数据、天气信息和点击流数据。时序分析就是处理时序数据以提取有助于制定业务决策的信息。该数据也可以被用于基于历史数据的预测分析。
使用这样的方案,我们不需要每小时或每天的批处理任务来处理服务器日志。Spark 流接收持续产生的数据,对其进行处理并计算日志统计,以此来挖掘数据。
为了遵循服务器日志分析的标准样例,我们将会使用在 Data Bricks Spark 流参考应用中所讨论的 Apache 日志分析器作为我们样例应用的参考。该应用已经具备将在我们的应用中被重用的日志消息解析代码。这个参考应用是一个用来学习 Spark 通用框架以及 Spark 流的优秀资源。
点击他们的网站,以查看更多关于 Databricks Spark 参考应用的细节。

用例

样例应用的用例是一个网络服务器日志分析与统计的生成器。在样例应用中,我们分析网络服务器日志以计算如下统计信息,这些信息有助于进一步的数据分析和报表及面板的创建:

  • 不同 HTTP 响应代码的响应计数
  • 响应内容大小
  • 导致最高网络流量的访问客户端的 IP 地址
  • 最热门的终端 URL 以识别那些比其他服务被访问的更多服务

与本系列的前两篇文章不同,在本文中我们将使用 Java 而非 Scala 来创建 Spark 程序。我们按照独立应用的方式运行程序,而不是在控制台窗口中运行代码。在测试与产品环境中部署 Spark 程序也如此。Shell 控制台接口 (使用 Scala、Python 或 R 语言) 仅仅是用于开发者本地测试而已。

技术

在样例程序中我们将使用如下的技术来演示如何使用 Spark 流库处理实时数据流。

Zookeeper

Zookeeper 是一个为分布式应用提供可靠分布式协调的集中化的服务。Kafka,我们在样例应用中使用的消息系统,依赖于 Zoopkeeper 在整个集群中的详细设置。

Kafka

Apache Kafka 是一个实时的、容错的、可扩展的消息系统,它用于实时地移动数据。对于诸如捕捉网站上用户活动、日志、股票行情数据以及仪表数据这些用例来说,它是一个很好的选择。
Kafka 的工作方式类似于分布式数据库,它是基于被分区和复制的低延迟提交日志的。当我们将一个消息发送给 Kafka,在集群中它会被复制给不同的服务器,与此同时它也会被提交到磁盘。
Apache Kafka 包含客户端 API 以及一个称为 Kafka 连接的数据转换器框架。
Kafka 客户端:Kafka 包括 Java 客户端 (针对消息生产者与消费者)。在我们的样例应用中我们将会使用 Java 生产者客户端 API。
Kafka 连接:Kafka 也包含了 Kafka 连接,即一个介于 Apache Kafka 与外部数据系统之间的流数据框架,它可以支持组织内的数据管道。它包含了导入与导出连接器以将数据集移入或移出 Kafka。Kafka 连接程序可以作为独立进程或分布式服务运行,它支持 REST 接口的方式,即使用 REST API 提交连接器到 Kafka 连接集群。

Spark 流

我们将会使用 Spark 流 Java API 来接收数据流,计算日志统计信息并且运行查询以回答诸如“最多网络请求来自于哪个 IP 地址”这样的问题。
下面的表 1 展示了样例应用中所使用的技术与工具以及他们的版本。

技术

版本

URL

Zookeeper

3.4.6

https://zookeeper.apache.org/doc/r3.4.6/

Kafka

2.10

http://kafka.apache.org/downloads.html

Spark 流

1.4.1

https://spark.apache.org/releases/spark-release-1-4-1.html

JDK

1.7

http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html

Maven

3.3.3

http://archive.apache.org/dist/maven/maven-3/3.3.3/

表 1.Spark 流样例应用技术及工具

在图 3 中演示了 Spark 流样例应用中不同架构组件。

(点击放大图像)

图3.Spark 流样例应用架构

Spark 流应用运行时

为了在本地设置 Java 项目,可以从 Github 上下载 Databricks 参考应用代码。一旦获取了参考应用代码,就需要两个额外的 Java 类来运行我们的样例应用。

  • 日志生成器 (SparkStreamingKafkaLogGenerator.java)
  • 日志分析器 (SparkStreamingKafkaLogAnalyzer.java)

在文章网站上提供了这些文件的 zip 压缩包 ( spark-streaming-kafka-sample-app.zip )。如果你想在你本地机器上运行样例应用,使用链接下载 zip 文件,抽出 Java 类并将他们添加到之前步骤中创建的 Java 项目中。
样例应用可以被执行在不同的操作系统上。我在 Windows 和 Linux(CentOS VM) 环境下都运行了应用。
让我们看一下应用架构中的每个组件还有执行 Spark 流程序的步骤。

Zookeeper 命令:

在样例程序中我使用的 Zookeeper 版本是 3.4.6。为了启动服务器,需要设置两个环境变量,JAVA_HOME 与 ZOOKEEPER_HOME 来指定 JDK 和 Zookeeper 各自的安装目录。然后导航到 Zookeeper 的 home 目录并运行如下命令来启动 Zookeeper 服务器。

复制代码
bin\zkServer.cmd

如果你使用的是 Linux 环境,命令就是:

复制代码
bin/zkServer.sh start

Kafka 服务器命令:

在程序中使用的 Kafka 版本是 2.10-0.9.00,基于 Scala2.10 版本。在 Kafka 中所使用的 Scala 版本是非常重要的,因为若是没有使用恰当的版本的话,当执行 Spark 流程序时就会遇到运行时错误。这里是启动 Kafka 服务器实例的步骤:

  • 打开一个新的命令行窗口
  • 设置 JAVA_HONE 与 KAFKA_HOME 环境变量
  • 导航到 Kafka 的 home 目录
  • 运行如下命令```
    bin\windows\kafka-server-start.bat config\server.properties
复制代码
对于 Linux 环境,命令如下:```
bin/kafka-server-start.sh config/server.properties

日志生成器命令:

在我们的样例应用中下一步就是运行消息日志生成器。
日志生成器以不同的 HTTP 响应码 (诸如 200、401 和 404) 及不同的终端 URL 创建测试日志消息。
在我们运行日志生成器之前,我们需要创建一个主题 (Topic),我们可以将消息写到里面去。
与之前的步骤类似,打开一个新的命令行窗口,设置 JAVA_HOME 和 KAFKA_HOME 环境变量,并且导航到 Kafka 的 home 目录。然后首先运行以下命令来查看在 Kafka 服务器中已经存在的可用主题。

复制代码
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --list

在 Linux 上:

复制代码
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --list

我们将会用以下命令创建一个叫做“spark-streaming-sample-topic”的新主题:

复制代码
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic

在 Linux 上:

复制代码
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic

你可以再次运行 list 主题命令以查看是否新主题已经被正确创建。
当主题已经被创建好后,我们就可以运行日志生成器程序了。通过调用称为 SparkStreamingKafkaLogGenerator 的 Java 类来完成此步骤。日志生成器类接收以下四个参数来指定配置参数:

  • 组标识:spark-streaming-sample-group
  • 主题:spark-streaming-sample-topic
  • 迭代次数:50
  • 间隔:1000

打开一个新的命令行窗口来运行日志生成器。我们将要为 JDK、Maven 和 Kafka 目录分别设置三个环境变量 (JAVA_HOME、MAVEN_HOME 和 KAFKA_HOME)。然后导航到样例项目根目录 (比如 c:\dev\projects\spark-streaming-kafka-sample-app) 并运行以下命令。

复制代码
mvn exec:java -Dexec.mainClass=com.sparkstreaming.kafka.example.SparkStreamingKafkaLogGenerator -Dexec.args="spark-streaming-sample-groupid spark-streaming-sample-topic 50 1000"

一旦日志生成器程序运行起来,就应该在控制台上通过 debug 消息看到被创建的测试日志消息。这只是个样例代码,所以日志消息被随机地生成以模拟从诸如网络服务器这种事件源生成的持续不断的数据流。
下面的图 4 展示了日志消息生产者还有正在生成的日志消息截屏。

(点击放大图像)

图4.Spark 流日志生成器程序输出

Spark 流命令:

这是使用了 Spark 流 API 的日志消息消费者。我们使用叫做 SparkStreamingKafkaLogAnalyzer 的 Java 类来从 Kafka 服务器上接收并处理数据流以创建日志统计信息。
Spark 流处理服务器日志消息并生成累计日志统计信息,比如网络请求大小 (最小、最大与平均)、响应代码计数、IP 地址与热点终端。
我们用“local[*]”创建 Spark 上下文,它会在本地系统中检测内核的数量并使用它们运行程序。
为了运行 Spark 流 Java 类,将会在 classpath 中用到以下 JAR 文件:

  • kafka_2.10-0.9.0.0.jar
  • kafka-clients-0.9.0.0.jar
  • metrics-core-2.2.0.jar
  • spark-streaming-kafka_2.10-1.4.0.jar
  • zkclient-0.3.jar

将上述 JAR 文件添加到 classpath 后我用 Eclipse IDE 运行了程序。日志分析 Spark 流程序的输出如图 5。

(点击放大图像)

图5.Spark 流日志分析程序输出

Spark 流应用的可视化

当 Spark 流程序运行的时候,我们可以检查 Spark 控制台来查看 Spark 任务的细节。
打开一个新的网络浏览器窗口并导航到 URL http://localhost:4040 以访问 Spark 控制台。
先看看一些展示 Spark 流程序统计信息的图表。
第一个可视化就是任务的 DAG(无回路有向图,Direct Acyclic Grapg),它展示了我们所运行的程序中不同操作的依赖图,操作有 map、window 和 foreachRDD 等。下面的图 6 展示了我们样例程序中 Spark 流任务的可视化截屏。

(点击放大图像)

图 6.Spark 流任务的可视化图形

我们将要看的下一个图形就是包含了输入比率的流统计图,它显示了每秒的事件数量,以及处理所花费的毫秒数。
图 7 展示了 Spark 流程序执行期间的这些统计信息,左面是流数据还没有产生时的情况,而右边是数据流被发送到 Kafka 并且被 Spark 流消费者处理的情况。

图7. 为样例程序展示流统计信息的Spark 可视化

结论

Spark 流库,Apache Spark 生态系统中的一部分,用于实时流数据的数据处理。在本文中,我们学习了如何使用 Spark 流 API 来处理由服务器日志生成的数据并基于实时数据流执行分析。

下一步是什么

机器学习、预测分析和数据科学在近期都在获得越来越多的关注,他们都是不同用例下的问题解决方案。 Spark MLlib ,Spark 机器学习库,提供了若干内置方法以使用诸如协同过滤、聚簇与归类这样的不同机器学习算法。
在下一篇文章中,我们将会探索 Spark MLlib 并观察几个用例来演示如何利用 Spark 的数据科学计算能力,它可以使机器学习算法的使用变得更加简单。
在本系的后续文章中,我们将看看像 BlinkDB Tachyon 这样的即将到来的框架。

参考

关于作者

Srini Penchikala目前是一家金融服务机构的软件架构师,这个机构位于德克萨斯州的奥斯汀。他在软件系统架构、设计和开发方面有超过 20 年的经验。Srini 目前正在撰写一本关于 NoSQL 数据库模式的书。他还是曼宁出版社出版的《 Spring Roo in Action 》一书的合著者。他还曾经出席各种会议,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now 和 Project World Conference 等。Srini 还在 InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net 以及 JavaWorld 等网站上发表过很多关于软件系统架构、安全和风险管理以及 NoSQL 数据库等方面的文章。他还是 InfoQ NoSQL 数据库社区的责任编辑 ( http://www.infoq.com/author/Srini-Penchikala)。%E3%80%82)

查看英文原文 Big Data Processing with Apache Spark - Part 3: Spark Streaming


感谢百占辉对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们。

2016-03-02 17:1116137

评论

发布
暂无评论
发现更多内容

如何修改已提交commit信息

Appleex

git

golang中的选项模式

六月的

golang 选项模式

golang中的一些实用功能

六月的

golang

golang中经常会犯的一些错误

六月的

golang

goroutine&waitgroup下载文件

六月的

goroutine waitgroup

golang中的errgroup

六月的

golang errgroup

Git的branch操作详解

timerring

git 10月月更 branch

揭秘百度智能测试在测试评估领域实践

百度Geek说

测试 数据 企业号十月 PK 榜

PaddleNLP基于ERNIR3.0文本分类:WOS数据集为例(层次分类)

汀丶人工智能

nlp 文本分类

golang单元测试一(简单函数测试)

六月的

golang 单元测试

golang垃圾回收

六月的

golang 垃圾回收

grpc中的拦截器

六月的

gRPC 拦截器

golang开发一个简单的grpc

六月的

golang gRPC

vue组件通信6种方式总结(常问知识点)

bb_xiaxia1998

Vue

进阶vue面试题总结

bb_xiaxia1998

Vue

go-zero docker-compose 搭建课件服务(九):http统一返回和集成日志服务

六月的

Docker-compose go-zero

100+款AI产品薅羊毛攻略(上)——轻轻松松节省几十万

夏夜许游

人工智能 阿里云 AI 视觉

一键上手时下最火AI作画工具

华为云开发者联盟

人工智能 华为云

golang中的几种并发模式

六月的

golang 并发模式

go-zero docker-compose 搭建课件服务(八):集成jaeger链路追踪

六月的

Docker-compose go-zero

租房小程序使用uniapp展示地图map

源字节1号

小程序开发

etcd实现分布式锁

六月的

分布式锁 etcd

golang的内存管理

六月的

golang 内存管理

golang中的socket编程

六月的

golang socket

CORS跨域

六月的

CORS

grpc错误处理

六月的

gRPC 错误处理

golang中的变量阴影

六月的

golang

固定QPS异步任务功能初探

FunTester

深入浅出redis缓存应用

六月的

redis

vue为什么v-for的优先级比v-if的高?

bb_xiaxia1998

Vue

rabbitmq原理和应用

六月的

Go RabbitMQ

用Apache Spark进行大数据处理——第三部分:Spark流_大数据_Srini Penchikala_InfoQ精选文章