写点什么

Redis,Apache Spark 和 Python 入门

  • 2020-03-01
  • 本文字数:4719 字

    阅读完需:约 15 分钟

Redis,Apache Spark 和 Python 入门

Apache Spark 是创建分布式数据处理管道最受欢迎的框架之一,在这篇博文中,我们将介绍在使用 Spark 时如何结合 Redis 用作计算的数据存储库。Spark 的主要功能是管道(Java,Scala,Python 或 R 脚本)可以在本地(用于开发)和集群上运行,而无需更改任何源代码。


Spark 通过巧妙地使用延迟计算或在某些情况下称为惰性来提供这种灵活性。一切都始于类 RDD,DataFrame 和更新的 Dataset,它们分别是数据的分布式惰性表示。他们使用分布式文件系统,数据库或其他类似服务作为实际的存储后端。他们的操作(例如 map / select,filter / where 和 reduce / groupBy)并没有真正地执行计算。每个操作都会在执行计划中添加一个步骤,该步骤最终会在需要实际结果时再(例如,尝试将其打印到屏幕上时)运行。


在本地启动脚本时,所有计算都在计算机上进行。或者,在分布式集群上启动时,您的数据将被分区到不同的节点上。(在大多数情况下)Spark 集群中并行执行相同的操作。

关于 RDD,DataFrame 和数据集

随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。尽管每个新添加的功能都比以前增加了更多功能,但是没有哪个 API 可以完全替代以前的功能。按创建顺序(从最早到最新),总体概述如下:


  • RDD 提供了用于将编译时类型安全操作应用于数据的底层方法。使用 RDD,您可以在代码中表达“事情如何发生”,而不是采用声明性方法。

  • DataFrame 引入了一种类 SQL 的方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明性语法使 Spark 可以构建优化的查询计划,从而比 RDD 运行更快。

  • 数据集 是 DataFrame for Java Virtual Machine(JVM)语言的改进。它引入了 DataFrame 缺乏的编译时类型安全性,以及行的优化表示形式,从而大大减少了内存使用。由于动态语言(Python,R)具有动态特性,因此它实际上并没有任何作用,因此您仍然可以使用它们(同时在内部重新实现为数据集)使用 DataFrame。

  • 有关更多详细信息,请参阅 Jules Damji 的“ 三个 Apache Spark API 的故事 ”。

关于 spark-redis

spark-redis 是一个开放源代码连接器,可以让您使用 Redis 存储数据。


将 Redis 用作后端的三个主要原因是:


  • DataFrame / set 和 Redis 专用的 RDD: spark-redis 既实现了更通用的接口,又公开了 Redis 闻名的数据结构的 RDD。这意味着您可以非常轻松地在 Redis 上部署现有脚本,并在需要完全控制时使用 Redis 特定的功能。

  • Redis Cluster:连接器使用 Redis Cluster API,并充分利用分片数据库,包括重新分片和故障转移。将数据放在 Redis 集群中,可以极大地提高性能,因为您的管道会增加数据的多个使用者。

  • Redis Streams:Spark Streaming 非常适合新的 Redis Streams 数据结构。Redis Streams 还使用消费组,使您可以优雅地调整并行度。

  • 在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。在撰写本文时,可以被视为 Spark 的“本机”语言的 Scala 可以访问集成的一些更高级的功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。使用 Python,我们需要坚持使用 DataFrames。

配置

我们的第一步是使用 pip 安装 pyspark。您还将需要在计算机上安装 Java 8。


$ pip install pyspark
复制代码


接下来,我们将需要 Maven 构建 spark-redis。您可以从官方网站或使用软件包管理器(例如 macOS 上的自制软件)来获取它。


从 GitHub 下载 spark-redis(git clone 或以 zip 下载),并使用 Maven 进行构建。


$ cd spark-redis$ mvnclean package -DskipTests
复制代码


在里面 target/子目录中,您将找到已编译的 jar 文件。


需要一个正在运行的 Redis 服务器进行连接。您可以通过多种方式下载它:从官方网站上的软件包管理器(apt-get 或者 brew install redis)或 Docker Hub。


一旦启动并运行,就可以启动 pyspark。请注意,修改 VERSION 值成您需要从 GitHub 下载的版本。


$ pyspark –jars target /spark-redis-VERSION-jar-with-dependencies.jar
复制代码


如果您的 Redis 服务器运行在容器中或已启用身份验证,则将这些开关添加到上一次调用中(并更改值以适合您的情况)。


–conf“ spark.redis.host =localhost” –conf“ spark.redis.port = 6379” –conf“ spark.redis.auth = passwd”
复制代码

使用样本数据集运行

现在我们有了一个可以正常工作的 pyspark shell,可以将数据存储在 Redis 上,让我们来运行这个著名的 people 数据集。

开始

下载 TSV 文件后,让我们将其作为 Spark DataFrame 加载。


>>> full_df =  spark.read.csv("pantheon.tsv", sep="\t",  quote="", header=True, inferSchema=True)    >>> full_df.dtypes    [('en_curid', 'int'), ('name', 'string'), ('numlangs',  'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName',  'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT',  'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear',  'string'), ('gender', 'string'), ('occupation', 'string'), ('industry',  'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star',  'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'),  ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI',  'double')]
复制代码


现在,调用.dtypes 显示数据集中所有列(和相对类型)的列表。在此数据集中,有许多事情可能值得研究,但出于本示例的目的,让我们着重于为每个国家/地区查找到名人最常见的职业。


让我们从仅保留与我们的目标相关的列开始。


>>> data =  full_df.select("en_curid", "countryCode",  "occupation")    >>> data.show(2)    +--------+-----------+-----------+    |en_curid|countryCode| occupation|+--------+-----------+-----------+    |     307|         US| POLITICIAN||     308|         GR|PHILOSOPHER|+--------+-----------+-----------+    only showing top 2 rows
复制代码


这将创建原始 DataFrame 的副本,该副本仅包含三列:每个人的唯一 ID,他们的国家和他们的职业。


我们首先下载了一个小的数据集,但在现实生活中,如果您使用的是 Spark,则该数据集可能会更大并且可以远程托管。出于这个原因,让我们通过将数据加载到 Redis 来使下一步变得更加现实:


>>> data.write.format("org.apache.spark.sql.redis").option("table","people").option("key.column", "en_curid").save()
复制代码


此命令会将我们的数据集加载到 Redis 中。我们现在将看到,我们指定的两个选项有助于定义 Redis 中的数据布局。

Redis 上的 DataFrames

让我们使用 redis-cli 查询下,看看 DataFrame 是如何存储在 Redis 上的:


$ redis-cli> SCAN 0 MATCH people:* COUNT 31) "2048"2) 1) "people:2113653"2)"people:44849"3)"people:399280"4)"people:101393"
复制代码


SCAN 查询出我们加载到 Redis 中的 Key 和数据。您可以立即看到我们之前给出的选项如何用于定义键名:


“table”, “people” 为表示此 DataFrame 的键定义一个公共前缀,并且


“ key.column”, “ en_curid” 为我们的 DataFrame 定义了主键。


让我们随机取一个 key 看一下具体的内容:


> HGETALL people:21136531) "countryCode"2) "DE"3) "occupation"4) "SOCCER PLAYER"
复制代码


现在,我们已经了解了数据如何存储在 Redis 上,让我们跳回到 pyspark,看看我们如何实际编写管道获取每个国家的名人最常见的职业。如您所见,DataFrame 的每一行都变成了 Redis 哈希结构,其中包含 countryCode 和 occupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。


从 Redis DataFrame 执行计算


即使我们应该将数据仍然加载到内存中,也可以从 Redis 加载它,以便编写与您在现实生活中将要执行的操作更相似的代码。


>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()>>> df.show(2)+--------+-----------+----------+|en_curid|countryCode|occupation|+--------+-----------+----------+|  915950|         ZW|   SWIMMER||  726159|         UY|POLITICIAN|+--------+-----------+----------+only showing top 2 rows
复制代码


这就是您的 Spark 管道开始的方式,因此让我们最后进行计算!


>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})>>> counts.show(2)+-----------+-------------+---------------+|countryCode|   occupation|count(en_curid)|+-----------+-------------+---------------+|         FR|MATHEMATICIAN|             34||         IT|SOCCER PLAYER|             81|+-----------+-------------+---------------+only showing top 2 rows
复制代码


现在,每一行代表所有当前(国家,职业)组合的计数。下一步,我们只需要选择每个国家/地区计数最高的职业。


首先,导入所需的一些新模块,然后使用 windows 定义代码以选择最常见的职业:


>>> from pyspark.sql.window import Window>>> from pyspark.sql.functions import count, col, row_number>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")>>> result.show(5)+-----------+-------------+|countryCode|   occupation|+-----------+-------------+|         DZ|   POLITICIAN||         LT|   POLITICIAN||         MM|   POLITICIAN||         CI|SOCCER PLAYER||         AZ|   POLITICIAN|+-----------+-------------+only showing top 5 rows
复制代码


此代码将原始行分组为 countryCode,按 count(en_curid)在降序排列每个组的内容,并且仅采用第一个元素。如您所见,在这个小样本中,政客似乎是最常见的职业。


让我们看看有多少个国家是这样:


>>> result.where(col("occupation") == "POLITICIAN").count()150
复制代码


哇,考虑到今天全球有 195 个国家,这真是很多。现在,让我们将其余国家/地区保存在 Redis 中:


>>>no_pol = result.where(col("occupation") != "POLITICIAN")>>> no_pol.write.format("org.apache.spark.sql.redis").option("table","occupation").option("key.column","countryCode").save()
复制代码


如果现在进入 redis-cli,您将能够看到新数据:


$ redis-cli> HGETALL occupation:IT1) "occupation"2) "RELIGIOUS FIGURE"> HGETALL occupation:US1) "occupation"2) "ACTOR"
复制代码


如果您想练习更多,请检查原始数据集,看看是否发现其他引起您兴趣的细节。

关于 Spark 数据类型和 Redis 集群 API 的补充

值得重申的非常重要的一点是,RDD 或 DataFrame / set 对象上的每个操作都将分布在多个节点上。如果我们的例子不仅仅涉及名人,那么一开始我们将有数千万行。在这种情况下,Spark 将扩展计算。但是,如果只有一个 Redis 实例,则将有 N 个节点在其上运行,很可能会成为网络带宽的瓶颈。


为了充分利用 Redis,您需要使用 Redis Cluster API 适当地扩展它。这将确保您的所有计算节点在读取时不会得不到数据,在写入时不会阻塞。

结论

在本文中,我们探讨了如何下载,编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 提供了对 DataFrame API 的全面支持,因此移植现有脚本并开始享受 Redis 提供的更高速度应该非常容易。如果您想了解更多信息,请查看 GitHub 上关于 spark-redis 的文档。


本文转载自 中间件小哥 公众号。


原文链接:https://mp.weixin.qq.com/s/GOu3EQyqfG3vRqy9mdHKyw


2020-03-01 21:431258

评论

发布
暂无评论
发现更多内容

visionOS 专门应用提交数大幅下降;Kimi 不断「吊打」国内各大厂 AI 模型丨 RTE 开发者日报 Vol.180

声网

云行| 算赋山城,天翼云智启数字重庆智慧新篇!

天翼云开发者社区

人工智能 云计算 智算中心

AI 数据观 | 大模型私有化部署落地过程中,那些容易被忽视的“数据集成”难题

tapdata

生成式 AI 应用 大语言模型集成 企业 AI 应用

大型央国企“信创化”与数字化转型的建设思路

优秀

信创 央国企数字化转型

计算机网络协议介绍

京东科技开发者

SD-WAN在金融行业的重要性

Ogcloud

SD-WAN 企业网络 SD-WAN组网 SD-WAN服务商 SDWAN

Edge浏览器兼容性问题如何修复,这篇文章告诉你

霍格沃兹测试开发学社

如何找到香港虚拟主机5元一月的方案?

一只扑棱蛾子

香港虚拟主机

【IJCAI-2018】搜索广告数据探索与可视化

阿里云天池

阿里云

不要在自我提升方面吝啬

老张

自我提升 能力提升

猫头虎博主深度探索:Amazon Q——2023 re:Invent 大会的 AI 革新之星

亚马逊云科技 (Amazon Web Services)

re:Invent 亚马逊云科技 生成式人工智能 Amazon SageMaker Amazon Q

测试领域革新:ChatGPT助你轻松编写测试方案!

测吧(北京)科技有限公司

测试

SD-WAN解决方案七大便利点

Ogcloud

SD-WAN 企业组网 SD-WAN组网 SD-WAN服务商 SDWAN

从基础到高级,带你深入了解和使用curl命令(三)

霍格沃兹测试开发学社

从基础到高级,带你深入了解和使用curl命令(四)

霍格沃兹测试开发学社

小程序技术实践——快速开发适配鸿蒙的App

Geek_2305a8

二手车价格预测第十三名方案总结

阿里云天池

阿里云

真的假的!有个物联网公司通过自建数据库省了 98% 的云成本?!

小猿姐

数据库 云计算 aurora

手把手教你在 Spring Boot 中实现 AOP

Liam

Java 程序员 Spring Boot 后端 aop

低代码开发有哪些优势?

万界星空科技

低代码开发 低代码平台 mes 万界星空科技

人社大赛算法赛题解题思路分享+季军+三马一曹团队

阿里云天池

阿里云

“翼”马当先!混合云价值影响力领导者!

天翼云开发者社区

人工智能 云计算 云服务

SD-WAN为出海电商提供了什么支持

Ogcloud

SD-WAN 企业网络 SD-WAN组网 SD-WAN服务商 SDWAN

天池医疗AI大赛[第一季] Rank5解决方案

阿里云天池

阿里云

物联网中的预测分析:当IoTDA遇上ModelArts

华为云开发者联盟

物联网 华为云 华为云ModelArts 华为云开发者联盟 企业号2024年4月PK榜

EMQX Enterprise 5.6 发布:新增 Amazon S3 数据集成、JSON Schema 验证函数

EMQ映云科技

mqtt emqx mqtt broker

【稳定性】浅谈团队如何做好系统稳定性

京东科技开发者

提高 AI 训练算力效率:蚂蚁 DLRover 故障自愈技术的创新实践

可信AI进展

开源 算力 DLRover

企业上了MES系统后,能有什么好处?

万界星空科技

生产管理系统 mes 万界星空科技

零基础入门NLP - 新闻文本分类比赛方案分享 nano- Rank1

阿里云天池

阿里云

一本书了解AI的下一个风口:AI Agent

博文视点Broadview

Redis,Apache Spark 和 Python 入门_行业深度_翻译自redis.io_InfoQ精选文章