写点什么

用 Apache Spark 进行大数据处理——第二部分:Spark SQL

  • 2015-06-12
  • 本文字数:3896 字

    阅读完需:约 13 分钟

在 Apache Spark 文章系列的前一篇文章中,我们学习了什么是Apache Spark 框架,以及如何用该框架帮助组织处理大数据处理分析的需求。

Spark SQL,作为 Apache Spark 大数据框架的一部分,主要用于结构化数据处理和对 Spark 数据执行类 SQL 的查询。通过 Spark SQL,可以针对不同格式的数据执行 ETL 操作(如 JSON,Parquet,数据库)然后完成特定的查询操作。

在这一文章系列的第二篇中,我们将讨论 Spark SQL 库,如何使用 Spark SQL 库对存储在批处理文件、JSON 数据集或 Hive 表中的数据执行 SQL 查询。

Spark 大数据处理框架目前最新的版本是上个月发布的 Spark 1.3。这一版本之前,Spark SQL 模块一直处于“Alpha”状态,现在该团队已经从 Spark SQL 库上将这一标签移除。这一版本中包含了许多新的功能特性,其中一部分如下:

  • 数据框架(DataFrame):Spark 新版本中提供了可以作为分布式 SQL 查询引擎的程序化抽象 DataFrame。
  • 数据源(Data Sources):随着数据源 API 的增加,Spark SQL 可以便捷地处理以多种不同格式存储的结构化数据,如 Parquet,JSON 以及 Apache Avro 库。
  • **JDBC服务器(JDBC Server):** 内置的 JDBC 服务器可以便捷地连接到存储在关系型数据库表中的结构化数据并利用传统的商业智能(BI)工具进行大数据分析。

Spark SQL 组件

使用 Spark SQL 时,最主要的两个组件就是 DataFrame 和 SQLContext。

首先,我们来了解一下 DataFrame。

DataFrame

DataFrame 是一个分布式的,按照命名列的形式组织的数据集合。DataFrame 基于 R 语言中的 data frame 概念,与关系型数据库中的数据库表类似。

之前版本的 Spark SQL API 中的 SchemaRDD 已经更名为 DataFrame。

通过调用将 DataFrame 的内容作为行 RDD(RDD of Rows)返回的 rdd 方法,可以将 DataFrame 转换成 RDD。

可以通过如下数据源创建 DataFrame:

  • 已有的 RDD
  • 结构化数据文件
  • JSON 数据集
  • Hive 表
  • 外部数据库

Spark SQL 和 DataFrame API 已经在下述几种程序设计语言中实现:

本文中所涉及的 Spark SQL 代码示例均使用 Spark Scala Shell 程序。

SQLContext

Spark SQL 提供 SQLContext 封装 Spark 中的所有关系型功能。可以用之前的示例中的现有 SparkContext 创建 SQLContext。下述代码片段展示了如何创建一个 SQLContext 对象。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)此外,Spark SQL 中的 HiveContext 可以提供 SQLContext 所提供功能的超集。可以在用 HiveQL 解析器编写查询语句以及从 Hive 表中读取数据时使用。

在 Spark 程序中使用 HiveContext 无需既有的 Hive 环境。

JDBC 数据源

Spark SQL 库的其他功能还包括数据源,如 JDBC 数据源。

JDBC 数据源可用于通过 JDBC API 读取关系型数据库中的数据。相比于使用 JdbcRDD ,应该将 JDBC 数据源的方式作为首选,因为 JDBC 数据源能够将结果作为 DataFrame 对象返回,直接用 Spark SQL 处理或与其他数据源连接。

Spark SQL 示例应用

在上一篇文章中,我们学习了如何在本地环境中安装 Spark 框架,如何启动 Spark 框架并用 Spark Scala Shell 与其交互。如需安装最新版本的 Spark,可以从 Spark网站下载该软件。

对于本文中的代码示例,我们将使用相同的Spark Shell 执行Spark SQL 程序。这些代码示例适用于Windows 环境。

为了确保Spark Shell 程序有足够的内存,可以在运行spark-shell 命令时,加入driver-memory 命令行参数,如下所示:

spark-shell.cmd --driver-memory 1G### Spark SQL 应用

Spark Shell 启动后,就可以用 Spark SQL API 执行数据分析查询。

在第一个示例中,我们将从文本文件中加载用户数据并从数据集中创建一个 DataFrame 对象。然后运行 DataFrame 函数,执行特定的数据选择查询。

文本文件 customers.txt 中的内容如下:

复制代码
100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727

下述代码片段展示了可以在 Spark Shell 终端执行的 Spark SQL 命令。

复制代码
// 首先用已有的 Spark Context 对象创建 SQLContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 导入语句,可以隐式地将 RDD 转化成 DataFrame
import sqlContext.implicits._
// 创建一个表示客户的自定义类
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)
// 用数据集文本文件创建一个 Customer 对象的 DataFrame
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()
// 将 DataFrame 注册为一个表
dfCustomers.registerTempTable("customers")
// 显示 DataFrame 的内容
dfCustomers.show()
// 打印 DF 模式
dfCustomers.printSchema()
// 选择客户名称列
dfCustomers.select("name").show()
// 选择客户名称和城市列
dfCustomers.select("name", "city").show()
// 根据 id 选择客户
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
// 根据邮政编码统计客户数量
dfCustomers.groupBy("zip_code").count().show()

在上一示例中,模式是通过反射而得来的。我们也可以通过编程的方式指定数据集的模式。这种方法在由于数据的结构以字符串的形式编码而无法提前定义定制类的情况下非常实用。

如下代码示例展示了如何使用新的数据类型类 StructType,StringType 和 StructField 指定模式。

复制代码
//
// 用编程的方式指定模式
//
// 用已有的 Spark Context 对象创建 SQLContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建 RDD 对象
val rddCustomers = sc.textFile("data/customers.txt")
// 用字符串编码模式
val schemaString = "customer_id name city state zip_code"
// 导入 Spark SQL 数据类型和 Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._;
// 用模式字符串生成模式对象
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 将 RDD(rddCustomers)记录转化成 Row。
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))
// 将模式应用于 RDD 对象。
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)
// 将 DataFrame 注册为表
dfCustomers.registerTempTable("customers")
// 用 sqlContext 对象提供的 sql 方法执行 SQL 语句。
val custNames = sqlContext.sql("SELECT name FROM customers")
// SQL 查询的返回结果为 DataFrame 对象,支持所有通用的 RDD 操作。
// 可以按照顺序访问结果行的各个列。
custNames.map(t => "Name: " + t(0)).collect().foreach(println)
// 用 sqlContext 对象提供的 sql 方法执行 SQL 语句。
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
// SQL 查询的返回结果为 DataFrame 对象,支持所有通用的 RDD 操作。
// 可以按照顺序访问结果行的各个列。
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)

除了文本文件之外,也可以从其他数据源中加载数据,如 JSON 数据文件,Hive 表,甚至可以通过 JDBC 数据源加载关系型数据库表中的数据。

如上所示,Spark SQL 提供了十分友好的 SQL 接口,可以与来自多种不同数据源的数据进行交互,而且所采用的语法也是团队熟知的 SQL 查询语法。这对于非技术类的项目成员,如数据分析师以及数据库管理员来说,非常实用。

总结

本文中,我们了解到 Apache Spark SQL 如何用熟知的 SQL 查询语法提供与 Spark 数据交互的 SQL 接口。Spark SQL 是一个功能强大的库,组织中的非技术团队成员,如业务分析师和数据分析师,都可以用 Spark SQL 执行数据分析。

下一篇文章中,我们将讨论可用于处理实时数据或流数据的 Spark Streaming 库。Spark Streaming 库是任何一个组织的整体数据处理和管理生命周期中另外一个重要的组成部分,因为流数据处理可为我们提供对系统的实时观察。这对于欺诈检测、在线交易系统、事件处理解决方案等用例来说至关重要。

参考文献

关于作者

Srini Penchikala目前是一家金融服务机构的软件架构师,这个机构位于德克萨斯州的奥斯汀。他在软件系统架构、设计和开发方面有超过 20 年的经验。Srini 目前正在撰写一本关于 NoSQL 数据库模式的书。他还是曼宁出版社出版的《Spring Roo in Action》一书的合著者( http://www.manning.com/SpringRooinAction)。他还曾经出席各种会议,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now 和 Project World Conference 等。Srini 还在 InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net 以及 JavaWorld 等网站上发表过很多关于软件系统架构、安全和风险管理以及 NoSQL 数据库等方面的文章。他还是 InfoQ NoSQL 数据库社区的责任编辑( http://www.infoq.com/author/Srini-Penchikala)。

查看英文原文: Big Data Processing with Apache Spark - Part 2: Spark SQL

2015-06-12 03:4253959
用户头像

发布了 75 篇内容, 共 63.8 次阅读, 收获喜欢 6 次。

关注

评论

发布
暂无评论
发现更多内容

软件测试 | 测试开发 | 测试人生 | 00后拿下了名企大厂 offer,这个后浪学习之路全公开

测吧(北京)科技有限公司

软件测试 测试

软件测试 | 测试开发 | 测试人生 | 双非学历,从外包到某大厂只用了1年时间,在2线城市年薪近30万,我柠檬了......

测吧(北京)科技有限公司

软件测试 | 测试开发 | 测试人生 | 97年双非学历的小哥哥,2线城市涨薪100%,我酸了......

测吧(北京)科技有限公司

软件测试 测试

英特尔将推出第四代至强可扩展服务器,为高性能计算、人工智能和网络提供全方位加速服务

科技之家

Meta公司内部项目-RaptorX:将Presto性能提升10倍

Alluxio

presto Alluxio #Facebook meta 9月月更

软件测试 | 测试开发 | 多种框架小程序测试环境构建总结

测吧(北京)科技有限公司

软件测试 测试

软件测试 | 测试开发 | 测试人生 | 从传统行业到名企大厂,薪资翻倍,我做到了

测吧(北京)科技有限公司

软件测试 测试

NFT拍卖交易系统开发NFT商城

薇電13242772558

NFT

软件测试 | 测试开发 | 接口测试实战 | Android 高版本无法抓取 HTTPS,怎么办?

测吧(北京)科技有限公司

https 测试 自动化测试

软件测试 | 测试开发 | 测试人生 | 年薪50w+ 并入职名企大厂,这是双非学历小哥哥给自己30岁的礼物

测吧(北京)科技有限公司

软件测试 测试

软件测试中的『草莓酱定律』

BY林子

敏捷测试 草莓酱定律 温伯格

IDaaS 系统ArkID一账通内置插件:图形验证码认证因素的配置流程

龙归科技

单点登录 Idaas

软件测试 | 测试开发 | Linux下的Nginx内存泄露定位

测吧(北京)科技有限公司

nginx Liunx 测试开发

软件测试 | 测试开发 | Jenkins job 机制该如何使用?

测吧(北京)科技有限公司

测试 测试工程师

软件测试 | 测试开发 | Redis Zset Score精度问题

测吧(北京)科技有限公司

redis 软件测试 测试

内存管理:程序是如何被优雅的装载到内存中

C++后台开发

内存管理 Linux内核 内核源码 内核开发 嵌入式开发

软件测试 | 测试开发 | 测试人生 | 毕业2年,拒绝独角兽入职名企大厂涨薪10万+,这个95后小姐姐好飒

测吧(北京)科技有限公司

测试

软件测试 | 测试开发 | 测试人生 | 三十而立终圆大厂梦,测试开发开启新征程

测吧(北京)科技有限公司

软件测试 测试 测试开发

软件测试 | 测试开发 | Python数据驱动测试 unittest+ddt

测吧(北京)科技有限公司

Python 软件测试

MobLink Android端业务场景简单说明

MobTech袤博科技

android 开发者

软件测试 | 测试开发 | Hybird app开发入门之Native和H5页面交互原理

测吧(北京)科技有限公司

软件测试

软件测试 | 测试开发 | ADBLib 在 android 中的使用

测吧(北京)科技有限公司

android 测试

软件测试 | 测试开发 | 测试人生 | 双非院校跨城重新开始,薪资翻了2倍还多,这个90后小姐姐好飒

测吧(北京)科技有限公司

测试 测试工程师

软件测试 | 测试开发 | Jenkins 持续集成体系介绍

测吧(北京)科技有限公司

软件测试 测试

软件测试 | 测试开发 | 高性能高维向量的KNN搜索方案

测吧(北京)科技有限公司

软件测试 测试

软件测试 | 测试开发 | Android 10 来袭

测吧(北京)科技有限公司

android Android开发

赋能企业敏捷开发的低代码平台

力软低代码开发平台

软件测试 | 测试开发 | 浅谈测试需求分析

测吧(北京)科技有限公司

测试 需求 用户需求分析

软件测试 | 测试开发 | 测试人生 | 拿到多个 offer 从了一线互联网公司并涨薪70%,90后小哥哥免费分享面试经验~

测吧(北京)科技有限公司

软件测试 测试 offer

软件测试 | 测试开发 | 测试人生 | 双非院校、入职某知名电商公司薪资翻倍还有股票奖励,这个90后小姐姐也太飒了吧?

测吧(北京)科技有限公司

测试 面试题 软件测试面试题

软件测试 | 测试开发 | 从跨专业手工测试转岗外包,再到 Python 测试开发,跳槽涨薪 85%!

测吧(北京)科技有限公司

软件测试 测试

用Apache Spark进行大数据处理——第二部分:Spark SQL_DevOps & 平台工程_Srini Penchikala_InfoQ精选文章