产品战略专家梁宁确认出席AICon北京站,分享AI时代下的商业逻辑与产品需求 了解详情
写点什么

闲鱼端侧如何实现实时 CEP 引擎

  • 2020-02-16
  • 本文字数:5487 字

    阅读完需:约 18 分钟

闲鱼端侧如何实现实时CEP引擎

闲鱼端侧如何实现实时 CEP 引擎

背景

用户来闲鱼,主要是为了获得自己关心的内容。随着闲鱼的体量越来越大,内容也变得越来越丰富。闲鱼基于用户画像,可以将用户关心的内容推送给用户。具体在哪些场景下才需要触发推送?我们定义了很多触发规则,包括停留时长、点击路径等。起初我们把触发规则的逻辑放在服务端(Blink)运行。但实践下来发现 Blink 存在诸多限制:


  • 服务端要对客户端埋点进行数据清洗,考虑到闲鱼的 DAU 已经突破 2000w,这个量是非常庞大的,非常消耗服务端资源;

  • Blink 的策略是实时执行的,同样因为资源问题,现在只能同时上线十几个策略。


如何解决这些问题呢,我们开始考虑能否将 Blink 的策略跑在客户端!

CEP 模型

Blink,作为是 Flink 的一个分支,最初是阿里巴巴内部创建的,针对 Flink 进行了改进,所以我们这里还是围绕 Flink 讨论。CEP(Complex Event Process)是 Flink 中的一个子库,用来快速检测无尽数据流中的复杂模式。

Flink CEP

Flink 的 CEP 的核心是 NFA(Non-determined Finite Automaton),全称叫不确定的有限状态机。提到 NFA,就不得不提 Jagrati Agrawal 等撰写的关于 NFA 模型的论文《Efficient Pattern Matching over Event Streams》,本篇论文中描述了 NFA 的匹配原理。



上面这张图,就是一个不确定的有限状态机,它由状态(State)还有之间的连线(StateTransition)组成的。


  • 状态(State):状态是根据 flink 脚本里面的代码来决定的,最终会有一个 $end$的 Final 状态

  • 转换(StateTransition):State 的转换条件,包括 take/proceed/ignore


不同的条件,代表的含义不同:


  • take: 满足条件,获取当前元素,进入下一状态

  • proceed:不论是否满足条件,不获取当前元素,直接进入下状态(如 optional)并进行判断是否满足条件。

  • ignore:不满足条件,忽略,进入下一状态。


我们只要在端上实现这样一个状态机,就可以实现一个 CEP 引擎。

Python CEP

对于客户端来说,首先要解决的问题是如何构建一个 CEP 环境。经过调研,可以复用集团的端智能容器(Walle),作为 Python 容器可以执行 cep 的策略。


在构建 NFA 之前,首先要解决的一个问题是数据来源,手淘信息流团队有一套完整的解决方案 BehaviX/BehaviR,可以对 UT 埋点进行结构化,能很好的结合 Walle 容器来触发策略。有了事件来源,还需要解决的是 Python 脚本如何执行。Walle 平台可以将多个 Python 脚本打包下载并执行,因此,我们可以将 CEP 封装成一个 Python 的库,然后跟策略脚本一起下发。


最终的整体架构设计如下图所示:



本文重点介绍下如何用 Python 来实现一个 CEP 的编译器,这个编译器主要用来将 CEP 的描述语言转换成为 NFA。

编译器原理

在 Flink 中,java 侧会有一套完善的 API 来编写一个策略脚本,《efficient Pattern Matching over Event Streams》论文中还定义了一套完备的 DSL 描述语言,也是会转化成 java 文件去调用这些 API 去完成匹配。那么接下来会重点讨论,flink 是如何将上述 API 转化成 NFA 去匹配,以及 Python CEP 如何实现上述一套完整 API 接口。

Pattern

在 Flink 里面,是通过 Pattern来构建这个 NFA,首先用它描述这个不确定性状态机。首先是构建一个 Pattern的一个链表,得到这个链表之后,会将每个 Pattern 映射成为 State的图,点与点之间会通过 StateTransition来连接。以下面的 Python 代码为例,看下如何 API 是如何工作的:


例如,需要创建这样一个规则,描述如下:


以 start 事件开始,后续跟随一个 middle 的事件,后面紧跟着一个 end 事件作为结尾


用 Pattern 编写如下所示:


.followed_by('middle').where(SimpleCondition())\.next_('end').where(SimpleCondition())
复制代码


这个代码里面声明了 3 个 Pattern,依次命名为 startmiddleend。Pattern 里面保存了指向前面节点的引用 previous,整个 Pattern 链表构建完如下图所示:



最终拿到的是 end节点的一个引用 Ref,Pattern 中会有一个变量指向前一个节点,这样就可以得到一个 Pattern 的反向链表。


Pattern 的对外接口定义如下:


# 静态方法,用来生成起始的pattern@staticmethoddef begin(self, name):pass# 标记紧接着的事件def followed_by(self, name):pass# 标记不需要紧跟的事件def not_followed_by(self, name):pass# 标记紧跟的事件def next_(self, name):pass# 标记事件循环次数def times(self, times):pass# 标记当前事件触发的条件def where(self, condition):pass# 标记当前事件的and条件def and_(self, condition):pass# 标记当前事件的or条件def or_(self, condition):pass# 用于聚合def group_by(self, fields):pass# 用于聚合,渠道特定字段的值def fields(self, key_by_state_name, field):pass# 用于聚合,统计事件具体的数量def count(self, field, condition):pass
复制代码


不同接口会生成不同的消费策略的节点,具体细节可以参考 StateTransition。有了 Pattern 链表,接下来就需要编译器(Compiler)了,它主要是将 Pattern 链表转化成 NFA 图,首先来看下 NFA 的 2 个核心组件:StateStateTransition

State

结构定义如下:



def __init__(self, name, state_type): self.__name = name # 节点的名称,同Pattern的名称 self.__state_type = state_type # 节点的类型:Start/Normal/Stop/Final self.__state_transitions = [] # 到其他节点的边
复制代码


State 一共有 4 种类型:Start/Final/Normal/Stop


生成 NFA 的过程就是将反向解析 Pattern 链表的过程,大概的过程如下:


  1. 创建一个 $end$的结束节点( Final

  2. 再从后往前创建每个 state 节点,作为中间节点( Normal/Stop

  3. 最后创建一个开始节点( Start)State 的名称就是 Pattern 的节点名称,创建完成之后如下图所示。


Transition

State 代表了当前状态机的状态,不同状态之前的切换定义成 StateTransition


结构定义如下:



def __init__(self, source_state, action, target_state, condition): self.__source_state = source_state # 开始的State节点 self.__action = action # 具体action类型:take/ignore/proceed self.__target_state = target_state # 结束的State节点 self.__condition = condition # 节点之间的判断条件
复制代码


边的生成逻辑跟 Pattern 的事件消费策略相关,以下是事件消费策略:


    STRICT = 0# 严格匹配下个    SKIP_TILL_NEXT = 1# 跳过下一个    SKIP_TILL_ANY = 2# 跳过任意一个    NOT_FOLLOW = 3# 非跟随模式    NOT_NEXT = 4# 非紧邻模式
复制代码


不同的消费策略,得到的状态机如下图所示:



  • STRICT: 如果命中了事件了,会进到下个状态

  • SKIP_TILL_NEXT: 如果命中了会进入下一个状态,否则会再当前节点循环,进入 ignore 的边

  • SKIP_TILL_ANY: 不管是否命中条件,都会一直在当前状态循环

  • NOT_FOLLOW: 如果遇到了一个匹配的,就会进入 Stop 状态

  • NOT_NEXT: 如果命中一条,则进入 Stop 状态


在 Pattern 中,不同的接口会创建出不同的消费策略节点,例如 followed_by接口会创建 SKIP_TILL_NEXT的节点。

Times

如果有的规则,要求特定的事件,循环出现几次,那现在就要用到 times 接口。比如浏览 3 次宝贝这个规则,规则就可以写成:


SimpleCondition
复制代码


最终就会得到一个 Times=3的 Pattern,编译器在拿到这个 Pattern 之后,一样先创建一个的 Final 节点,在处理 times 的时候,会创建重复的节点,只不过名称不同,不同的点之间用 take 链接起来,如下图所示:


Python CEP 聚合

Flink 是通过 InputStream 将匹配的事件转移给 CEPOperator,执行聚合操作;但是在客户端的聚合,一次执行就一个事件流,所以可以将聚合简化到一次匹配过程中,因此我们对于 Flink 的聚合操作做了改造,使其更适合端上的场景。


那么聚合的脚本写法如下:


.followed_by('middle').where(SimpleCondition())\.next_('end').where(self.end_filter)\.group_by('group_by').fields('start', 'userId')
复制代码


这里声明了,以 start节点中的 userId作为聚合的节点,我们就会得到如下的 Pattern链表:



在解析 group_by节点的时候,我们需要做个特殊处理,判断如果有聚合节点,我们就需要再 $end$节点和前面节点之间插入一个聚合的节点和哨兵位节点,哨兵位节点命名为 $aggregationStartState$,最终效果如下图所示:



在 NFA 匹配的过程中,当匹配结束,就可以将匹配到的事件流,传到聚合节点,再进一步聚合。$aggregationStartState$节点和 group_by节点之间,是通过 proceed 结合,不需要满足特定条件就可以执行。


具体的实现过程如下,可见与 Flink 不同的是,我们创建了一个特殊的 State节点 AggregationState


def __create_aggregation_state(self, sink_state):# 渠道聚合节点的condition    _aggregation_condition = self.__current_pattern.get_aggregation_condition()
# 创建AggregationState not_next = AggregationState( self.__current_pattern.get_name(),StateType.Normal, _aggregation_condition.get_key_by_state_name(), _aggregation_condition.get_field()) self.__states.append(not_next)
# 获取take的条件 take_condition = self.__get_take_condition(self.__current_pattern) not_next.add_take(sink_state, take_condition)
# 将游标指向上一个节点 self.__following_pattern = self.__current_pattern self.__current_pattern = self.__current_pattern.get_previous()
return not_nex
复制代码

Show me the code

讲了太多原理的东西,接下来看下代码里面如何工作的,先来看下如何来编写一个 CEP 策略。

策略脚本

现在看下如何写一个完整的 python 版本的 cep 规则,以宝贝详情页为例,规则描述如下:


需要匹配用户查看 3 次宝贝详情页


那规则的写法如下:


_pattern = Pattern.begin('e1').where(KVCondition('scene', 'Page_xyItemDetail')).times(3)
# 2. 将需要匹配的事件流_batch_data和待匹配的Pattern# CEP内部会先将pattern转化成NFA,然后再用NFA去匹配事件流_cep = CEP.pattern(_batch_data['eventSeq'], _pattern)
# 用来选择的逻辑def select_function(data):pass
# 3. 匹配完成,通过cep的select接口查询匹配到的结果self.result = _cep.select(select_function)
复制代码


CEP.pattern()函数里面,会先创建 NFA,然后去进行匹配,可见整个匹配策略脚本非常的短小精悍。

生成 NFA

如下代码用来将 Pattern链表转化成 NFA图:


if self.__current_pattern.get_quantifier().get_consuming_strategy() == ConsumingStrategy.NOT_FOLLOW:raiseException('NotFollowedBy is not supported as a last part of a Pattern!')# 校验Pattern的名称,必须唯一self.__check_pattern_name_uniqueness()# 校验Pattern的策略self.__check_pattern_skip_strategy()# 首先创建Final节点sink_state = self.__create_ending_state()# 判定是否有聚合节点if self.__current_pattern.get_aggregation_condition() isnotNone:# 首先创建聚合节点    sink_state = self.__create_aggregation_state(sink_state)# 然后创建聚合几点的起始节点    sink_state = self.__create_aggregation_start_state(sink_state)# 创建状态机中的中间节点,此函数会循环知道Start节点的Patternsink_state = self.__create_middle_states(sink_state)# 最后创建Start节点self.__create_start_state(sink_state)# 根据state列表和window来创建NFAreturn NFA(self.__states, self.__window_time, False)
复制代码

效果

闲鱼已经上了几个策略,整体看来比较稳定,不过还有很多优化的空间。从实测效果来看,端侧从触发策略到执行 Action 用时不会超过 1s,其中还包含了一次网络请求的时间。

性能数据

  • 执行时间



单个脚本,执行时间大概在 100ms 左右。


  • 内存使用



现在内存使用峰值还是比较高,大概在 15M 左右。关于内存过大的问题,目前正在讨论一个方案:Python CEP 可以持久化当前 NFA 的状态,然后再触发策略的时候,只带从未触发过的事件流,避免很多重复计算。之前运行一次脚本要处理 500 个事件,现在可能就缩减到 100 之内,可以极大的减小内存消耗。同时带来另外一个问题,就是执行脚本的都会有一个 IO 操作,耗时会增加。

Flink 与客户端对比

现在对于 Flink 和客户端 Python CEP 做一个简单的对比:



相比 Flink,端侧 CEP 还是有它的优势,在端侧可以直接利用客户端的埋点信息进行计算,运行时长缩减了 80%,而且也支持动态发布。Python 脚本支持 2 端通投,在保证 2 端埋点一致的前提下,也极大的减少了维护成本。

未来

现在端计算还存在很多待优化的地方:


  1. 端计算是用 Python 实现,无法做到像 Flink 的状态机常驻内存,每次都要重新创建匹配,带来了额外的消耗

  2. 在事件流的清洗上面,现在是通过回朔拿到之前的事件流,存在大量的重复计算,后续可以借鉴 Flink 的 Window 机制来进行优化。

  3. 目前编译器暂时还不支持 Group Pattern,后续还要对其进行扩展。

  4. Python 脚本现在还是需要手动编写,后续还可以考虑通过 DSL 来自动生成。


整体看来,Python 脚本执行策略还是有一定的性能损耗,不管是在创建 NFA 或者是匹配过程,后续可以考虑将匹配引擎用 C++实现,然后真正做到常驻内存,从而做到高效的执行效率。后期做到 NFA 持久化之后,C++也可以复用整套持久化协议,从而优化整个引擎的执行效率。除此之外,策略在执行的过程中,还可以考虑用 TensorFlowLite 优化参数策略参数,从而真正做到千人前面的策略。


本文转载自公众号闲鱼技术(ID:XYtech_Alibaba)。


原文链接


https://mp.weixin.qq.com/s/_jPQvX3aiKp4ScnqgHgyow


2020-02-16 10:002986

评论

发布
暂无评论
发现更多内容

2022年中国智慧医疗行业洞察

易观分析

智慧医疗

云效发布策略指南|滚动、分批、灰度怎么选?

阿里云云效

云计算 阿里云 云原生 持续交付 发布策略

极致用云,数智护航

阿里云云效

阿里云 DevOps 运维 云原生 运维安全

基于 Kafka 的实时数仓在搜索的实践应用

vivo互联网技术

kafka 服务器 搜索 数据舱

边缘计算场景下Service Mesh的延伸和扩展

华为云原生团队

开源 边缘计算 边缘技术 边缘 边缘云

一个关于 += 的谜题

AlwaysBeta

Python 编程语言

郑州轻工业大学——HarmonyOS宠物健康系统的开发分享

HarmonyOS开发者

HarmonyOS 健康检查

鉴机识变,面向未来|RocketMQ Summit 2022 即将来袭

阿里巴巴云原生

阿里云 开源 RocketMQ 云原生 开源消息队列

混合编程:如何用pybind11调用C++

华为云开发者联盟

c++ Python API 混合编程 pybind11

Android技术分享| 【你画我猜】Android 快速实现

anyRTC开发者

音视频 移动开发 互动白板 Andriod 你画我猜

web前端培训:vue3源码中细节知多少

@零度

Vue 前端开发

oracle数据库审计用什么数据库审计软件好?可以用什么方式部署?

行云管家

数据库 IT运维 数据库审计

混合云管平台哪家强?采购时候需要注意什么?

行云管家

混合云 云管平台

Deep dive #2:API 与 Python SDKs 详解

Zilliz

Python 数据库

java培训:Java类加载机制的理解

@零度

JAVA开发 类加载机制

优化| 手把手教你学会杉数求解器(COPT)的安装、配置与测试

杉数科技

线性规划 求解器 优化求解器 混合整数规划 杉数科技

前所未有的 Milvus 源码架构解析

Zilliz

互联网人的命运,就是活到30岁都难?

码农参上

人生 互联网人 打工人

模块八作业

黄秀明

「架构实战营」

如何通过 draftjs 设计留言框

全象云低代码

前端 低代码 留言 draftjs 留言框

2022重磅:增长法则-巧用数字营销 突破企业困局

博文视点Broadview

理论+实践,带你掌握动态规划法

华为云开发者联盟

AI 算法 动态规划法 子问题

ModStartCMS模块化建站系统 v3.3.0 组件功能升级,事件触发增强

ModStart开源

乘冬奥之风:北京2022年冬奥会用户信息获取偏好专题分析

易观分析

冬奥会用户分析

产品经理:「点这里,我要跳到任何我想跳的页面」—— 解耦提效神器「统跳路由」

百瓶技术

ios 前端 客户端 路由

“pip不是内部或外部命令,也不是可运行的程序或批处理文件” 到底有多么神秘

华为云开发者联盟

Python pip 批处理 scripts pip install

【重磅发布】蚂蚁动态卡片,让 App 首页实现敏捷更新

蚂蚁集团移动开发平台 mPaaS

ios android 前端 mPaaS

80 行代码实现简易 RxJS

CRMEB

移动开发er,10万奖金等你来战!

Speedoooo

活动 前端开发 移动开发 黑客马拉松 黑客松

Hudi Bucket Index 在字节跳动的设计与实践

字节跳动数据平台

数据库 字节跳动 数据湖 Hudi

Go 语言入门很简单:读写锁

宇宙之一粟

读写锁 Go 语言 2月月更

闲鱼端侧如何实现实时CEP引擎_容器_景松_InfoQ精选文章