统计流程
所有流计算统计的流程都是:
接入数据源
进行多次数据转换操作(过滤、拆分、聚合计算等)
计算结果的存储
其中数据源可以是多个,数据转换的节点处理完数据可以发送到一个和多个下一个节点继续处理数据。Flink 程序构建的基本单元是 stream 和 transformation(DataSet 实质上也是 stream)。stream 是一个中间结果数据,transformation 对数据的加工和操作,该操作以一个或多个 stream 为输入,计算输出一个或多个 stream 为结果,最后可以 sink 来存储数据。
包括数据源,每一次发射出来的数据结果都通过 DataStream 来传递给下一级继续处理。每一个 Transformation 要有 2 步:
处理数据
将处理完的数据发射出去
Flink 的数据源
Flink 提供数据源只需要实现 SourceFunction 接口即可。SourceFunction 有一个抽象实现类 RichParallelSourceFunction。继承该实现类实现 3 个方法,既可以自定义 Source
public void open(Configuration parameters) //初始化时调用,可以初始化一些参数
public void run(SourceContext<T> ctx)//发送数据,在该方法里调用 ctx 的 collect 方法将数据发射出去。下面例子是每 20 秒发送一个 Order 类型的实体:
Flink 的数据转换操作
Flink 针对于不同的场景提供了不同的解决方案,减少了用户去关注处理过程中的一些逻辑和效率问题。常见的操作有下面这些:
“map” 是做一些映射,比如我们把两个字符串合并成一个字符串,把一个字符串拆成两个或者三个字符串
“flatMap” 是把一个记录拆分成两条、三条、甚至是四条记录,例如把一个字符串分割成一个字符数组
“Filter” 类似于过滤
“keyBy” 等效于 SQL 里的 group by
“aggregate” 是一个聚合操作,如计数、求和、求平均等
“reduce” 类似于 MapReduce 里的 reduce
“join” 操作等同数据库里面的 join
“connect” 实现把两个流连成一个流
常见的操作有 filter、map、flatMap、keyBy(分组)、aggregate(聚合) 等,也可以在官方文档找到其他相关的 Function,具体的使用方式后面的例子中会体现。
窗口
流数据的计算可以把连续不断的数据按照一定的规则拆分成大量的片段,在片段内进行统计和计算。比如可以把一小时内的数据保存到一个小的数据库表里,然后对这部分数据进行计算和统计,这时流计算是提供自动切割的一种机制-窗口。常见的窗口有:
以时间为单位的 Time Window,例如:每 1 秒钟、每 1 个小时等
以数据的数量为单位的 Count Window,例如:每一百个元素
Flink 给我们提供了一些通用的时间窗口模型。
1、Tumbling Windows(不重叠的)
数据流中的每一条数据仅属于一个窗口。每一个都有固定的大小,同时窗口间彼此之间不会出现重叠的部分。如果指定一个大小为 5 分钟的 tumbling 窗口,那么每 5 分钟便会启动一个窗口,如下图所示:
2、Sliding Windows(重叠的)
与 Tumbling 窗口不同的是,在构建 Sliding 窗口时不仅需要指定窗口大小,还会指定一个窗口滑动参数(window slide parameter)来确定窗口的开始位置。因此当窗口滑动参数小于窗口大小时,窗口之间可能会出现重复的区域。例如,当你指定窗口大小为 10 分钟,滑动参数为 5 分钟时,如下图所示:
3、Session Windows (会话窗口)
当数据流中一段时间没有数据,则 Session 窗口会关闭。因此,Session Windows 没有固定的大小,无法计算 Session 窗口的开始位置。事实上,Session 窗口最终是通过窗口合并来实现不规则窗口的。
Flink 中的时间概念
Flink 中有 3 中不同的时间概念
1. 处理时间 Processing Time,指的是上面进行 Transformation 操作时,当时的系统时间。
2. 事件时间 Event Time,指的是业务发生时间,每一条业务记录上会携带一个时间戳,需要指定数据中那一个属性中获取。
在按业务发生时间统计数据时,面临一个问题,当接收数据的时间是无序的时候,我们什么时间去触发聚合计算,不可能无限制的等待。Flink 引入了 Watermark 的概念,它是给窗口看的,是告诉窗口最长等待的时间是多久,超过这个时间的数据就抛弃不再处理。
3. 提取时间 Ingestion Time,指的是数据进入 Flink 当时的系统时间。
订单统计的例子
第一步:构建环境
第二步:添加数据源
其中为了实现 20 秒发送一次数据,通过调用 setParallelism(1),将数据源的实例数设置为 1(仅有 1 个实例);
第三步:数据校验
第四步:设置时间戳和 Watermarks
前面已经设置了使用 EventTime 来处理数据,那么在进行时间窗口计算前必须给数据分配获取时间戳的字段,这里设置了 Order 的 timestamp 字段为 EventTime,同时也设置了一个 1 分钟的 Watermarks,表示最多等待 1 分钟(为了不无限的等待下去),业务发生时间超过系统时间 1 分钟的数据都不进行统计。
第五步:数据分组
这里设置了以 Order 中 biz 字段进行分组,就意味着所有 biz 相同的数据会进入到同一个时间窗口中进行计算。
第六步:指定时间窗口、聚合计算
这里设置了一个以 1 分钟为单位的不重叠的 TumblingEventTimeWindow。然后使用 OrderSumAggregator 来进行聚合计算。需要注意的是如果最前面设置的是使用 ProcessTime 来处理数据,这里的窗口就会变成 TumblingProcessTimeWinwow,前后必须是一一对应。
聚合计算
上面例子中比较核心的部分就是聚合计算,也就是 OrderSumAggregator。它只需实现 Flink 给我们提供 AggregateFunction 接口重写其方法即可。
ACC createAccumulator();//创建一个数据统计的容器,提供给后续操作使用。
ACC add(IN in, ACC acc);//每个元素被添加进窗口的时候调用。
第一个参数是添加进窗口的元素,第二个参数是统计的容器(上面创建的那个)。
OUT getResult(ACC acc);//窗口统计事件触发时调用来返回出统计的结果。
ACC merge(ACC acc1, ACC acc2);//只有在当窗口合并的时候调用,合并 2 个容器。
其中这个容器根据情况也可以是在内存里提供,也可以是在其他存储设备中提供。通过上面的例子就实现了按照业务时间来统计每分钟内的订单数量,订单最多可以延迟 1 分钟上报。但是为了等待 1 分钟内上报的数据,造成了数据会延迟 1 分钟进行统计,例如 8 点 02 分才能统计到 8 点到 8 点 01 分上报的数据。
为了解决这个问题,可以给 window 再设置一个自定义的统计触发器,这个触发器可以在整点触发统计事件(也就是调用上面的 getResults 方法),这样就达到了 8 点到 8 点 01 分这个时间段的数据,在 8 点 01 分统计一次,在 8 点 02 分再重新统计一次(加上后面 1 分钟上报的数据)。
总结
在开发运维上来讲,Flink 相较于其他计算引擎有比较强大和灵活的窗口 api 支持,而且自身提供的大量 Function 能覆盖日常数据转换中的大部分需求,但目前来说相关资料还是较少,学习只能依赖官网的 API 和阅读源码。最后,感谢大家一如既往的对数聚力产品和团队的支持!我们会加倍努力,为大家做出更好用的数据工具!
评论