背景
大数据时代,数据的重要性不言而喻,尤其对于互联网公司,随着业务的快速变化,商业模式的不断创新、用户体验个性化、实时化需求日益突出,海量数据实时处理在商业方面的需求越来越大。如何通过数据快速分析出用户的行为,以便做出准确的决策,越来越体现一个公司的价值。现阶段对于实时数据的建设比较单一,主要存在以下问题:
实时仓库建设不足,维度及指标不够丰富,无法快速满足不同业务需求。
实时数据和离线数据对比不灵活,无法自动化新增对比基期数据,且对比数据无法预先生产。
数据监控不及时,一旦数据出现问题而无法及时监控到,就会影响业务分析决策。
因此,本文将基于美团大交通实时数据产品,从面临的挑战、总体解决方案、数据设计架构、后台设计架构等几个方面,详细介绍实时数据系统的整体建设思路。
挑战
实时流数据来源系统较多,处理非常复杂,并且不同业务场景对实时数据的要求不同,因此在建设过程主要有以下挑战:
如何在保证数据准确性的前提下实现多实时流关联;实时流出现延迟、乱序、重复时如何解决。流式计算中通常需要将多个实时流按某些主键进行关联得到特定的实时数据,但不同于离线数据表关联,实时流的到达是一个增量的过程,无法获取实时流的全量数据,并且实时流的达到次序无法确定,因此在进行关联时需要考虑存储一些中间状态及下发策略问题。
实时流可复用性,实时流的处理不能只为解决一个问题,而是一类甚至几类问题,需要从业务角度对数据进行抽象,分层建设,以快速满足不同场景下对数据的要求。
中台服务如何保证查询性能、数据预警及数据安全。实时数据指标维度较为丰富,多维度聚合查询场景对服务层的性能要求较高,需要服务层能够支持较快的计算能力和响应能力;同时数据出现问题后,需要做好及时监控并快速修复。如何保证产品应用需求个性化。
实时数据与离线数据对比不灵活,需要提供可配置方案,并能够及时生产离线数据。
解决思路
我们在充分梳理业务需求的基础上,重新对实时流进行了建设,将实时数据分层建模,并对外提供统一的接口,保证数据同源同口径;同时,在数据服务层,增加可配置信息模块解决了配置信息不能自动化的问题,在数据处理策略上做了多线程处理、预计算、数据降级等优化,在数据安全方面增加数据审计功能,更好地提升了产品的用户体验。
总体方案
产品整体建设方案基于美团技术平台,总共分为源数据层、存储层、服务层及 WEB 层,整体架构如下所示:
图 1 整体架构图
源数据层:主要提供三部分数据,实时数据、离线数据、配置信息、维度信息。
存储层:源数据清洗后放入相应的存储引擎中,为服务层提供数据服务。
服务层:提供三部分功能,数据 API 服务、预计算服务、权限服务、数据审计服务。
Web 层:使用 Echarts 可视化数据。
数据层
数据架构
依托于美团提供的公共资源平台,数据架构按功能分为数据采集、数据处理、数据存储、数据服务四层,如下所示:
图 2 数据架构图
数据采集
数据来源主要有两种:业务上报的 Log 日志及数据库 Binlog 日志。这些日志通过美团日志中心进行采集后存储在消息中间件 Kafka 中,并按照不同的分类存储在不同的 Topic 中,供下游订阅。
数据处理
数据处理顾名思义,就是对采集的实时流进行逻辑处理,按业务需求输出对应的实时数据,因此这一步骤是流式计算的关键,分两步进行:数据加工、数据推送。
数据加工:数据加工通常需要在流式计算系统中进行,目前流行的流式处理系统主要有 Storm、Spark Streaming 系统及 Flink 系统,这些系统都能在不同的应用场景下发挥很好处理能力,并各有优缺点,如下图所示:
最终我们选择 Storm 作为实时数据处理框架,并借助公司提供的通用组件来简化拓扑开发流程和重复代码编码。例如,组件 MTSimpleLogBolt 的主要功能是将 Kafka 中读取的数据(Log or Binlog)解析成 Java 实体对象;组件 StormConfHelper 的功能是获取 Storm 作业应用配置信息。
数据推送:将处理好的数据推送到存储引擎中。
数据存储
数据加工完成后会被存储到实时存储引擎中,以提供给下游使用。目前常用的存储引擎主要有 MySQL、Druid、Elasticsearch、Redis、Tair 比较如下:
综上比较,由于实时数据量较大,且数据精度要求较高,因此我们最终选择交易存储使用 ES,流量存储使用 Druid,维度存储使用 Tair,中间数据存储使用 Redis;而离线数据,我们采用 Hive 和 Kylin 存储。
数据服务
将存储引擎数据统一对外提供查询服务,支持不同业务应用场景。
具体实现
实时流处理流程
整个数据层架构上主要分为实时数据和离线数据两部分:实时数据分为交易的 Binlog 日志和流量的 Log 日志,经过 Strom 框架处理后写入 Kafka,再经过 DataLinkStreaming 分别写入 ES 和 Druid;离线数据通过 Hive 处理写入 Kylin。
图 3 产品数据架构
下图所示为一条消息的处理流程:
图 4 数据关系
两个 Topic 分别是 order_base(主要存放订单基本信息:订单 id、订单状态、支付时间、票量、金额等);order_biz(主要存放订单的扩展信息:订单 id、订单类型、出发时间、到达时间、出发城市、到达城市)。我们最终要拿到一条包括上述全部内容的一条记录。
图 5 数据处理流程
具体例子:Bolt 在处理一条记录时,首先判断这条记录是 base 还是 biz,如果是 base 则写入缓存中 base 的 Category 中,如果是 biz 则写入 biz 的 Category 中。以 order_id 为 Key,如果是 base 则去和 biz 关联,如果 biz 存在则代表能够关联上,这时发送关联后的完整数据,同时删除该主键(order_key)记录;如果 biz 中不存在,则说明没关联上,这时可能 biz 的数据延迟或者是丢失,为了保证主数据的准确性,这时我们只发送 base 的数据,缓存中的数据保留不被删除。如果这条消息是 biz,则首先会更新缓存中该主键的 biz 记录,然后去和 base 关联,关联上则发送同时删除 base 中数据,否则不发送。此时我们会根据 ES 的 Update 特性去更新之前的数据。从现实效果来看保证了 99.2%的数据完整性,符合预期。
数据写入
在 Topic2es 的数据推送中,通过 DataLinkString 工具(底层 Spark Streaming)实现了 Kafka2es 的微批次同步,一方面通过多并发 batch 写入 ES 获得了良好的吞吐,另一方面提供了 5 秒的实时写入效率,保证了 ES 查询的实时可见。同时我们也维护了 Kafka 的 Offset,可以提供 At lease once 的同步服务,并结合 ES 的主键,可以做到 Exactly once,有效解决了数据重复问题。
ES 索引设计及优化
在数据写入 ES 过程中,由于数据量大,索引时间区间长,在建设索引时需要考虑合理设计保证查询效率,因此主要有以下三点优化:
写入优化 在通过 DataLinkString 写入 ES 时,在集群可接受的范围内,数据 Shuffle 后再分组,增加 Client 并发数,提升写入效率。
数据结构化 根据需要设计了索引的模版,使用了最小的足够用的数据类型。
按天建索引 通过模版按天建索引,避免影响磁盘 IO 效率,同时通过别名兼容搜索一致性。
设置合理的分片和副本数 如果分片数过少或过多都会导致检索比较慢。分片数过多会导致检索时打开比较多的文件,另外也会影响多台服务器之间通讯。而分片数过少为导至单个分片索引过大,所以检索速度慢。在确定分片数之前需要进行单服务单索引单分片的测试。 我们根据 索引分片数=数据总量/单分片数 设置了合理的分片数。
实时数据仓库模型
整个实时数据开发遵循大交通实时数仓的分层设计,在此也做一下简单介绍,实时数仓架构如下:
图 6 实时数仓架构
ODS 层:包含美团页面流量日志、模块事件日志以及用户操作的 Binlog 信息日志,是直接从业务系统采集过来的原始数据。
事实明细层:根据主题和业务过程,生成订单事实和流量事实。
汇总层:对明细层的数据扩展业务常用的维度信息,形成主题宽表。
App 层:针对不同应用在汇总层基础上加工扩展的聚合数据,如火车票在抢票业务下的交易数据汇总信息。
规范建模后,业务需求来临时,只需要在 App 层建模即可,底层数据统一维护。
中台服务层
后台服务主要实现 登陆验证和权限验证(UPM)、指标逻辑计算和 API、预计算服务、数据质量监控、数据审计功能。由于数据量大且实时性要求较高,在实现过程遇到如下挑战:
如何保证查询响应性能。
服务发生故障后,数据降级方案。
数据监控预警方案及数据出现问题解决方案。
针对以上问题,下面进行一一详述:
性能响应优化
服务层处理数据过程中,由于数据量大,在查询时需要一定的响应时间,所以在保证响应性能方面,主要做了以下优化:
图 7 性能响应优化
项目初始由于数据量不是很大,采用单线程直接查询 ES,能够满足需求。
随着节假日来临,数据量大增,并行查询人次增多,查询响应变慢,无法快速响应结果,因此引入缓存技术,将中间结果进行缓存,并在缓存有效期内,直接读取缓存数据大大提高了时间效率;并且在此基础上,引入 Master-Worker 多线程模式,将多指标查询拆分,并行查询 ES,使得查询响应大大提高。
虽然问题得到解决,但仍存在一个问题,就是每次都是现查 ES 及部分中间缓存结果,尤其是第一次查询,需要完全走 ES,这样就会让第一个查询数据的用户体验较差,因此引入预计算服务,通过定时调度任务,将部分重要维度下的指标进行预计算放入缓存,用户查询时直接读取缓存数据。而一些不常用的维度下的数据,采用的策略是,第一个用户查询时现查 ES,并将结果数据预加载到缓存,后续所有用户再次查询直接读缓存数据,这样既能保证用户体验,也不至于占用太多缓存空间。
数据降级方案
使用缓存避免不了出现一些问题,比如缓存失效、缓存雪崩等问题,针对缓存雪崩问题,通过设置不同 Key 的过期时间能够很好的解决;而对于缓存数据失效,我们有自己的数据降级方案,具体方案如下:
图 8 数据降级方案
预计算数据会分别在 Redis、Tair 和本地缓存中存储一份以保证查询效率,当查询 Redis 数据不存在时,会去 Tair 中读取数据,Tair 也为空时,会读取本地缓存,只有当本地缓存数据也为空时,才会现查 ES 做聚合计算,这样也会降低 ES 的查询压力。
数据监控
实时监控预警非常重要,在数据出现问题时,一方面能够及时通知我们快速定位修复数据,另一方面也能够及时周知业务同学,避免做出错误分析。基于此,我们做了两方面的实时监控,其一是对源实时流在 Storm 处理层面的监控,确保源实时流正确生产;其二是对展示的汇总数据进行监控,确保产品展示指标数据正常。
针对数据出现问题预警,我们在解决方案上规范了流程:
监控报警机制及时周知相关同学。
第一时间通过产品上方的黄条提示用户哪些数据异常。
快速定位问题,给出修复方案。
目前对于实时异常数据的修补,主要有两种方法:
针对特殊情况的数据修补方案第一灵活指定 Offset,重新消费 Kafka 数据。
预留了 Hive2es 的准实时重导功能,确保生产数据的准确和完整。
数据安全
在以数据取胜的时代,数据的安全不言而喻,我们采用公司提供的 UPM 权限接口进行二级权限管理并加入审计功能及水印功能,能够准确记录用户的所有访问以及操作记录,并且将日志数据格式化到数据库中,进行实时监控分析。
总结
实时数据可以为业务特定场景分析决策提供巨大支持,尤其对于大交通节假日及春运期间。在大交通实时战场沙盘产品化过程中,我们投入了大量的思考和实践,主要取得以下收益:
可视化的产品,为业务方实时分析提供极大便利,取得较好的反馈。
优化实时数据仓库建设,合理分层建模,规范命名设计,统一维度建设和指标口径,对外提供统一接口,保证数据规范准确。
在 Storm 框架下实时开发和数据写入方面积累了一定的经验。
服务层支持可配置信息,可以灵活配置个性化信息。
服务层性能及获取数据策略的优化,为用户带来更好的产品体验。
作者介绍
娣娣,美团数据开发工程师,2015 年加入美团,从事数据仓库建设、大数据产品开发工作。
晓磊,美团数据开发工程师,2017 年加入美团,从事数据仓库建设、大数据产品开发工作。
评论