写点什么

Redis,Apache Spark 和 Python 入门

  • 2020-03-01
  • 本文字数:4719 字

    阅读完需:约 15 分钟

Redis,Apache Spark 和 Python 入门

Apache Spark 是创建分布式数据处理管道最受欢迎的框架之一,在这篇博文中,我们将介绍在使用 Spark 时如何结合 Redis 用作计算的数据存储库。Spark 的主要功能是管道(Java,Scala,Python 或 R 脚本)可以在本地(用于开发)和集群上运行,而无需更改任何源代码。


Spark 通过巧妙地使用延迟计算或在某些情况下称为惰性来提供这种灵活性。一切都始于类 RDD,DataFrame 和更新的 Dataset,它们分别是数据的分布式惰性表示。他们使用分布式文件系统,数据库或其他类似服务作为实际的存储后端。他们的操作(例如 map / select,filter / where 和 reduce / groupBy)并没有真正地执行计算。每个操作都会在执行计划中添加一个步骤,该步骤最终会在需要实际结果时再(例如,尝试将其打印到屏幕上时)运行。


在本地启动脚本时,所有计算都在计算机上进行。或者,在分布式集群上启动时,您的数据将被分区到不同的节点上。(在大多数情况下)Spark 集群中并行执行相同的操作。

关于 RDD,DataFrame 和数据集

随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。尽管每个新添加的功能都比以前增加了更多功能,但是没有哪个 API 可以完全替代以前的功能。按创建顺序(从最早到最新),总体概述如下:


  • RDD 提供了用于将编译时类型安全操作应用于数据的底层方法。使用 RDD,您可以在代码中表达“事情如何发生”,而不是采用声明性方法。

  • DataFrame 引入了一种类 SQL 的方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明性语法使 Spark 可以构建优化的查询计划,从而比 RDD 运行更快。

  • 数据集 是 DataFrame for Java Virtual Machine(JVM)语言的改进。它引入了 DataFrame 缺乏的编译时类型安全性,以及行的优化表示形式,从而大大减少了内存使用。由于动态语言(Python,R)具有动态特性,因此它实际上并没有任何作用,因此您仍然可以使用它们(同时在内部重新实现为数据集)使用 DataFrame。

  • 有关更多详细信息,请参阅 Jules Damji 的“ 三个 Apache Spark API 的故事 ”。

关于 spark-redis

spark-redis 是一个开放源代码连接器,可以让您使用 Redis 存储数据。


将 Redis 用作后端的三个主要原因是:


  • DataFrame / set 和 Redis 专用的 RDD: spark-redis 既实现了更通用的接口,又公开了 Redis 闻名的数据结构的 RDD。这意味着您可以非常轻松地在 Redis 上部署现有脚本,并在需要完全控制时使用 Redis 特定的功能。

  • Redis Cluster:连接器使用 Redis Cluster API,并充分利用分片数据库,包括重新分片和故障转移。将数据放在 Redis 集群中,可以极大地提高性能,因为您的管道会增加数据的多个使用者。

  • Redis Streams:Spark Streaming 非常适合新的 Redis Streams 数据结构。Redis Streams 还使用消费组,使您可以优雅地调整并行度。

  • 在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。在撰写本文时,可以被视为 Spark 的“本机”语言的 Scala 可以访问集成的一些更高级的功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。使用 Python,我们需要坚持使用 DataFrames。

配置

我们的第一步是使用 pip 安装 pyspark。您还将需要在计算机上安装 Java 8。


$ pip install pyspark
复制代码


接下来,我们将需要 Maven 构建 spark-redis。您可以从官方网站或使用软件包管理器(例如 macOS 上的自制软件)来获取它。


从 GitHub 下载 spark-redis(git clone 或以 zip 下载),并使用 Maven 进行构建。


$ cd spark-redis$ mvnclean package -DskipTests
复制代码


在里面 target/子目录中,您将找到已编译的 jar 文件。


需要一个正在运行的 Redis 服务器进行连接。您可以通过多种方式下载它:从官方网站上的软件包管理器(apt-get 或者 brew install redis)或 Docker Hub。


一旦启动并运行,就可以启动 pyspark。请注意,修改 VERSION 值成您需要从 GitHub 下载的版本。


$ pyspark –jars target /spark-redis-VERSION-jar-with-dependencies.jar
复制代码


如果您的 Redis 服务器运行在容器中或已启用身份验证,则将这些开关添加到上一次调用中(并更改值以适合您的情况)。


–conf“ spark.redis.host =localhost” –conf“ spark.redis.port = 6379” –conf“ spark.redis.auth = passwd”
复制代码

使用样本数据集运行

现在我们有了一个可以正常工作的 pyspark shell,可以将数据存储在 Redis 上,让我们来运行这个著名的 people 数据集。

开始

下载 TSV 文件后,让我们将其作为 Spark DataFrame 加载。


>>> full_df =  spark.read.csv("pantheon.tsv", sep="\t",  quote="", header=True, inferSchema=True)    >>> full_df.dtypes    [('en_curid', 'int'), ('name', 'string'), ('numlangs',  'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName',  'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT',  'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear',  'string'), ('gender', 'string'), ('occupation', 'string'), ('industry',  'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star',  'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'),  ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI',  'double')]
复制代码


现在,调用.dtypes 显示数据集中所有列(和相对类型)的列表。在此数据集中,有许多事情可能值得研究,但出于本示例的目的,让我们着重于为每个国家/地区查找到名人最常见的职业。


让我们从仅保留与我们的目标相关的列开始。


>>> data =  full_df.select("en_curid", "countryCode",  "occupation")    >>> data.show(2)    +--------+-----------+-----------+    |en_curid|countryCode| occupation|+--------+-----------+-----------+    |     307|         US| POLITICIAN||     308|         GR|PHILOSOPHER|+--------+-----------+-----------+    only showing top 2 rows
复制代码


这将创建原始 DataFrame 的副本,该副本仅包含三列:每个人的唯一 ID,他们的国家和他们的职业。


我们首先下载了一个小的数据集,但在现实生活中,如果您使用的是 Spark,则该数据集可能会更大并且可以远程托管。出于这个原因,让我们通过将数据加载到 Redis 来使下一步变得更加现实:


>>> data.write.format("org.apache.spark.sql.redis").option("table","people").option("key.column", "en_curid").save()
复制代码


此命令会将我们的数据集加载到 Redis 中。我们现在将看到,我们指定的两个选项有助于定义 Redis 中的数据布局。

Redis 上的 DataFrames

让我们使用 redis-cli 查询下,看看 DataFrame 是如何存储在 Redis 上的:


$ redis-cli> SCAN 0 MATCH people:* COUNT 31) "2048"2) 1) "people:2113653"2)"people:44849"3)"people:399280"4)"people:101393"
复制代码


SCAN 查询出我们加载到 Redis 中的 Key 和数据。您可以立即看到我们之前给出的选项如何用于定义键名:


“table”, “people” 为表示此 DataFrame 的键定义一个公共前缀,并且


“ key.column”, “ en_curid” 为我们的 DataFrame 定义了主键。


让我们随机取一个 key 看一下具体的内容:


> HGETALL people:21136531) "countryCode"2) "DE"3) "occupation"4) "SOCCER PLAYER"
复制代码


现在,我们已经了解了数据如何存储在 Redis 上,让我们跳回到 pyspark,看看我们如何实际编写管道获取每个国家的名人最常见的职业。如您所见,DataFrame 的每一行都变成了 Redis 哈希结构,其中包含 countryCode 和 occupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。


从 Redis DataFrame 执行计算


即使我们应该将数据仍然加载到内存中,也可以从 Redis 加载它,以便编写与您在现实生活中将要执行的操作更相似的代码。


>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()>>> df.show(2)+--------+-----------+----------+|en_curid|countryCode|occupation|+--------+-----------+----------+|  915950|         ZW|   SWIMMER||  726159|         UY|POLITICIAN|+--------+-----------+----------+only showing top 2 rows
复制代码


这就是您的 Spark 管道开始的方式,因此让我们最后进行计算!


>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})>>> counts.show(2)+-----------+-------------+---------------+|countryCode|   occupation|count(en_curid)|+-----------+-------------+---------------+|         FR|MATHEMATICIAN|             34||         IT|SOCCER PLAYER|             81|+-----------+-------------+---------------+only showing top 2 rows
复制代码


现在,每一行代表所有当前(国家,职业)组合的计数。下一步,我们只需要选择每个国家/地区计数最高的职业。


首先,导入所需的一些新模块,然后使用 windows 定义代码以选择最常见的职业:


>>> from pyspark.sql.window import Window>>> from pyspark.sql.functions import count, col, row_number>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")>>> result.show(5)+-----------+-------------+|countryCode|   occupation|+-----------+-------------+|         DZ|   POLITICIAN||         LT|   POLITICIAN||         MM|   POLITICIAN||         CI|SOCCER PLAYER||         AZ|   POLITICIAN|+-----------+-------------+only showing top 5 rows
复制代码


此代码将原始行分组为 countryCode,按 count(en_curid)在降序排列每个组的内容,并且仅采用第一个元素。如您所见,在这个小样本中,政客似乎是最常见的职业。


让我们看看有多少个国家是这样:


>>> result.where(col("occupation") == "POLITICIAN").count()150
复制代码


哇,考虑到今天全球有 195 个国家,这真是很多。现在,让我们将其余国家/地区保存在 Redis 中:


>>>no_pol = result.where(col("occupation") != "POLITICIAN")>>> no_pol.write.format("org.apache.spark.sql.redis").option("table","occupation").option("key.column","countryCode").save()
复制代码


如果现在进入 redis-cli,您将能够看到新数据:


$ redis-cli> HGETALL occupation:IT1) "occupation"2) "RELIGIOUS FIGURE"> HGETALL occupation:US1) "occupation"2) "ACTOR"
复制代码


如果您想练习更多,请检查原始数据集,看看是否发现其他引起您兴趣的细节。

关于 Spark 数据类型和 Redis 集群 API 的补充

值得重申的非常重要的一点是,RDD 或 DataFrame / set 对象上的每个操作都将分布在多个节点上。如果我们的例子不仅仅涉及名人,那么一开始我们将有数千万行。在这种情况下,Spark 将扩展计算。但是,如果只有一个 Redis 实例,则将有 N 个节点在其上运行,很可能会成为网络带宽的瓶颈。


为了充分利用 Redis,您需要使用 Redis Cluster API 适当地扩展它。这将确保您的所有计算节点在读取时不会得不到数据,在写入时不会阻塞。

结论

在本文中,我们探讨了如何下载,编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 提供了对 DataFrame API 的全面支持,因此移植现有脚本并开始享受 Redis 提供的更高速度应该非常容易。如果您想了解更多信息,请查看 GitHub 上关于 spark-redis 的文档。


本文转载自 中间件小哥 公众号。


原文链接:https://mp.weixin.qq.com/s/GOu3EQyqfG3vRqy9mdHKyw


2020-03-01 21:431289

评论

发布
暂无评论
发现更多内容

如何在游戏中快速集成聊天功能

LeanCloud

游戏开发 即时通讯 聊天室 sdk

keras深度学习框架

AI_robot

tensorflow实现cifar10彩色图像多类别分类

AI_robot

一个数组通过配置随机抽取组成小数组

waitmoon

Java

区块链底层Baas平台搭建,区块链政务底层平台开发

阿里天猫3面(Java研发):GC回收+Redis Hash算法+架构部署+秒杀等

钟奕礼

Java 编程 程序员 架构 面试

最全Java架构师技能树:Java编程+网络+设计模式+数据库+分布式

钟奕礼

Java 编程 程序员 架构 面试

tensorflow实现低分辨率灰度图像分类算法

AI_robot

NodeJs中Buffer与Stream理解

小风以北

stream 原理 Node buffer

【译】ECMAScript 2021: 最终功能集确定

清秋

JavaScript ecmascript 翻译 ES6 新闻

推荐16款强大的Twitter视频下载器(2021精选)

科技猫

twitter 软件 网站 分享 视频下载

tensorflow实现两种图像风格融合 即神经风格迁移

AI_robot

Android 高通Camx架构学习 - 第1章

小驰笔记

android 音视频 camera 引航计划

tensorflow实现像素级图像分割算法

AI_robot

寻找音乐API接入正版音乐曲库?了解HIFIVE音乐开放平台!

HIFIVE音加加

音乐api 正版曲库 音乐sdk

UUID不失精度,长度改进

waitmoon

Java uuid

tensorflow实现CNN模型垃圾分类算法

AI_robot

云图说|将源端MongoDB业务搬迁至华为云DDS的几种方式

华为云开发者联盟

mongodb 数据迁移 华为云文档数据库服务 DDS 文档数据库

专访孙立坚:印度经济发展实力几何 ?

了了Vita

这可能是全网关于Camera慢动作录像(SlowMotion)介绍最全的文章了

小驰笔记

android 音视频 camera 引航计划

深入分析小程序运行环境框架原理

小风以北

小程序 编译原理 框架 工作原理

最新阿里蚂蚁金服四面(已拿offer)Java技术面经总结

钟奕礼

Java 编程 程序员 架构 面试

《月亮与六便士》:给你500万,你会用它买套房子还是周游世界?

了了Vita

iOS开发:git上传代码到开源中国的步骤,以及pod的更新方法

花花

ios

阿里巴巴研究员叔同:云原生是企业数字创新的最短路径

阿里巴巴中间件

云计算 Serverless 容器 云原生 Faas

在项目启动时(无request)获取Tomcat端口号

waitmoon

Java

看完这篇文章,你起码对分析视频卡顿有点思路了

小驰笔记

android 音视频 camera 引航计划

NodeJs 介绍

小风以北

nodejs 新特性

民国最出名的女作家,为什么是她?

了了Vita

Java面试过了京东五面之后,发现掌握了这些技术也没有那么难

钟奕礼

Java 编程 程序员 架构 面试

tensorflow实现深度卷积生成对抗网络(DCGAN)生成手写数字图片

AI_robot

Redis,Apache Spark 和 Python 入门_行业深度_翻译自redis.io_InfoQ精选文章