写点什么

Apache Spark 实现可扩展日志分析,挖掘系统最大潜力

  • 2019-05-05
  • 本文字数:13726 字

    阅读完需:约 45 分钟

Apache Spark实现可扩展日志分析,挖掘系统最大潜力

几乎每个大大小小的组织都有多个系统和基础设施日复一日地运行。为了有效地保持业务运行,组织需要知道他们的基础设施是否发挥了最大潜力。这包括分析系统和应用程序日志,甚至可能对日志数据应用预测分析。

引言

现如今,在利用分析的案例中,日志分析是最流行、最有效的企业案例之一。几乎每个大大小小的组织都有多个系统和基础设施日复一日地运行。为了有效地保持业务运行,组织需要知道他们的基础设施是否发挥了最大潜力。这包括分析系统和应用程序日志,甚至可能对日志数据应用预测分析。根据运行在上面的组织基础设施和应用程序的类型,日志数据的数量通常是巨大的。以前,由于计算资源限制,我们只能在一台机器上分析数据样本,这种日子已经一去不复返了。


来源:Doug Henschen)


在大数据、更好的分布式计算、大数据处理和 Spark 等开源分析框架的支持下,我们每天可以对潜在的数百万乃至数十亿条日志消息执行可扩展的日志分析。本教程面向案例研究,目的是采用一种实际操作的方法,展示如何利用 Spark 在半结构化日志数据上执行大规模日志分析。如果你对使用 Spark 的可扩展 SQL 感兴趣,请查阅Spark上的大规模SQL


本文将主要探讨以下几个主题。



尽管有很多优秀的开源框架和工具可以用于日志分析,包括 ElasticSearch,但本教程的目的是展示如何利用 Spark 对日志进行大规模分析。在现实世界中,你可以在分析日志数据时自由选择你的工具箱。让我们开始吧!

主要目标——NASA 日志分析

正如我们前面提到的,Apache Spark 是一个优秀的、理想的开源框架,用于结构化和非结构化数据清理、分析和建模——大规模的!在本教程中,我们的主要目标是关注业界最流行的案例研究之一——日志分析。通常,服务器日志是企业中非常常见的数据源,是常常包含可操作见解和信息的金矿。企业中的日志数据有许多来源,比如 Web、客户端和计算服务器、应用程序、用户生成的内容、平面文件。它们可以用于监视服务器、改进业务和客户智能、构建推荐系统、欺诈检测等等。



Spark 让你能以很低的成本将日志转储并存储在磁盘上的文件中,同时提供丰富的 API 来执行大规模的数据分析。这个实践案例研究将向你展示如何在 NASA 的实际生产日志上使用 Apache Spark,并学习数据清理和探索性数据分析中基本但强大的技术。在本案例研究中,我们将分析来自佛罗里达州 NASA 肯尼迪航天中心 Web 服务器的日志数据集。完整的数据集可以在这里免费下载。


这两个数据集包含了佛罗里达州 NASA 肯尼迪航天中心 WWW 服务器上两个月内的所有 HTTP 请求。你可以到网站下载以下文件(或直接点击以下链接)。



请确保这两个文件与包含教程的笔记本在同一个目录中,该教程可以从我的GitHub上找到。

设置依赖项

第一步是确保你能够访问 Spark 会话和集群。为此,你可以使用自己的本地设置或基于云的设置。通常,现在大多数云平台都会提供一个 Spark 集群,你还可以选择免费的Databricks社区版。本教程假设你已经安装了 Spark,因此我们不会花费额外的时间从头配置或设置 Spark。


通常,在启动你的 jupyter 笔记本服务器时,预配置的 Spark 设置已经预先加载了必要的环境变量或依赖项。在我的例子中,我可以在笔记本中使用以下命令来检查它们。


spark
复制代码



这说明我的集群目前正在运行 Spark 2.4.0。我们还可以使用以下代码检查 sqlContext 是否存在。


sqlContext#Output:<pyspark.sql.context.SQLContext at 0x7fb1577b6400>
复制代码


现在,如果你没有预先配置这些变量并得到一个错误,你可以加载它们并使用以下代码配置它们。除此之外,我们还加载了一些用于处理数据流和正则表达式的其他库。


# configure spark variablesfrom pyspark.context import SparkContextfrom pyspark.sql.context import SQLContextfrom pyspark.sql.session import SparkSession    sc = SparkContext()sqlContext = SQLContext(sc)spark = SparkSession(sc)
# load up other dependenciesimport reimport pandas as pd
复制代码


使用正则表达式将是解析日志文件的主要方面之一。正则表达式是一种非常强大的模式匹配技术,可以用于提取和发现半结构化和非结构化数据中的模式。


来源:xkcd


正则表达式非常有效、非常强大,但有时也会让人不知所措或感到困惑。不过不用担心,通过更多的练习,你可以真正充分地利用它的潜力。下面的示例展示了在 Python 中使用正则表达式的一种方法。


m = re.finditer(r'.*?(spark).*?', "I'm searching for a spark in PySpark", re.I)for match in m:    print(match, match.start(), match.end())
复制代码


<_sre.SRE_Match object; span=(0, 25), match=“I’m searching for a spark”> 0 25

<_sre.SRE_Match object; span=(25, 36), match=’ in PySpark’> 25 36


让我们进入下一部分的分析。

加载和查看 NASA 日志数据集

假设我们的数据存储在下面提到的路径中(以平面文件的形式),让我们将其加载到一个 DataFrame 中。我们将分步骤来做。下面的代码获取磁盘中的日志数据文件名。


import glob
raw_data_files = glob.glob('*.gz')raw_data_files
复制代码


[‘NASA_access_log_Jul95.gz’, ‘NASA_access_log_Aug95.gz’]


现在,我们将使用 sqlContext.read.text()spark.read.text()来读取文本文件。这将生成一个 DataFrame,其中只有一个名为 value 的字符串列。


base_df = spark.read.text(raw_data_files)base_df.printSchema()
复制代码


root

|-- value: string (nullable = true)


这使我们能够看到日志数据的模式,它看起来很像我们将很快要检查的文本数据。你可以使用以下代码查看保存日志数据的数据结构类型。


type(base_df)
复制代码


#Output:

pyspark.sql.dataframe.DataFrame


我们将在整个教程中使用 Spark DataFrame。但是,如果需要,还可以将数据 DataFrame 转换为 RDD,即 Spark 的原始数据结构(弹性分布式数据集)。


base_df_rdd = base_df.rddtype(base_df_rdd)
复制代码


#Output

pyspark.rdd.RDD


现在让我们看一下 DataFrame 中实际的日志数据。


base_df.show(10, truncate=False)
复制代码



这看起来绝对像标准的服务器日志数据,它是半结构化的,在使用它们之前,我们肯定需要做一些数据处理和清理。请记住,从 RDD 访问数据略有不同,如下所示。


base_df_rdd.take(10)
复制代码



现在我们已经加载并查看了日志数据,让我们对其进行处理和清理。

数据清理

在本节中,我们将尝试清理和解析日志数据集,以便真正从每个日志消息中提取包含有意义信息的结构化属性。

日志数据理解

如果你熟悉 Web 服务器日志,你将认识到上面显示的数据是通用的日志格式


字段包括:remotehost rfc931 authuser [date] “request” status bytes


字段说明
remotehost远程主机名(或IP地址,如果DNS主机名获取不到或DNSLookup未开启)
rfc931用户的远程登录名(如果有的话)。
authuser通过HTTP服务器验证的远程用户的用户名。
[date]请求的日期和时间。
“request”请求,与来自浏览器或客户端的请求完全相同。
status服务器返回客户端的HTTP状态码
bytes传输到客户端的字节数(Content-Length)。


我们需要使用一些特定的技术来解析、匹配和提取日志数据中的这些属性。

使用正则表达式进行数据解析和提取

接下来,我们必须将半结构化的日志数据解析为单独的列。我们将使用专门的内置函数regexp_extract()进行解析。此函数将针对具有一个或多个捕获组的正则表达式匹配列,并允许提取其中一个匹配的组。我们将对希望提取的每个字段使用一个正则表达式。


到目前为止,你一定已经听说或使用了大量正则表达式。如果你发现正则表达式令人困惑,并且希望了解更多关于正则表达式的信息,我们建议你访问RegexOne网站。你可能还会发现,Goyvaerts 和 Levithan 编写的《正则表达式手册》是非常有用的参考资料。


让我们看下我们使用的数据集中的日志总数。


print((base_df.count(), len(base_df.columns)))
复制代码


#Output

(3461613, 1)


看起来我们总共有大约 346 万条日志消息。一个不小的数字!让我们提取并查看一些日志消息。


sample_logs = [item['value'] for item in base_df.take(15)]sample_logs
复制代码


提取主机名

让我们尝试编写一些正则表达式来从日志中提取主机名。


host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'hosts = [re.search(host_pattern, item).group(1)           if re.search(host_pattern, item)           else 'no match'           for item in sample_logs]hosts
复制代码


[‘199.72.81.55’,

** ‘unicomp6.unicomp.net’,**

** ‘199.120.110.21’,**

** ‘burger.letters.com’,**

…,

…,

** ‘unicomp6.unicomp.net’,**

** ‘d104.aa.net’,**

** ‘d104.aa.net’]**

提取时间戳

现在让我们尝试使用正则表达式从日志中提取时间戳字段。


ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'timestamps = [re.search(ts_pattern, item).group(1) for item in sample_logs]timestamps
复制代码


[‘01/Jul/1995:00:00:01 -0400’,

‘01/Jul/1995:00:00:06 -0400’,

‘01/Jul/1995:00:00:09 -0400’,

…,

…,

‘01/Jul/1995:00:00:14 -0400’,

‘01/Jul/1995:00:00:15 -0400’,

‘01/Jul/1995:00:00:15 -0400’]

提取 HTTP 请求方法、URI 和协议

现在让我们尝试使用正则表达式从日志中提取 HTTP 请求方法、URI 和协议模式字段。


method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()               if re.search(method_uri_protocol_pattern, item)               else 'no match'              for item in sample_logs]method_uri_protocol
复制代码


[(‘GET’, ‘/history/apollo/’, ‘HTTP/1.0’),

** (‘GET’, ‘/shuttle/countdown/’, ‘HTTP/1.0’),**

…,

…,

** (‘GET’, ‘/shuttle/countdown/count.gif’, ‘HTTP/1.0’),**

** (‘GET’, ‘/images/NASA-logosmall.gif’, ‘HTTP/1.0’)]**

提取 HTTP 状态码

现在让我们尝试使用正则表达式从日志中提取 HTTP 状态码。


status_pattern = r'\s(\d{3})\s'status = [re.search(status_pattern, item).group(1) for item in sample_logs]print(status)
复制代码


[‘200’, ‘200’, ‘200’, ‘304’, …, ‘200’, ‘200’]

提取 HTTP 响应内容大小

现在让我们尝试使用正则表达式从日志中提取 HTTP 响应内容大小。


content_size_pattern = r'\s(\d+)$'content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]print(content_size)
复制代码


[‘6245’, ‘3985’, ‘4085’, ‘0’, …, ‘1204’, ‘40310’, ‘786’]

把它们放在一起

现在,让我们尝试利用前面构建的所有正则表达式模式,并使用 regexp_extract(…)方法构建 DataFrame,所有日志属性都整齐地提取到各自的列中。


from pyspark.sql.functions import regexp_extract
logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'), regexp_extract('value', ts_pattern, 1).alias('timestamp'), regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'), regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'), regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'), regexp_extract('value', status_pattern, 1).cast('integer').alias('status'), regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))logs_df.show(10, truncate=True)print((logs_df.count(), len(logs_df.columns)))
复制代码


查找缺失值

缺失值和空值是数据分析和机器学习的祸根。让我们看看我们的数据解析和提取逻辑是如何工作的。首先,让我们验证原始数据框中有没有空行。


(base_df    .filter(base_df['value']                .isNull())    .count())
复制代码


0


没问题!现在,如果我们的数据解析和提取工作正常,我们就不应该有任何可能存在空值的行。让我们来试试吧!


bad_rows_df = logs_df.filter(logs_df['host'].isNull()|                              logs_df['timestamp'].isNull() |                              logs_df['method'].isNull() |                             logs_df['endpoint'].isNull() |                             logs_df['status'].isNull() |                             logs_df['content_size'].isNull()|                             logs_df['protocol'].isNull())bad_rows_df.count()
复制代码


33905


哎哟!看起来我们的数据中有超过 33K 的缺失值!我们能搞定吗?


请记住,这不是一个常规的 pandas DataFrame,你无法直接查询并获得哪些列为空。我们所谓的大数据集驻留在磁盘上,它可能存在于 Spark 集群中的多个节点上。那么我们如何找出哪些列有可能为空呢?

查找 Null 值

我们通常可以使用以下技术找出哪些列具有空值。(注意:这种方法是从 StackOverflow 上的一个绝妙的回答改造而来的。)


from pyspark.sql.functions import colfrom pyspark.sql.functions import sum as spark_sum
def count_null(col_name): return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)
# Build up a list of column expressions, one per column.exprs = [count_null(col_name) for col_name in logs_df.columns]
# Run the aggregation. The *exprs converts the list of expressions into# variable function arguments.logs_df.agg(*exprs).show()
复制代码



看起来 status 列中有一个缺失值而其它的都在 content_size 列中。让我们看看能不能找出问题所在!

处理 HTTP 状态中的空值

状态列解析使用的原始正则表达式是:


regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer')                                          .alias( 'status')
复制代码


是否有更多的数字使正则表达式出错?还是数据点本身的问题?让我们试着找出答案。


注意:在下面的表达式中,~表示“非”。


null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))null_status_df.count()
复制代码


1


让我们看看这条糟糕的记录是什么样子?


null_status_df.show(truncate=False)
复制代码



看起来像一条有很多信息丢失的记录!让我们通过日志数据解析管道来传递它。


bad_status_df = null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),                                      regexp_extract('value', ts_pattern, 1).alias('timestamp'),                                      regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),                                      regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),                                      regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),                                      regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),                                      regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))bad_status_df.show(truncate=False)
复制代码



看起来这条记录本身是一个不完整的记录,没有有用的信息,最好的选择是删除这条记录,如下所示!


logs_df = logs_df[logs_df['status'].isNotNull()]exprs = [count_null(col_name) for col_name in logs_df.columns]logs_df.agg(*exprs).show()
复制代码


处理 HTTP content size 列中的空值

根据之前的正则表达式,content_size 列的原始解析正则表达式为:


regexp_extract('value', r'\s(\d+)$', 1).cast('integer')                                       .alias('content_size')
复制代码


原始数据集中是否有数据丢失?让我们试着找出答案吧!我们首先尝试找出基本 DataFrame 中可能缺少内容大小的记录。


null_content_size_df = base_df.filter(~base_df['value'].rlike(r'\s\d+$'))null_content_size_df.count()
复制代码


33905


这个数值似乎与处理后的 DataFrame 中缺失的内容大小的数量相匹配。让我们来看看我们的数据框中缺少内容大小的前十条记录。


null_content_size_df.take(10)
复制代码


很明显,糟糕的原始数据记录对应错误响应,其中没有发回任何内容,服务器为 content_size 字段发出了一个“-”。


因为我们不想从我们的分析中丢弃这些行,所以我们把它们代入或填充为 0。

修复 content_size 为 null 的行

最简单的解决方案是像前面讨论的那样,用 0 替换 logs_df 中的 null 值。Spark DataFrame API 提供了一组专门为处理 null 值而设计的函数和字段,其中包括:


  • fillna():用指定的非空值填充空值。

  • na:它返回一个DataFrameNaFunctions对象,其中包含许多用于在空列上进行操作的函数。


有几种方法可以调用这个函数。最简单的方法就是用已知值替换所有空列。但是,为了安全起见,最好传递一个包含(column_name, value)映射的 Python 字典。这就是我们要做的。下面是文档中的一个示例:


>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()+---+------+-------+|age|height|   name|+---+------+-------+| 10|    80|  Alice||  5|  null|    Bob|| 50|  null|    Tom|| 50|  null|unknown|+---+------+-------+
复制代码


现在我们使用这个函数,用 0 填充 content_size 字段中所有缺失的值!


logs_df = logs_df.na.fill({'content_size': 0})exprs = [count_null(col_name) for col_name in logs_df.columns]logs_df.agg(*exprs).show()
复制代码



看,没有缺失值了!

处理时间字段(时间戳)

现在我们有了一个干净的、已解析的 DataFrame,我们必须将 timestamp 字段解析为一个实际的时间戳。通用的日志格式时间有点不标准。用户定义函数(UDF)是解析它最直接的方法。


from pyspark.sql.functions import udf
month_map = { 'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7, 'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}
def parse_clf_time(text): """ Convert Common Log time format into a Python datetime object Args: text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz] Returns: a string suitable for passing to CAST('timestamp') """ # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format( int(text[7:11]), month_map[text[3:6]], int(text[0:2]), int(text[12:14]), int(text[15:17]), int(text[18:20]) )
复制代码


现在,让我们使用这个函数来解析 DataFrame 中的 time 列。


udf_parse_time = udf(parse_clf_time)
logs_df = (logs_df.select('*', udf_parse_time(logs_df['timestamp']) .cast('timestamp') .alias('time')) .drop('timestamp')logs_df.show(10, truncate=True)
复制代码



一切看起来都很好!让我们通过检查 DataFrame 的模式来验证这一点。


logs_df.printSchema()root |-- host: string (nullable = true) |-- method: string (nullable = true) |-- endpoint: string (nullable = true) |-- protocol: string (nullable = true) |-- status: integer (nullable = true) |-- content_size: integer (nullable = false) |-- time: timestamp (nullable = true)
复制代码


现在,让我们缓存 logs_df,因为我们将在下一部分的数据分析部分中大量地使用它!


logs_df.cache()
复制代码

Web 日志数据分析

现在我们有了一个 DataFrame,其中包含经过解析和清理的日志文件,我们可以执行一些有趣的探索性数据分析(EDA),尝试获得一些有趣的见解!

内容大小统计

让我们计算一些关于 Web 服务器返回内容大小的统计信息。特别是,我们想知道内容大小的平均值、最小值和最大值。


我们可以通过在 logs_df content_size 列上调用**.describe()**来计算统计数据。该函数的作用是返回给定列的计数、平均值、stddev、最小值和最大值。


content_size_summary_df = logs_df.describe(['content_size'])content_size_summary_df.toPandas()
复制代码



或者,我们可以使用 SQL 直接计算这些统计数据。你可以研究下文档pyspark.sql.functions 模块提供的许多有用的函数。


在应用**.agg()函数之后,我们调用 toPandas()来提取结果并将其转换为 panda** DataFrame,该数据框在 Jupyter 笔记本上具有更好的格式。


from pyspark.sql import functions as F
(logs_df.agg(F.min(logs_df['content_size']).alias('min_content_size'), F.max(logs_df['content_size']).alias('max_content_size'), F.mean(logs_df['content_size']).alias('mean_content_size'), F.stddev(logs_df['content_size']).alias('std_content_size'), F.count(logs_df['content_size']).alias('count_content_size')) .toPandas())
复制代码



我们可以验证结果,并看到它们和预期相同。

HTTP 状态码分析

接下来,让我们看看日志中出现的状态码值。我们想知道数据中出现了哪些状态码值以及出现了多少次。我们再次从 logs_df 开始,然后按 status 列分组,运用**.count()聚合函数,并按 status**列排序。


status_freq_df = (logs_df                     .groupBy('status')                     .count()                     .sort('status')                     .cache())print('Total distinct HTTP Status Codes:', status_freq_df.count())                     
复制代码


Total distinct HTTP Status Codes: 8


看起来,我们总共有 8 个不同的 HTTP 状态码。让我们以频率表的形式看一下它们的出现情况。


status_freq_pd_df = (status_freq_df                         .toPandas()                         .sort_values(by=['count'],                                      ascending=False))status_freq_pd_df
复制代码



看起来,状态码 200 OK 是最常见的代码,这是一个很好的信号,说明大多数时候事情都运行正常。让我们将其可视化。


import matplotlib.pyplot as pltimport seaborn as snsimport numpy as np%matplotlib inline
sns.catplot(x='status', y='count', data=status_freq_pd_df, kind='bar', order=status_freq_pd_df['status'])
复制代码


HTTP状态码出现次数


不是太坏!但是由于数据存在巨大的差别,一些状态码几乎不可见。让我们做一个对数变换,看看情况是否有所改善。


log_freq_df = status_freq_df.withColumn('log(count)',                                         F.log(status_freq_df['count']))log_freq_df.show()
复制代码



结果看起来很好,似乎已经处理了偏斜度,让我们通过可视化这些数据来验证一下。


log_freq_pd_df = (log_freq_df                    .toPandas()                    .sort_values(by=['log(count)'],                                 ascending=False))sns.catplot(x='status', y='log(count)', data=log_freq_pd_df,             kind='bar', order=status_freq_pd_df['status'])
复制代码


HTTP状态码出现次数——对数变换


这看起来确实好多了,偏斜度不那么大了!

分析高频主机

让我们看看频繁访问服务器的主机。我们将尝试获得每个主机的总访问数,然后根据这些数进行排序,并只显示访问频率排在前十位的主机。


host_sum_df =(logs_df               .groupBy('host')               .count()               .sort('count', ascending=False).limit(10))
host_sum_df.show(truncate=False)
复制代码



这看起来不错,但是让我们更仔细地检查下第 9 行中的空白记录。


host_sum_pd_df = host_sum_df.toPandas()host_sum_pd_df.iloc[8]['host']
复制代码


看起来,我们有一些访问最频繁的主机使用了空字符串作为主机名。这给我们上了宝贵的一课,不仅要检查 null,还要在数据清理时检查潜在的空字符串。


显示出现频率排名前 20 的端点


现在,让我们可视化日志中端点(URI)的访问次数。要执行这个任务,我们从 logs_df 开始,按 endpoint 列分组,按 count 聚合,并按降序排序,就像前面的问题一样。


paths_df = (logs_df            .groupBy('endpoint')            .count()            .sort('count', ascending=False).limit(20))
paths_pd_df = paths_df.toPandas()paths_pd_df
复制代码



毫不奇怪,GIF、主页和一些 CGI 脚本似乎是访问最多的资产。

错误数排名前 10 的端点

返回代码不是 200 (HTTP 状态 OK)的次数排名前 10 的端点是什么?我们创建一个有序列表,其中包含端点和返回代码不是 200 的次数,并显示前十位。


not200_df = (logs_df               .filter(logs_df['status'] != 200))
error_endpoints_freq_df = (not200_df .groupBy('endpoint') .count() .sort('count', ascending=False) .limit(10) ) error_endpoints_freq_df.show(truncate=False)
复制代码



看起来 GIF(动画/静态图片)加载失败最多。你知道为什么吗?考虑到这些日志是 1995 年的,考虑到当时的互联网速度,我一点也不惊讶!

不同主机总数

在这两个月里,访问 NASA 网站的不同主机的总数是多少?我们可以通过一些变换来确定。


unique_host_count = (logs_df                     .select('host')                     .distinct()                     .count())unique_host_count
复制代码


137933

每天的不同主机数

作为高级示例,让我们看看如何确定整个日志中每天中不同主机的数量。这个计算将为我们提供每天中不同主机的数量。


我们希望 DataFrame 中数据按照日期的升序排序,其中包括天和与那一天关联的不同主机的数量。


考虑一下你需要执行哪些步骤来计算每天发出请求的不同主机的数量。因为日志只包含一个月,所以可以忽略月份。你可能希望使用 pyspark.sql.functions 模块中的dayofmonth函数(函数模块我们已经导入为 F)。


host_day_df :一个包含两列的 DataFrame。



对于 logs_df 中的每一行,DataFrame 中都有一行。实际上,我们只是转换了 logs_df 的每一行。例如,对于 logs_df 中的这一行:


unicomp6.unicomp.net - - [01/Aug/1995:00:35:41 -0400] "GET /shuttle/missions/sts-73/news HTTP/1.0" 302 -
复制代码


host_day_df 应该包含unicomp6.unicomp.net 1


host_day_distinct_df = (host_day_df                          .dropDuplicates())host_day_distinct_df.show(5, truncate=False)
复制代码



host_day_distinct_df :这个 DataFrame 具有与 host_day_df 相同的列,但是删除了重复的(day, host)行。


host_day_distinct_df = (host_day_df                          .dropDuplicates())host_day_distinct_df.show(5, truncate=False)
复制代码



daily_unique_hosts_df:一个包含两列的 DataFrame。



def_mr = pd.get_option('max_rows')pd.set_option('max_rows', 10)
daily_hosts_df = (host_day_distinct_df .groupBy('day') .count() .sort("day"))
daily_hosts_df = daily_hosts_df.toPandas()daily_hosts_df
复制代码



这为我们提供了一个很好的 DataFrame,显示了每天不同主机的总数。让我们将其可视化!


c = sns.catplot(x='day', y='count',                 data=daily_hosts_df,                 kind='point', height=5,                 aspect=1.5)
复制代码


每天的不同主机数

每台主机每天的平均请求数

在前面的示例中,我们研究了一种方法,该方法可以逐日确定整个日志中不同主机的数量。现在,让我们试着根据我们的日志找出每个主机每天向 NASA 网站发出的平均请求数。我们想要一个 DataFrame,通过日期升序来排序,其中包括天和每个主机在这一天的平均请求数。


daily_hosts_df = (host_day_distinct_df                     .groupBy('day')                     .count()                     .select(col("day"),                                       col("count").alias("total_hosts")))
total_daily_reqests_df = (logs_df .select(F.dayofmonth("time") .alias("day")) .groupBy("day") .count() .select(col("day"), col("count").alias("total_reqs")))
avg_daily_reqests_per_host_df = total_daily_reqests_df.join(daily_hosts_df, 'day')avg_daily_reqests_per_host_df = (avg_daily_reqests_per_host_df .withColumn('avg_reqs', col('total_reqs') / col('total_hosts')) .sort("day"))avg_daily_reqests_per_host_df = avg_daily_reqests_per_host_df.toPandas()avg_daily_reqests_per_host_df
复制代码



现在,我们可以可视化每台主机平均每天的请求数。


c = sns.catplot(x='day', y='avg_reqs',                 data=avg_daily_reqests_per_host_df,                 kind='point', height=5, aspect=1.5)
复制代码


每台主机的平均日请求数


看来 13 号每台主机的请求数最大。

404 响应代码数量

创建一个只包含 404 状态码(未找到)的日志记录 DataFrame。我们确保缓存 cache()这个 not_found_df 数据框,因为我们将在下面的示例中使用它。你认为日志中有多少 404 记录?


not_found_df = logs_df.filter(logs_df["status"] == 404).cache()print(('Total 404 responses: {}').format(not_found_df.count()))
复制代码


Total 404 responses: 20899

列出 404 响应代码数排名前 20 的端点

使用只包含 404 响应代码的日志记录 DataFrame,我们现在将打印出产生最多 404 错误的前 20 个端点的列表。记住,这些端点应该是有序的。


endpoints_404_count_df = (not_found_df                          .groupBy("endpoint")                          .count()                          .sort("count", ascending=False)                          .limit(20))
endpoints_404_count_df.show(truncate=False)
复制代码


列出 404 响应代码数排名前 20 的主机

使用只包含 404 响应代码的日志记录 DataFrame,我们现在将打印出产生最多 404 错误的前 20 个主机的列表。记住,这些主机应该是按顺序排列的。


hosts_404_count_df = (not_found_df                          .groupBy("host")                          .count()                          .sort("count", ascending=False)                          .limit(20))
hosts_404_count_df.show(truncate=False)
复制代码



这让我们可以很容易了解,对于 NASA 的网页,哪台主机最终生成的 404 错误最多。

可视化每天的 404 错误

现在让我们临时(按时间)研究一下 404 记录。与显示每日不同主机数的示例类似,我们将按天划分 404 请求,并在 errors_by_date_sorted_df 中按天排序每日计数。


errors_by_date_sorted_df = (not_found_df                                .groupBy(F.dayofmonth('time').alias('day'))                                .count()                                .sort("day"))
errors_by_date_sorted_pd_df = errors_by_date_sorted_df.toPandas()errors_by_date_sorted_pd_df
复制代码



现在,让我们按天可视化 404 错误。


每天的404错误总数

404 错误最多的 3 天

根据前面的图表,一个月中 404 错误最多的前三天是哪三天?为此,我们可以利用前面创建的 errors_by_date_sorted_df


(errors_by_date_sorted_df    .sort("count", ascending=False)    .show(3))
复制代码


可视化每小时的 404 错误数

使用我们之前缓存的 not_found_df 数据框,现在我们将按小时分组并按天排序,创建一个 DataFrame,其中包含一天中每个小时的 HTTP 请求的 404 响应总数(从 0 点到午夜)。然后,我们会从 DataFrame 构建可视化。


hourly_avg_errors_sorted_df = (not_found_df                                   .groupBy(F.hour('time')                                             .alias('hour'))                                   .count()                                   .sort('hour'))hourly_avg_errors_sorted_pd_df = hourly_avg_errors_sorted_df.toPandas()
c = sns.catplot(x='hour', y='count', data=hourly_avg_errors_sorted_pd_df, kind='bar', height=5, aspect=1.5)
复制代码


每小时的404错误总数


看起来,404 错误多发生在下午,早上最少。现在,我们可以将 panda 显示的最大行重置为默认值,因为我们之前已经将其更改为显示有限数量的行。


pd.set_option('max_rows', def_mr)
复制代码

小结

在一个非常常见但又非常重要的日志分析案例研究中,我们采用实操的方法探讨了大规模的数据清理、解析、分析和可视化。虽然我们在这里研究的数据从规模或数量的角度来看可能并不是传统意义上的“大数据”,但是这些技术和方法非常通用,可以扩展到更大的数据量。我希望这个案例研究能让你很好地了解像 Apache Spark 这样的开源框架,它们让你可以在很大的规模上使用结构化和半结构化数据!


本文附带的所有代码和分析都可以在我的GitHub存储库中找到。


这个Jupyter笔记本中提供了详细的步骤。


原文链接


https://towardsdatascience.com/scalable-log-analytics-with-apache-spark-a-comprehensive-case-study-2be3eb3be977


2019-05-05 14:378038
用户头像

发布了 729 篇内容, 共 465.7 次阅读, 收获喜欢 1542 次。

关注

评论

发布
暂无评论
发现更多内容

培训学习大数据开发技术怎么样

小谷哥

软件开发入门教程网之Git 分支管理

雪奈椰子

git git pull cannot lock ref git 学习

Integer.valueOf(String) 方法之惑

Steven

SiamRPN++: Evolution of Siamese Visual Tracking with Very Deep Networks 深层网络连体视觉跟踪的演变

Geek_7ubdnf

神经网络

深圳大数据程序员培训多长时间可以找工作

小谷哥

上海前端培训课程哪家的好

小谷哥

CLIPPO:纯图像的CLIP,参数减半且更强大!

Zilliz

机器学习

中华财险进击数字化

OceanBase 数据库

数据库 oceanbase

软件开发入门教程网之Git 基本操作

雪奈椰子

git clone git push

稳扎稳打,坚定前行 | 一文带你回顾 StoneDB 的 2022 年

StoneDB

MySQL 数据库 HTAP StoneDB 企业号 1 月 PK 榜

小令观点 | 人脸识别遇到了什么关键性难题?

令牌云数字身份

人脸识别 难题攻克

自编码器 AE(AutoEncoder)程序

Geek_7ubdnf

自编码器

小令观点 | 数字世界里,拿什么来保护你的身份安全?

令牌云数字身份

身份安全 人脸识别 安全技术

开源数据可视化/自服务BI工具哪家强?

搞大屏的小北

数据可视化工具 DataEase 行转列

国内外开源数据可视化工具对比:DataEase相较于MetaBase有何优势

搞大屏的小北

DataEase Metabase 数据可视化工具对比 对比

转租、重组、裁员,Salesforce给中国学徒带来了哪些启示?

ToB行业头条

IoT设备接入物联网平台华北2(北京) 节点开发实战——实践类

阿里云AIoT

小程序 监控 物联网 消息中间件 弹性计算

DataEase 数据源插件开发——如何替换 STGroupFile 模板文件

搞大屏的小北

数据可视化工具 DataEase STGroupFile 模版替换 数据源插件

前端面授培训课程哪里好呢

小谷哥

SiamRPN:High Performance Visual Tracking with Siamese Region Proposal Network 孪生网络

Geek_7ubdnf

神经网络

创建Root权限虚拟环境

Geek_7ubdnf

Linux

目标跟踪相关知识总结

Geek_7ubdnf

图像处理

SiamFC:用于目标跟踪的全卷积孪生网络 fully-convolutional siamese networks for object tracking

Geek_7ubdnf

图像处理

SA-Siam:用于实时目标跟踪的孪生网络A Twofold Siamese Network for Real-Time Object Tracking

Geek_7ubdnf

神经网络

又一创新!阿里云 Serverless 调度论文被云计算顶会 ACM SoCC 收录

阿里巴巴中间件

阿里云 Serverless 云原生

软件测试/测试开发丨Google 测试总监聊如何经营成功的测试职业生涯

测试人

软件测试 自动化测试 测试开发 职业生涯

【DBA100人】白鳝:一直往上走,从程序员到数据库专家

OceanBase 数据库

数据库 oceanbase

作业帮:探索多云架构下的数据库集群解决方案

OceanBase 数据库

数据库 oceanbase

Kubernetes HPA 的三个误区与避坑指南

阿里巴巴中间件

阿里云 Kubernetes 云原生

带你来吃瓜!Andy Pavlo教授带您一文回顾数据库的2022年

StoneDB

MySQL 数据库 HTAP StoneDB 企业号 1 月 PK 榜

堆叠降噪自动编码器 Stacked Denoising Auto Encoder(SDAE)

Geek_7ubdnf

图像处理

Apache Spark实现可扩展日志分析,挖掘系统最大潜力_数据库_Dipanjan Sarkar_InfoQ精选文章