携程实时用户行为服务作为基础服务,目前普遍应用在多个场景中,比如猜你喜欢(携程的推荐系统),动态广告,用户画像,浏览历史等等。
以猜你喜欢为例,猜你喜欢为应用内用户提供潜在选项,提高成交效率。旅行是一项综合性的需求,用户往往需要不止一个产品。作为一站式的旅游服务平台,跨业务线的推荐,特别是实时推荐,能实际满足用户的需求,因此在上游提供打通各业务线之间的用户行为数据有很大的必要性。
携程原有的实时用户行为系统存在一些问题,包括:1)数据覆盖不全;2)数据输出没有统一格式,对众多使用方提高了接入成本;3)日志处理模块是 web service,比较难支持多种数据处理策略和实现方便扩容应对流量洪峰的需求等。
而近几年旅游市场高速增长,数据量越来越大,并且会持续快速增长。有越来越多的使用需求,对系统的实时性,稳定性也提出了更高的要求。总的来说,当前需求对系统的实时性 / 可用性 / 性能 / 扩展性方面都有很高的要求。
一、架构
这样的背景下,我们按照如下结构重新设计了系统:
图 1:实时用户行为系统逻辑视图
新的架构下,数据有两种流向,分别是处理流和输出流。在处理流,行为日志会从客户端(App/Online/H5)上传到服务端的 Collector Service。Collector Service 将消息发送到分布式队列。数据处理模块由流计算框架完成,从分布式队列读出数据,处理之后把数据写入数据层,由分布式缓存和数据库集群组成。
输出流相对简单,web service 的后台会从数据层拉取数据,并输出给调用方,有的是内部服务调用,比如推荐系统,也有的是输出到前台,比如浏览历史。系统实现采用的是 Java+Kafka+Storm+Redis+Mysql+Tomcat+Spring 的技术栈。
- Java:目前公司内部 Java 化的氛围比较浓厚,并且 Java 有比较成熟的大数据组件
- Kafka/Storm:Kafka 作为分布式消息队列已经在公司有比较成熟的应用,流计算框架 Storm 也已经落地,并且有比较好的运维支持环境。
- Redis: Redis 的 HA,SortedSet 和过期等特性比较好地满足了系统的需求。
- MySQL: 作为基础系统,稳定性和性能也是系统的两大指标,对比 nosql 的主要选项,比如 hbase 和 elasticsearch,十亿数据级别上 mysql 在这两方面有更好的表现,并且经过设计能够有不错的水平扩展能力。
目前系统每天处理 20 亿左右的数据量,数据从上线到可用的时间在 300 毫秒左右。查询服务每天服务 8000 万左右的请求,平均延迟在 6 毫秒左右。下面从实时性 / 可用性 / 性能 / 部署几个维度来说明系统的设计。
二、实时性
作为一个实时系统,实时性是首要指标。线上系统面对着各种异常情况。例如如下几种情况:
- 突发流量洪峰,怎么应对;
- 出现失败数据或故障模块,如何保证失败数据重试并同时保证新数据的处理;
- 环境问题或 bug 导致数据积压,如何快速消解;
- 程序 bug,旧数据需要重新处理,如何快速处理同时保证新数据;
系统从设计之初就考虑了上述情况。
首先是用 storm 解决了突发流量洪峰的问题。storm 具有如下特性:
图 2:Storm 特性
作为一个流计算框架,和早期大数据处理的批处理框架有明显区别。批处理框架是执行完一次任务就结束运行,而流处理框架则持续运行,理论上永不停止,并且处理粒度是消息级别,因此只要系统的计算能力足够,就能保证每条消息都能第一时间被发现并处理。
对当前系统来说,通过 storm 处理框架,消息能在进入 kafka 之后毫秒级别被处理。此外,storm 具有强大的 scale out 能力。只要通过后台修改 worker 数量参数,并重启 topology(storm 的任务名称),可以马上扩展计算能力,方便应对突发的流量洪峰。
对消息的处理 storm 支持多种数据保证策略,at least once,at most once,exactly once。对实时用户行为来说,首先是保证数据尽可能少丢失,另外要支持包括重试和降级的多种数据处理策略,并不能发挥 exactly once 的优势,反而会因为事务支持降低性能,所以实时用户行为系统采用的 at least once 的策略。这种策略下消息可能会重发,所以程序处理实现了幂等支持。
storm 的发布比较简单,上传更新程序 jar 包并重启任务即可完成一次发布,遗憾的是没有多版本灰度发布的支持。
图 3:Storm 架构
在部分情况下数据处理需要重试,比如数据库连接超时,或者无法连接。连接超时可能马上重试就能恢复,但是无法连接一般需要更长时间等待网络或数据库的恢复,这种情况下处理程序不能一直等待,否则会造成数据延迟。实时用户行为系统采用了双队列的设计来解决这个问题。
图 4:双队列设计
生产者将行为纪录写入 Queue1(主要保持数据新鲜),Worker 从 Queue1 消费新鲜数据。如果发生上述异常数据,则 Worker 将异常数据写入 Queue2(主要保持异常数据)。
这样 Worker 对 Queue1 的消费进度不会被异常数据影响,可以保持消费新鲜数据。RetryWorker 会监听 Queue2,消费异常数据,如果处理还没有成功,则按照一定的策略(如下图)等待或者重新将异常数据写入 Queue2。
图 5:补偿重试策略
另外,数据发生积压的情况下,可以调整 Worker 的消费游标,从最据流程也随之改费,保证最新数据得到处理。中间未经处理的一段数据则启动 backupWorker,指定起止游标,在消费完指定区间的数据之后,backupWorker 会自动停止。(如下图)
图 6:积压数据消解
三、可用性
作为基础服务,对可用性的要求比一般的服务要高得多,因为下游依赖的服务多,一旦出现故障,有可能会引起级联反应影响大量业务。项目从设计上对以下问题做了处理,保障系统的可用性:
- 系统是否有单点?
- DB 扩容 / 维护 / 故障怎么办?
- Redis 维护 / 升级补丁怎么办?
- 服务万一挂了如何快速恢复?如何尽量不影响下游应用?
首先是系统层面上做了全栈集群化。kafka 和 storm 本身比较成熟地支持集群化运维;web 服务支持了无状态处理并且通过负载均衡实现集群化;Redis 和 DB 方面携程已经支持主备部署,使用过程中如果主机发生故障,备机会自动接管服务;通过全栈集群化保障系统没有单点。
另外系统在部分模块不可用时通过降级处理保障整个系统的可用性。先看看正常数据处理流程:(如下图)
图 7:正常数据流程
在系统正常状态下,storm 会从 kafka 中读取数据,分别写入到 redis 和 mysql 中。服务从 redis 拉取(取不到时从 db 补偿),输出给客户端。DB 降级的情况下,数据流程也随之改变(如下图)
图 8:系统降级 -DB
当 mysql 不可用时,通过打开 db 降级开关,storm 会正常写入 redis,但不再往 mysql 写入数据。数据进入 reids 就可以被查询服务使用,提供给客户端。另外 storm 会把数据写入一份到 kafka 的 retry 队列,在 mysql 正常服务之后,通过关闭 db 降级开关,storm 会消费 retry 队列中的数据,从而把数据写入到 mysql 中。redis 和 mysql 的数据在降级期间会有不一致,但系统恢复正常之后会通过 retry 保证数据最终的一致性。redis 的降级处理也类似(如下图)
图 9:系统降级 -Redis
唯一有点不同的是 redis 的服务能力要远超过 mysql。所以在 redis 降级时系统的吞吐能力是下降的。这时我们会监控 db 压力,如果发现 mysql 压力较大,会暂时停止数据的写入,降低 mysql 的压力,从而保证查询服务的稳定。为了降低故障情况下对下游的影响,查询服务通过 Netflix 的 Hystrix 组件支持了熔断模式(如下图)。
图 10:Circuit Breaker Pattern
在该模式下,一旦服务失败请求在给定时间内超过一个阈值,就会打开熔断开关。在开关开启情况下,服务对后续请求直接返回失败响应,不会再让请求经过业务模块处理,从而避免服务器进一步增加压力引起雪崩,也不会因为响应时间延长拖累调用方。
开关打开之后会开始计时,timeout 后会进入 Half Open 的状态,在该状态下会允许一个请求通过,进入业务处理模块,如果能正常返回则关闭开关,否则继续保持开关打开直到下次 timeout。这样业务恢复之后就能正常服务请求。另外,为了防止单个调用方的非法调用对服务的影响,服务也支持了多个维度限流,包括调用方 AppId/ip 限流和服务限流,接口限流等。
四、性能 & 扩展
由于在线旅游行业近几年的高速增长,携程作为行业领头羊也蓬勃发展,因此访问量和数据量也大幅提升。公司对业务的要求是可以支撑 10 倍容量扩展,扩展最难的部分在数据层,因为涉及到存量数据的迁移。
实时用户行为系统的数据层包括 Redis 和 Mysql,Redis 因为实现了一致性哈希,扩容时只要加机器,并对分配到新分区的数据作读补偿就可以。
Mysql 方面,我们也做了水平切分作为扩展的准备,分片数量的选择考虑为 2 的 n 次方,这样做在扩容时有明显的好处。因为携程的 mysql 数据库现在普遍采用的是一主一备的方式,在扩容时可以直接把备机拉平成第二台(组)主机。假设原来分了 2 个库,d0 和 d1,都放在服务器 s0 上,s0 同时有备机 s1。扩容只需要如下几步:
- 确保 s0 -> s1 同步顺利,没有明显延迟
- s0 暂时关闭读写权限
- 确认 s1 已经完全同步 s0 更新
- s1 开放读写权限
- d1 的 dns 由 s0 切换到 s1
- s0 开放读写权限
迁移过程利用 mysql 的复制分发特性,避免了繁琐易错的人工同步过程,大大降低了迁移成本和时间。整个操作过程可以在几分钟完成,结合 DB 降级的功能,只有在 dns 切换的几秒钟时间会产生异常。
整个过程比较简单方便,降低了运维负担,一定程度也能降低过多操作造成类似 gitlab 式悲剧的可能性。
五、部署
前文提到 storm 部署是比较方便的,只要上传重启就可以完成部署。部署之后由于程序重新启动上下文丢失,可以通过 Kafka 记录的游标找到之前处理位置,恢复处理。另外有部分情况下程序可能需要多版本运行,比如行为纪录暂时有多个版本,这种情况下我们会新增一个 backupJob,在 backupJob 中运行历史版本。
作者介绍
陈清渠,毕业于武汉大学,多年软件及互联网行业开发经验。14 年加入携程,先后负责了订单查询服务重构,实时用户行为服务搭建等项目的架构和研发工作,目前负责携程技术中心基础业务研发部订单中心团队。
评论