自从 Redis 出现以来,就在时间序列数据的存储与分析方面得到了一定程度的使用。Redis 最初只是被实现为一种缓冲,其目的是用于日志的记录,而随着其功能的不断发展,它已经具备了 5 种显式、3 种隐式的结构或类型,为 Redis 中的数据分析提供了多种方法。本文将为读者介绍使用 Redis 进行时间序列分析最灵活的一种方法。
关于竞态与事务
在 Redis 中,每个单独的命令本身都是原子性的,但按顺序执行的多条命令却未必是原子性的,有可能因出现竞态而导致不正确的行为。为了应对这一限制,本文将使用“事务管道”以及“Lua 脚本"这两种方式避免出现数据的竞态冲突。
在使用 Redis 以及用于连接 Redis 的 Python 客户端时,我们会调用 Redis 连接的.pipeline() 方法以创建一个“事务管道”(在使用其他客户端时,通常也将其称为“事务”或“MULTI/EXEC 事务”),在调用时无需传入参数,或者可以传入一个布尔值 True。通过该方法创建的管道将收集所有传入的命令,直到调用.execute() 方法为止。当.execute() 方法调用之后,客户端将对 Redis 发送 MULTI 命令,然后发送所收集的全部命令,最后是 EXEC 命令。当 Redis 在执行这一组命令时,不会被其他任何命令所打断,从而确保了原子性的执行。
在 Redis 中对一系列命令进行原子性的执行还存在着另一种选择,即服务端的 Lua 脚本。简单来说,Lua 脚本的行为与关系型数据库中的存储过程非常相似,但仅限于使用 Lua 语言以及一种专用的 Redis API 以执行 Lua。与事务的行为非常相似,Lua 中的脚本在执行时通常来说不会被打断 1,不过未处理的错误也会造成 Lua 脚本提前中断。从语法上说,我们将通过调用 Redis 连接对象的.register_script() 方法以加载一个 Lua 脚本,该方法所返回的对象可以作为一个函数,以调用 Redis 中的脚本,而无需再调用 Redis 连接中的其他方法,并结合使用 SCRIPT LOAD 与 EVALSHA 命令以加载与执行脚本。
用例
当谈到 Redis 以及使用它作为一个时间序列数据库时,我们首先提出的一个问题是:“时间序列数据库的用途或目的是什么?”时间序列数据库的用例更多地与数据相关,尤其在你的数据结构被定义为一系列事件、一个或多个值的示例、以及随着时间推移而变化的度量值的情况下。以下是这些方面应用的一些示例(但不仅限于此):
- 股票交易的卖价与交易量
- 在线零售商的订单总价与送货地址
- 视频游戏中玩家的操作
- IoT 设备中内嵌的传感器中收集的数据
我们将继续进行深入的探讨,不过基本上来说,时间序列数据库的作用就是如果发生了某件事,或是你进行了一次评估操作后,可以在记录的数据中加入一个时间戳。一旦你收集了某些事件的信息,就可以对这些事件进行分析。你可以选择在收集的同时进行实时分析,也可以在事件发生后需要进行某些更复杂的查询时进行分析。
使用通过有序集合与哈希进行高级分析
在 Redis 中,对于时间序列数据的保存与分析有一种最为灵活的方式,它需要结合使用 Redis 中的两种不同的结构,即有序集合(Sorted Set)与哈希(Hash)。
在 Redis 中,有序集合这种结构融合了哈希表与排序树(Redis 在内部使用了一个跳表结构,不过你可以先忽略这一细节)的特性。简单来说,有序集合中的每个项都是一个字符串型的“成员”以及一个 double 型的“分数”的组合。成员在哈希中扮演了键的角色,而分数则承担了树中的排序值的作用。通过这种组合,你就可以通过成员或分数的值直接访问成员与分数,此外,你也可以通过多种方式对按照分数的值排好序的成员与分数进行访问 2。
保存事件
如今,从各种方面来说,使用一个或多个有序集合以及部分哈希的组合用于保存时间序列数据的做法都是 Redis 最常见的用例之一。它表现了一种底层的构建块,用于实现各种不同的应用程序。包括像 Twitter 一样的社交网络,以及类似于 Reddit 和 Hacker News 一样的新闻网站,乃至于基于 Redis 本身的一种接近完成的关系 - 对象映射器
在本文的示例中,我们将获取用户在网站中的各种行为所产生的事件。所有的事件都将共享 4 种属性,以及不同数量的其他属性,这取决于事件的类型。我们已知的属性包括:id、timestamp、type 以及 user。为了保存每个事件,我们将使用一个 Redis 哈希,它的键由事件的 id 所派生而来。为了生成事件的 id,我们将在大量的源中选择一种方式,但现在我们将通过 Redis 中的一个计数器来生成我们的 id。如果在 64 位的平台上使用 64 位的 Redis,我们将能够创建最多 263-1 个事件,主要的限制取决于可用的内存大小。
当我们准备好进行数据的记录与插入后,我们就需要将数据保存为哈希,并在有序集合中插入一个成员 / 分数对,分别对应事件的 id(成员)与事件的时间戳(分数)。记录某个事件的代码如下
def record_event(conn, event): id = conn.incr('event:id') event['id'] = id event_key = 'event:{id}'.format(id=id) pipe = conn.pipeline(True) pipe.hmset(event_key, event) pipe.zadd('events', **{id: event['timestamp']}) pipe.execute()
在这个 record_event() 函数中,我们获取了一个事件,从 Redis 中获得一个计算得出的新 id,将它赋给事件,并生成了事件保存的键。这个键的构成是字符串“event”加上新的 id,并在两者之间由冒号分割所构成的 3。随后我们创建了一个管道,并准备设置该事件相关的全部数据,同时准备将事件 id 与时间戳对保存在有序集合中。当事务管道完成执行之后,这一事件将被记录并保存在 Redis 中。
事件分析
从现在起,我们可以通过多种方式对时间序列进行分析了。我们可以通过 ZRANGE 4 的设置对最新或最早的事件 id 进行扫描,并且可以在稍后获取这些事件本身以进行分析。通过结合使用 ZRANGEBYSCORE 与 LIMIT 参数,我们能够立即获取到某个时间戳之前或之后的 10 个、甚至是 100 个事件。我们也可以通过 ZCOUNT 计算某一特定时间段内事件发生的次数,甚至选择用 Lua 脚本实现自己的分析方式。以下的示例将通过 Lua 脚本计算在一个给定时间范围内各种不同的事件类型的数量。
import json def count_types(conn, start, end): counts = count_types_lua(keys=['events'], args=[start, end]) return json.loads(counts) count_types_lua = conn.register_script(''' local counts = {} local ids = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2]) for i, id in ipairs(ids) do local type = redis.call('HGET', 'event:' .. id, 'type') counts[type] = (counts[type] or 0) + 1 end {1} return cjson.encode(counts) ''')
这里所定义的 count_types() 函数首先将参数传递给经过封装的 Lua 脚本,并对经过 json 编码的事件类型与数量的映射进行解码。Lua 脚本首先创建了一个结果表(对应 counts 变量),随后通过 ZRANGEBYSCORE 读取这一时间范围内的事件 id 的列表。当获取到这些 id 之后,脚本将一次性读取每个事件中的类型属性,让事件数量表保持不断增长,最后结束时返回一个经过 json 编码的映射结果。
对性能的思考以及数据建模
正如代码所展示的一样,这个用于在特定时间范围内计算不同事件类型数量的方法能够正常工作,但这种方式需要对这一时间范围内的每个事件的类型属性进行大量的读取。对于包含几百或是几千个事件的时间范围来说,这样的分析是比较快的。但如果某个时间范围内饮食了几万乃至几百万个事件,情况又会如何呢?答案很简单,Redis 在计算结果时将会阻塞。
有一种方法能够处理在分析事件流时,由于长时间的脚本执行而产生的性能问题,即预先考虑一下需要被执行的查询。具体来说,如果你知道你需要对某一段时间范围内的每种事件的总数进行查询,你就可以为每种事件类型使用一个额外的有序集合,每个集合只保存这种类型事件的 id 与时间戳对。当你需要计算每种类型事件的总数时,你可以执行一系列 ZCOUNT 或相同功能的方法调用 5,并返回该结果。让我们来看一下这个修改后的 record_event() 函数,它将保存基于事件类型的有序集合。
def record_event_by_type(conn, event): id = conn.incr('event:id') event['id'] = id event_key = 'event:{id}'.format(id=id) type_key = 'events:{type}'.format(type=event['type']) ref = {id: event['timestamp']} pipe = conn.pipeline(True) pipe.hmset(event_key, event) pipe.zadd('events', **ref) pipe.zadd(type_key, **ref) pipe.execute()
新的 record_event_by_type() 函数与旧的 record_event() 函数在许多方面都是相同的,但新加入了一些操作。在新的函数中,我们将计算一个 type_key,这里将保存该事件在对应这一类型事件的有序集合中的位置索引。当 id 与时间戳对添加到 events 有序集合中后,我们还要将 id 与时间戳对添加到 type_key 这个有序集合中,随后与旧的方法一样执行数据插入操作。
现在,如果需要计算两个时间点之间“visit”这一类型的事件所发生的次数,我们只需在调用 ZCOUNT 命令时传入所计算的事件类型的特定键,以及开始与结束的时间戳。
def count_type(conn, type, start, end): type_key = 'events:{type}'.format(type=type) return conn.zcount(type_key, start, end)
如果我们能够预先知道所有可能出现的事件类型,我们就能够对每种类型分别调用以上的 count_type() 函数,并构建出之前在 count_types() 中所创建的表。而如果我们无法预先知道所有可能会出现的事件类型,或是有可能在未来出现新的事件类型,我们将可以将每种类型加入一个集合(Set)结构中,并在之后使用这个集合以发现所有的事件类型。以下是经我们修改后的记录事件函数。
def record_event_types(conn, event): id = conn.incr('event:id') event['id'] = id event_key = 'event:{id}'.format(id=id) type_key = 'events:{type}'.format(type=event['type']) ref = {id: event['timestamp']} pipe = conn.pipeline(True) pipe.hmset(event_key, event) pipe.zadd('events', **ref) pipe.zadd(type_key, **ref) pipe.sadd('event:types', event['type']) pipe.execute()
与之前相比,唯一的改变就在于我们将事件的类型加入了一个命名为“event:types”的集合,然后我们需要相应地修改一下 count_types() 函数的实现……
def count_types_fast(conn, start, end): event_types = conn.smembers('event:types') counts = {} for event_type in event_types: counts[event_type] = conn.zcount( 'events:{type}'.format(type=event_type), start, end) return counts
如果某个时间范围内存在大量的事件,那么新的 count_types_fast() 函数将比旧的 count_types() 函数执行更快,主要原因在于 ZCOUNT 命令比起从哈希中获取每个事件类型速度更快。
以 Redis 作为数据存储
虽然 Redis 自带的分析工具及其命令和 Lua 脚本非常灵活并且性能出色,但某些类型的时间序列分析还能够从特定的计算方法、库或工具中受益。对于这些情形来说,将数据保存在 Redis 中仍然是一种非常有意义的做法,因为 Redis 对于数据的存取非常快。
举例来说,对于一支股票来说,整个 10 年的成交金额数据按照每分钟取样也最多不过 120 万条数据,这点数据能够轻易地保存在 Redis 中。但如果要通过 Redis 中的 Lua 脚本对数据执行任何复杂的函数,则需要对现有的优化库进行移植或是调试,让他们在 Redis 中也实现相同的功能。而如果使用 Redis 进行数据存储,你就可以获取时间范围内的数据,将他们保存在已有的经过优化的内核中,以计算不断变化的平均价格、价格波动等等。
那么为什么不选用一种关系型数据库作为替代呢?原因就在于速度。Redis 将所有数据都保存在 RAM 中,并且对数据结构进行了优化(正如我们所举的有序集合的例子一样)。在内存中保存数据及经过优化的数据结构的结合在速度上不仅比起以 SSD 为存储介质的数据库快了 3 个数量级,并且对于一般的内存键值存储系统、或是在内存中保存序列化数据的系统也快了 1 至 2 个数量级。
结论及后续
当使用 Redis 进行时间序列分析,乃至任何类型的分析时,一种合理的方式是记录不同事件的某些通用属性与数值,保存在一个通用的地址,以便于搜索包含这些通用属性与数值的事件。我们通过为每个事件类型实现对应的有序集合实现了这一点,并且也提到了集合的使用。虽然这篇文章主要讨论的是有序集合的应用,但 Redis 中还存在着更多的结构,在分析工作中使用 Redis 还存在其他许多不同的选择。除了有序集合与哈希之外,在分析工作中还有一些常用的结构,包括(但不限于):位图、数组索引的字节字符串、HyperLogLogs、列表(List)、集合,以及很快将发布的基于地理位置索引的有序集合命令 6。
在使用 Redis 时,你会不时地重新思索如何为更特定的数据访问模式添加相关的数据结构。你所选择的数据保存形式既为你提供了保存能力,也限定了你能够执行的查询的类型,这一点几乎总是不变的。理解这一点很重要,因为与传统的、更为人熟悉的关系型数据库不同,在 Redis 中可用的查询与操作受限于数据保存的类型。
在看过了分析时间序列数据的这些示例之后,你可以进一步阅读《Redis in Action》这本书第 7 章中关于通过创建索引查找相关数据的各种方法,可以在 RedisLabs.com 的 eBooks 栏目中找到它。而在《Redis in Action》一书的第 8 章中提供了一个近乎完整的、类似于 Twitter 的社交网络的实现,包括关注者、列表、时间线、以及一个流服务器,这些内容对于理解如何使用 Redis 保存时间序列中的时间线及事件以及对查询的响应是一个很好的起点。
1 如果你启动了 lua-time-limit 这一配置选项,并且脚本的执行时间超过了配置的上限,那么只读的脚本也可能会被打断。
2 当分数相同时,将按照成员本身的字母顺序对于项目进行排序。
3 在本文中,我们通常使用冒号作为操作 Redis 数据时对名称、命名空间以及数据的分割符,但你也可以随意选择任何一种符号。其他 Redis 用户可能会选择句号“.”或分号“;”等作为分割符。只要选择一种在键或数据中通常不会出现的字符,就是一种比较好的做法。
4 ZRANGE 及 ZREVRANGE 提供了基于排序位置从有序集合中获取元素的功能,ZRANGE 的最小分数索引为 0,而 ZREVRANGE 的最大分数索引为 0。
5 ZCOUNT 命令将对有序集合中某个范围内的数据计算值的总和,但它的做法是从某个端点开始增量式的遍历整个范围。对于包含大量项目的范围来说,这一命令的开销可能会很大。作为另一种选择,你可以使用 ZRANGEBYSCORE 和 ZREVRANGEBYSCORE 命令以查找范围内成员的起始与终结点。而通过在成员列表的两端使用 ZRANK,你可以查找这些成员在有序集合中的两个索引,通过使用这两个索引,你可以将两者相减(再加上 1)以得到相同的结果,而其计算开销则大大减少了,即使这种方式需要对 Redis 进行更多的调用。
6 在 Redis 2.8.9 中引入的 Z*LEX 命令会使用有序集合以提供对有序集合有限的前缀搜索功能,与之类似,最新的还未发布的 Redis 3.2 中将通过 GEO* 命令提供有限的地理位置搜索与索引功能。
关于作者
Josiah Carlson博士是一位经验丰富的数据库专家,也是 Redis 社区的活跃贡献者。作为初创公司方面的老手,自从了解了 Salvatore Sanfilippo 在 2010 年的工作后,Josiah 就查觉了 Redis 的价值与目的。经过大量的使用、误用以及帮助他人理解 Redis 的各种文档化或未文档化的特性之后,他最终在为他的前一家初创公司进行技术开拓时编写了《Redis in Action》一书。他的部分工作成果转化为了使用 Redis 的功能的相关开源库,并且持续地在邮件列表中回答各种问题,同时也不时地编写 Redis 与其他主题的博客文章。Josiah Carlson 博士目前在 Openmail 担任技术部的 VP,这是一家位于洛杉矶的尚处于早期的初创公司。他非常乐于为你讲解如何通过 Redis 帮助你解决公司中的各种问题。
评论