写点什么

Redis,Apache Spark 和 Python 入门

  • 2020-03-01
  • 本文字数:4719 字

    阅读完需:约 15 分钟

Redis,Apache Spark 和 Python 入门

Apache Spark 是创建分布式数据处理管道最受欢迎的框架之一,在这篇博文中,我们将介绍在使用 Spark 时如何结合 Redis 用作计算的数据存储库。Spark 的主要功能是管道(Java,Scala,Python 或 R 脚本)可以在本地(用于开发)和集群上运行,而无需更改任何源代码。


Spark 通过巧妙地使用延迟计算或在某些情况下称为惰性来提供这种灵活性。一切都始于类 RDD,DataFrame 和更新的 Dataset,它们分别是数据的分布式惰性表示。他们使用分布式文件系统,数据库或其他类似服务作为实际的存储后端。他们的操作(例如 map / select,filter / where 和 reduce / groupBy)并没有真正地执行计算。每个操作都会在执行计划中添加一个步骤,该步骤最终会在需要实际结果时再(例如,尝试将其打印到屏幕上时)运行。


在本地启动脚本时,所有计算都在计算机上进行。或者,在分布式集群上启动时,您的数据将被分区到不同的节点上。(在大多数情况下)Spark 集群中并行执行相同的操作。

关于 RDD,DataFrame 和数据集

随着时间的推移,Spark 开发了三种不同的 API 来处理分布式数据集。尽管每个新添加的功能都比以前增加了更多功能,但是没有哪个 API 可以完全替代以前的功能。按创建顺序(从最早到最新),总体概述如下:


  • RDD 提供了用于将编译时类型安全操作应用于数据的底层方法。使用 RDD,您可以在代码中表达“事情如何发生”,而不是采用声明性方法。

  • DataFrame 引入了一种类 SQL 的方法来表达计算(它甚至支持实际的 SQL 查询)。它的声明性语法使 Spark 可以构建优化的查询计划,从而比 RDD 运行更快。

  • 数据集 是 DataFrame for Java Virtual Machine(JVM)语言的改进。它引入了 DataFrame 缺乏的编译时类型安全性,以及行的优化表示形式,从而大大减少了内存使用。由于动态语言(Python,R)具有动态特性,因此它实际上并没有任何作用,因此您仍然可以使用它们(同时在内部重新实现为数据集)使用 DataFrame。

  • 有关更多详细信息,请参阅 Jules Damji 的“ 三个 Apache Spark API 的故事 ”。

关于 spark-redis

spark-redis 是一个开放源代码连接器,可以让您使用 Redis 存储数据。


将 Redis 用作后端的三个主要原因是:


  • DataFrame / set 和 Redis 专用的 RDD: spark-redis 既实现了更通用的接口,又公开了 Redis 闻名的数据结构的 RDD。这意味着您可以非常轻松地在 Redis 上部署现有脚本,并在需要完全控制时使用 Redis 特定的功能。

  • Redis Cluster:连接器使用 Redis Cluster API,并充分利用分片数据库,包括重新分片和故障转移。将数据放在 Redis 集群中,可以极大地提高性能,因为您的管道会增加数据的多个使用者。

  • Redis Streams:Spark Streaming 非常适合新的 Redis Streams 数据结构。Redis Streams 还使用消费组,使您可以优雅地调整并行度。

  • 在本文中,我们将重点介绍 Python 入门以及如何使用 DataFrame API。在撰写本文时,可以被视为 Spark 的“本机”语言的 Scala 可以访问集成的一些更高级的功能,例如 Redis RDD 和 Streams。由于 Scala 是一种 JVM 语言,因此 Java 也可以使用这些功能。使用 Python,我们需要坚持使用 DataFrames。

配置

我们的第一步是使用 pip 安装 pyspark。您还将需要在计算机上安装 Java 8。


$ pip install pyspark
复制代码


接下来,我们将需要 Maven 构建 spark-redis。您可以从官方网站或使用软件包管理器(例如 macOS 上的自制软件)来获取它。


从 GitHub 下载 spark-redis(git clone 或以 zip 下载),并使用 Maven 进行构建。


$ cd spark-redis$ mvnclean package -DskipTests
复制代码


在里面 target/子目录中,您将找到已编译的 jar 文件。


需要一个正在运行的 Redis 服务器进行连接。您可以通过多种方式下载它:从官方网站上的软件包管理器(apt-get 或者 brew install redis)或 Docker Hub。


一旦启动并运行,就可以启动 pyspark。请注意,修改 VERSION 值成您需要从 GitHub 下载的版本。


$ pyspark –jars target /spark-redis-VERSION-jar-with-dependencies.jar
复制代码


如果您的 Redis 服务器运行在容器中或已启用身份验证,则将这些开关添加到上一次调用中(并更改值以适合您的情况)。


–conf“ spark.redis.host =localhost” –conf“ spark.redis.port = 6379” –conf“ spark.redis.auth = passwd”
复制代码

使用样本数据集运行

现在我们有了一个可以正常工作的 pyspark shell,可以将数据存储在 Redis 上,让我们来运行这个著名的 people 数据集。

开始

下载 TSV 文件后,让我们将其作为 Spark DataFrame 加载。


>>> full_df =  spark.read.csv("pantheon.tsv", sep="\t",  quote="", header=True, inferSchema=True)    >>> full_df.dtypes    [('en_curid', 'int'), ('name', 'string'), ('numlangs',  'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName',  'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT',  'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear',  'string'), ('gender', 'string'), ('occupation', 'string'), ('industry',  'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star',  'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'),  ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI',  'double')]
复制代码


现在,调用.dtypes 显示数据集中所有列(和相对类型)的列表。在此数据集中,有许多事情可能值得研究,但出于本示例的目的,让我们着重于为每个国家/地区查找到名人最常见的职业。


让我们从仅保留与我们的目标相关的列开始。


>>> data =  full_df.select("en_curid", "countryCode",  "occupation")    >>> data.show(2)    +--------+-----------+-----------+    |en_curid|countryCode| occupation|+--------+-----------+-----------+    |     307|         US| POLITICIAN||     308|         GR|PHILOSOPHER|+--------+-----------+-----------+    only showing top 2 rows
复制代码


这将创建原始 DataFrame 的副本,该副本仅包含三列:每个人的唯一 ID,他们的国家和他们的职业。


我们首先下载了一个小的数据集,但在现实生活中,如果您使用的是 Spark,则该数据集可能会更大并且可以远程托管。出于这个原因,让我们通过将数据加载到 Redis 来使下一步变得更加现实:


>>> data.write.format("org.apache.spark.sql.redis").option("table","people").option("key.column", "en_curid").save()
复制代码


此命令会将我们的数据集加载到 Redis 中。我们现在将看到,我们指定的两个选项有助于定义 Redis 中的数据布局。

Redis 上的 DataFrames

让我们使用 redis-cli 查询下,看看 DataFrame 是如何存储在 Redis 上的:


$ redis-cli> SCAN 0 MATCH people:* COUNT 31) "2048"2) 1) "people:2113653"2)"people:44849"3)"people:399280"4)"people:101393"
复制代码


SCAN 查询出我们加载到 Redis 中的 Key 和数据。您可以立即看到我们之前给出的选项如何用于定义键名:


“table”, “people” 为表示此 DataFrame 的键定义一个公共前缀,并且


“ key.column”, “ en_curid” 为我们的 DataFrame 定义了主键。


让我们随机取一个 key 看一下具体的内容:


> HGETALL people:21136531) "countryCode"2) "DE"3) "occupation"4) "SOCCER PLAYER"
复制代码


现在,我们已经了解了数据如何存储在 Redis 上,让我们跳回到 pyspark,看看我们如何实际编写管道获取每个国家的名人最常见的职业。如您所见,DataFrame 的每一行都变成了 Redis 哈希结构,其中包含 countryCode 和 occupation。如前所述,en_curid 用作主键,因此它成为键名的一部分。


从 Redis DataFrame 执行计算


即使我们应该将数据仍然加载到内存中,也可以从 Redis 加载它,以便编写与您在现实生活中将要执行的操作更相似的代码。


>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()>>> df.show(2)+--------+-----------+----------+|en_curid|countryCode|occupation|+--------+-----------+----------+|  915950|         ZW|   SWIMMER||  726159|         UY|POLITICIAN|+--------+-----------+----------+only showing top 2 rows
复制代码


这就是您的 Spark 管道开始的方式,因此让我们最后进行计算!


>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})>>> counts.show(2)+-----------+-------------+---------------+|countryCode|   occupation|count(en_curid)|+-----------+-------------+---------------+|         FR|MATHEMATICIAN|             34||         IT|SOCCER PLAYER|             81|+-----------+-------------+---------------+only showing top 2 rows
复制代码


现在,每一行代表所有当前(国家,职业)组合的计数。下一步,我们只需要选择每个国家/地区计数最高的职业。


首先,导入所需的一些新模块,然后使用 windows 定义代码以选择最常见的职业:


>>> from pyspark.sql.window import Window>>> from pyspark.sql.functions import count, col, row_number>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")>>> result.show(5)+-----------+-------------+|countryCode|   occupation|+-----------+-------------+|         DZ|   POLITICIAN||         LT|   POLITICIAN||         MM|   POLITICIAN||         CI|SOCCER PLAYER||         AZ|   POLITICIAN|+-----------+-------------+only showing top 5 rows
复制代码


此代码将原始行分组为 countryCode,按 count(en_curid)在降序排列每个组的内容,并且仅采用第一个元素。如您所见,在这个小样本中,政客似乎是最常见的职业。


让我们看看有多少个国家是这样:


>>> result.where(col("occupation") == "POLITICIAN").count()150
复制代码


哇,考虑到今天全球有 195 个国家,这真是很多。现在,让我们将其余国家/地区保存在 Redis 中:


>>>no_pol = result.where(col("occupation") != "POLITICIAN")>>> no_pol.write.format("org.apache.spark.sql.redis").option("table","occupation").option("key.column","countryCode").save()
复制代码


如果现在进入 redis-cli,您将能够看到新数据:


$ redis-cli> HGETALL occupation:IT1) "occupation"2) "RELIGIOUS FIGURE"> HGETALL occupation:US1) "occupation"2) "ACTOR"
复制代码


如果您想练习更多,请检查原始数据集,看看是否发现其他引起您兴趣的细节。

关于 Spark 数据类型和 Redis 集群 API 的补充

值得重申的非常重要的一点是,RDD 或 DataFrame / set 对象上的每个操作都将分布在多个节点上。如果我们的例子不仅仅涉及名人,那么一开始我们将有数千万行。在这种情况下,Spark 将扩展计算。但是,如果只有一个 Redis 实例,则将有 N 个节点在其上运行,很可能会成为网络带宽的瓶颈。


为了充分利用 Redis,您需要使用 Redis Cluster API 适当地扩展它。这将确保您的所有计算节点在读取时不会得不到数据,在写入时不会阻塞。

结论

在本文中,我们探讨了如何下载,编译和部署 spark-redis,以便将 Redis 用作 Spark DataFrames 的后端。Redis 提供了对 DataFrame API 的全面支持,因此移植现有脚本并开始享受 Redis 提供的更高速度应该非常容易。如果您想了解更多信息,请查看 GitHub 上关于 spark-redis 的文档。


本文转载自 中间件小哥 公众号。


原文链接:https://mp.weixin.qq.com/s/GOu3EQyqfG3vRqy9mdHKyw


2020-03-01 21:431285

评论

发布
暂无评论
发现更多内容

用 Docker 工具管理 WebAssembly 应用程序

WasmEdge

Docker rust 云原生 webassembly

Watt瓦特系统APP开发搭建

架构实战营 模块七作业

netspecial

架构实战营

多项目并行,项目经理如何有效管理项目进度?

万事ONES

研发管理工具 ONES 项目经理 项目管理工具

华为前端工程师分享:查明网站访问故障原因,教你4招快速应对

华为云开发者联盟

高可用 网站 CDN 云安全 DNS故障

Redisson 分布式锁源码 01:可重入锁加锁

程序员小航

Java redis 源码 分布式锁 redisson

【Flutter 专题】98 易忽略的【小而巧】的技术点汇总 (六)

阿策小和尚

Flutter 小菜 0 基础学习 Flutter Android 小菜鸟 7月日更

数据结构——树和二叉树

若尘

数据结构 二叉树

Go 学习笔记之 命名

架构精进之路

Go 语言 7月日更

如何基于阿里云持久内存实例搭建高性价比Redis应用?

弹性计算百晓生

redis 阿里云 Redis 核心技术与实战 弹性计算

08 | 指针系列(二):记住,指针变量也是变量

Nydia

【LeetCode】雪糕的最大数量Java题解

Albert

算法 LeetCode 7月日更

云小课 | ModelArts Pro 自然语言处理套件:高效构建行业高精度文本处理模型

华为云开发者联盟

AI nlp ModelArts Pro 开发套件 文本处理模型

(VMware)ubuntu 环境下搭建 docker 镜像私服

逸少

Docker 镜像仓库

「项目管理100问」之一篇优秀的周报是怎样炼成的?

万事ONES

项目 周报 ONES

如何用EasyRecovery找回已经删除的图片?

淋雨

EasyRecovery 文件恢复 硬盘数据恢复

从零开始学习3D可视化之事件卸载、事件暂停

ThingJS数字孪生引擎

大前端 3D可视化 数字孪生 事件

.NET CORE 对象池简述

喵叔

7月日更

ES6中扩展运算符的8种用法

devpoint

数组去重 ES6 扩展运算符

阿里+头条+抖音+百度+蚂蚁+京东面经,都是精髓!

欢喜学安卓

android 程序员 面试 移动开发

阿里云ECS Cloudbuild开发者大赛重磅开启!40万奖金燃爆这个夏天!

弹性计算百晓生

云计算 阿里云 开发者大赛

ONES 课堂:敏捷开发和迭代

万事ONES

项目管理 敏捷开发 ONES 迭代

Rust从0到1-Cargo-自定义构建

rust build cargo 构建

「免费开源」基于Vue和Quasar的前端SPA项目crudapi后台管理系统实战之联合索引(十一)

crudapi

Vue crud crudapi quasar 联合索引

阿里+头条+腾讯等大厂Android面试题分享,神操作!

欢喜学安卓

android 程序员 面试 移动开发

Java零基础学习路线图(2021版)

Java入门到架构

Java 书籍

流量为王时代的短视频平台如何确保内容质量?|【话题讨论】

老猿Python

技术 内容审核 流量为王 负能量

Ant蚂蚁挖矿系统软件开发资料

Pi network/π币系统APP软件开发搭建

对象存储手把手教一 | 用户数据访问控制管理ACL

QingStor分布式存储

云原生 对象存储 分布式存储

JAVA 设计模式系列——工厂模式

加百利

7月日更

Redis,Apache Spark 和 Python 入门_行业深度_翻译自redis.io_InfoQ精选文章