![Apache Beam 架构原理及应用实践](https://static001.infoq.cn/resource/image/ea/8d/eaaa94e841b1aec46c8afd496615508d.png)
Apache Beam 是什么?
1. Apache Beam 的前世今生
![](https://static001.infoq.cn/resource/image/5e/8e/5e143e348dd975a84c05f07a6844ae8e.png)
大数据起源于 Google 2003 年发布的三篇论文 GoogleFS、MapReduce、BigTable 史称三驾马车,可惜 Google 在发布论文后并没有公布其源码,但是 Apache 开源社区蓬勃发展,先后出现了 Hadoop,Spark,Apache Flink 等产品,而 Google 内部则使用着闭源的 BigTable、Spanner、Millwheel。这次 Google 没有发一篇论文后便销声匿迹,2016 年 2 月 Google 宣布 Google DataFlow 贡献给 Apache 基金会孵化,成为 Apache 的一个顶级开源项目。然后就出现了 Apache Beam,这次不它不是发论文发出来的,而是谷歌开源出来的。2017 年 5 月 17 日 发布了第一个稳定版本 2.0。
2. Apache Beam 的定义
![](https://static001.infoq.cn/resource/image/4d/77/4dbd22cfce1b44a91641f254fa78bc77.png)
Apache Beam 的定义如上图,其定位是做一个统一前后端的模型。其中,管道处理和逻辑处理是自己的,数据源和执行引擎则来自第三方。那么,Apache Beam 有哪些好处呢?
Apache Beam 的优势
1. 统一性
![](https://static001.infoq.cn/resource/image/62/2b/620af50436a53f36dfdd0baba85bd42b.png)
① 统一数据源,现在已经接入的 java 语言的数据源有 34 种,正在接入的有 7 种。Python 的 13 种。这是部分的数据源 logo,还有一些未写上的,以及正在集成的数据源。基本涵盖了整个 IT 界每个时代的数据源,数据库。
![](https://static001.infoq.cn/resource/image/dc/b0/dc3c51a562034d2a3113db1cd1e778b0.png)
② 统一编程模型,Beam 统一了流和批,抽象出统一的 API 接口。
![](https://static001.infoq.cn/resource/image/35/52/35d71075b22784610aeab6bf902ab252.png)
③ 统一大数据引擎,现在支持性最好的是 flink,spark,dataflow 还有其它的大数据引擎接入进来。
2. 可移植性
![](https://static001.infoq.cn/resource/image/86/7d/8628c74966274f8b2f8bdf4523995b7d.png)
Beam 的 jar 包程序可以跨平台运行,包括 Flink、Spark 等。
3. 可扩展性
![](https://static001.infoq.cn/resource/image/fc/b4/fcb5092bf0674c040b34f288131192b4.png)
很多时候,随着业务需求的不断变化,用户的需要也随之变化,原来 Apache Beam 的功能可能需要进行扩展。程序员就会根据不同的需求扩展出新的技术需求,例如我想用 spark 新特性,能不能重写一下 sparkrunner 换个版本。我想重写一下 kafkaIO 可以吗?对于数据的编码,我可以自定义吗?最后干脆我感觉 Pulsar 技术不错,我想自己写个 SDKIO,集成进去可以不?答案都是可以的。Apache Beam 是具有可扩展性的,零部件都可以重塑。
4. 支持批处理和流处理
![](https://static001.infoq.cn/resource/image/c4/b3/c41e57c0068399aff813e97a5a13b3b3.png)
如果在 AIoT 行业,开发过程中,我们可能经常碰到两种数据:
摄像头等传感器的实时报警信息
不同数据库的数据,进行一起处理
Beam 对这两种数据是同时支持的。
5. 支持多语言开发
![](https://static001.infoq.cn/resource/image/8b/25/8b5c74abc3845a0b046cafedb47e4925.png)
此外 Beam 支持 java,Python,go,Scala 语言,大家可以利用自己擅长的语言开发自己的 Beam 程序。
6. DAG 高度抽象
![](https://static001.infoq.cn/resource/image/31/42/311336ef68df7e74213e358b32b65142.png)
DAG,中文名“有向无环图”。“有向”指的是有方向,准确的说应该是同一个方向,“无环”则指够不成闭环。如果做一些去重、统计、分组等,开发人员不用再做 Map Reduce ,Beam 已经封装提供了相应的高级操作。
Apache Beam 的架构设计
我们接下来看一下 Beam 架构是怎样的:
1. Apache Beam 的总体架构
![](https://static001.infoq.cn/resource/image/a7/9e/a7236545cb26d67f7563031a30c3f79e.png)
Apache Beam 的总体架构是这样的,上面有各种语言,编写了不同的 SDKs,Beam 通过连接这些 SDK 的数据源进行管道的逻辑操作,最后发布到大数据引擎上去执行。需要注意的是,Local 虽然是一个 runner 但是不能用于生产上,它是用于调试/开发使用的。
2. Apache Beam 的部署流程图
![](https://static001.infoq.cn/resource/image/1d/4c/1da669649e64caf6840a2faa31252a4c.png)
让我们一起看下 Apache Beam 总体的部署流程。首先我们去构建这个 Beam jobAPI .jar 通过 job 服务器以及设置大数据执行平台,最后提交 flink 或 spark 的任务集群去执行任务。
Apache Beam 的核心组件刨析
1. SDks+Pipeline+Runners (前后端分离)
![](https://static001.infoq.cn/resource/image/7c/a2/7c4d8d89d01dc6a6b1902c7f60333aa2.png)
如上图,前端是不同语言的 SDKs,读取数据写入管道, 最后用这些大数据引擎去运行。可以发现完整的 beam 程序由 SDks+Pipeline+Runners 构成的。
2. 什么是 SDK?
![](https://static001.infoq.cn/resource/image/3e/ba/3e3c5099eec151579251d1b79f77f8ba.png)
什么是 SDK,就是一个编写 beam 管道构成的一部分,一个客户端或一个类库组件也可以,最后提交到大数据运行平台上。
3. Beam 版本和 Kafka-clients 依赖情况表
![](https://static001.infoq.cn/resource/image/cb/a4/cba34beb98cb039046c8d97da65d75a4.png)
我们以 kafka 为例,看一下 Kafka-client 对版本的依赖情况,从图中可以看出 beam 2.6.0 版本的 api 改变基本是稳定的。当然,现在用的比较多的 2.4、2.5 版本。吐个槽,2.6 版本之前的兼容性问题,上个版本还有这个类或方法,下一个版本就没有了,兼容性不是很好。
4. SDK beam-sdks-java-io-kafka 读取源码剖析
![](https://static001.infoq.cn/resource/image/33/57/337844b2b7bc6cd58244bd253f3d9057.png)
![](https://static001.infoq.cn/resource/image/ba/19/ba9f9a20a96dc427f9f5eec631f23219.png)
![](https://static001.infoq.cn/resource/image/4a/26/4a9908d39a7c0b983f004444c48e2226.png)
![](https://static001.infoq.cn/resource/image/5c/89/5cffc4a6406531f693b7ccb4e45d1189.png)
![](https://static001.infoq.cn/resource/image/92/4e/92d743289aaf3ae55c06d6ad5edfe54e.png)
① 指定 KafkaIO 的模型,从源码中不难看出这个地方的 KafkaIO<K,V> 类型是 Long 和 String 类型,也可以换成其他类型。
② 设置 Kafka 集群的集群地址。
③ 设置 Kafka 的主题类型,源码中使用了单个主题类型,如果是多个主题类型则用 withTopics(List)
方法进行设置。设置情况基本跟 Kafka 原生是一样的。
④ 设置序列化类型。Apache Beam KafkaIO 在序列化的时候做了很大的简化,例如原生 Kafka 可能要通过 Properties 类去设置 ,还要加上很长一段 jar 包的名字。
Beam KafkaIO 的写法:
原生 Kafka 的设置:
⑤ 设置 Kafka 的消费者属性,这个地方还可以设置其他的属性。源码中是针对消费分组进行设置。
⑥ 设置 Kafka 吞吐量的时间戳,可以是默认的,也可以自定义。
⑦ 相当于 Kafka 中 “isolation.level” , “read_committed”,指定 KafkaConsumer 只应读取非事务性消息,或从其输入主题中提交事务性消息。流处理应用程序通常在多个读取处理写入阶段处理其数据,每个阶段使用前一阶段的输出作为其输入。通过指定 read_committed
模式,我们可以在所有阶段完成一次处理。针对 “Exactly-once” 语义,支持 Kafka 0.11 版本。
⑧ 设置 Kafka 是否自动提交属性 “AUTO_COMMIT”,默认为自动提交,使用 Beam 的方法来设置。
⑨ 设置是否返回 Kafka 的其他数据,例如 offset 信息和分区信息,不用可以去掉。
⑩ 设置只返回 values 值,不用返回 key。例如 PCollection,而不是 PCollection<Long,String>。
在写入 Kafka 时完全一次性地提供语义,这使得应用程序能够在 Beam 管道中的一次性语义之上提供端到端的一次性保证。它确保写入接收器的记录仅在 Kafka 上提交一次,即使在管道执行期间重试某些处理也是如此。重试通常在应用程序重新启动时发生(如在故障恢复中)或者在重新分配任务时(如在自动缩放事件中)。Flink runner 通常为流水线的结果提供精确一次的语义,但不提供变换中用户代码的副作用。如果诸如 Kafka 接收器之类的转换写入外部系统,则这些写入可能会多次发生。
在此处启用 EOS 时,接收器转换将兼容的 Beam Runners 中的检查点语义与 Kafka 中的事务联系起来,以确保只写入一次记录。由于实现依赖于 runners checkpoint 语义,因此并非所有 runners 都兼容。Beam 中 FlinkRunner 针对 Kafka 0.11+ 版本才支持,然而 Dataflow runner 和 Spark runner 如果操作 kafkaIO 是完全支持的。
关于性能的注意事项:
“Exactly-once” 在接收初始消息的时候,除了将原来的数据进行格式化转换外,还经历了 2 个序列化 - 反序列化循环。根据序列化的数量和成本,CPU 可能会涨的很明显。通过写入二进制格式数据(即在写入 Kafka 接收器之前将数据序列化为二进制数据)可以降低 CPU 成本。
5. Pipeline
![](https://static001.infoq.cn/resource/image/be/cf/be478a0f972cb3cfe9e5f2181d4ba5cf.png)
您输入的数据存储在哪里?
首先要确定你要构造几条数据源,在 Beam 可以构建多条,构建之前可以选择自己的 SDK 的 IO。
您的数据类型是什么样的?
Beam 提供的是键值对的数据类型,你的数据可能是日志文本,格式化设备事件,数据库的行,所以在 PCollection 就应该确定数据集的类型。
您想怎么去处理数据?
对数据进行转换,过滤处理,窗口计算,SQL 处理等。在管道中提供了通用的 ParDo 转换类,算子计算以及 BeamSQL 等操作。
您打算把数据最后输出到哪里去?
在管道末尾进行 Write 操作,把数据最后写入您自己想存放或最后流向的地方。
![](https://static001.infoq.cn/resource/image/11/8f/11100ad7969371c5cd10e0dffc97568f.png)
重要的是要理解变换不消耗 PCollections;相反,他们会考虑 a 的每个元素 PCollection 并创建一个新 PCollection 的输出。这样,您可以对不同的元素执行不同的操作 PCollection。这里是出现了两条管,例如输入 AR,AI,VAR,BT,BMP。
![](https://static001.infoq.cn/resource/image/4b/7b/4ba39a8f3e136ab2edeab8427340c17b.png)
例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。
![](https://static001.infoq.cn/resource/image/77/38/77ce7d0164ee36d7476522e31161f738.png)
一种是收费的拓蓝公司出品叫 Talend Big Data Studio,有没有免费的呢?
![](https://static001.infoq.cn/resource/image/93/73/9308b58d47cf65802f1232fa28567273.png)
有的,它叫 kettle-beam。例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。大家可以去 github 去看一下插件相应的安装及使用说明。从图中可以看出大部分 beam 的输入输出现在都是支持的。
https://github.com/mattcasters/kettle-beam
6. Runners
![](https://static001.infoq.cn/resource/image/f3/2f/f316e4320767a2a292777051833b2f2f.png)
我们在看一下运行平台,这是运行平台支持度的截图。例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。
Runners 在 Beam Model 模型中有 4 个支持的维度:
What,如何对数据进行计算?例如,机器学习中训练学习模型可以用 Sum 或者 Join 等。在 Beam SDK 中由 Pipeline 中的操作符指定。
Where,数据在什么范围中计算?例如,基于 Process-Time 的时间窗口、基于 Event-Time 的时间窗口、滑动窗口等等。在 Beam SDK 中由 Pipeline 的窗口指定。
When,何时输出计算结果?例如,在 1 小时的 Event-Time 时间窗口中,每隔 1 分钟将当前窗口计算结果输出。在 Beam SDK 中由 Pipeline 的 Watermark 和触发器指定。
How,迟到数据如何处理?例如,将迟到数据计算增量结果输出,或是将迟到数据计算结果和窗口内数据计算结果合并成全量结果输出。在 Beam SDK 中由 Accumulation 指定。
① What
![](https://static001.infoq.cn/resource/image/bb/50/bb36d7a81410099e45fce6f28ea86050.png)
对数据如果处理,计算。分组的矩阵图,提到这里说一下,这些运行平台已经集成到 Beam,只是没有更新到官方首页而已。以及或者是官方不打算主推的,就没有写上去。
② Where
![](https://static001.infoq.cn/resource/image/6d/df/6d7fd59639b1315b9a2f09fcc901a9df.png)
窗口处理矩阵能力图,大家从图中可以看出很多都是全部支持的。
③ When
![](https://static001.infoq.cn/resource/image/d4/07/d4f1f37e2c7c67a7433e27901f8d4d07.png)
对于事件处理,流计算引擎 Apache Flink,Google Cloud ,Dataflow 以及 Jstorm 都支持性比较好。
④ How
![](https://static001.infoq.cn/resource/image/74/5b/74605fa489bfa980b1bf59084fd2085b.png)
最后是对迟到数据的数据处理能力矩阵图。
7. FlinkRunner Beam
![](https://static001.infoq.cn/resource/image/41/e9/41bbfb765135102fc10aa32eb04263e9.png)
我们以最近两年最火的 Apache Flink 为例子,帮大家解析一下 beam 集成情况。大家可以从图中看出,flink 集成情况。
![](https://static001.infoq.cn/resource/image/f4/c9/f409c3cb03e284e4723a71735876fcc9.png)
然后看一下,FlinkRunner 具体解析了哪些参数,以及代码中怎样设置。
8. Beam SQL
![](https://static001.infoq.cn/resource/image/4b/64/4b00665ddc54e681e198841f366e3c64.png)
Apache Calcite 是一种保准 SQL 的解析器,用于大数据处理和一些流增强功能,基于它做 SQL 引擎的有很多,例如 spark,Cassandra,druid 和我们的 Beam。
![](https://static001.infoq.cn/resource/image/e8/c0/e86d77ea993c57c47a7ba49504bd22c0.png)
我们看一下 Beam SQL 的设计思路:首先是我们写的 SQL 语句,进行查询解析,验证来源的类型,数据格式,建一个执行计划,然后通过优化,设计计划规则或逻辑,封装在 Beam 管道中,进行编译器编译,最后提交 job 到运行平台执行。
![](https://static001.infoq.cn/resource/image/25/d2/25de0edee4732cacb13cbf272bfc47d2.png)
表中是 beam SQL 和 Calcite 的类型支持度,是把 Calcite 进行映射。
![](https://static001.infoq.cn/resource/image/1f/b0/1f620dd59e8b9ed64207ffd4c1e238b0.png)
Beam SQL 和 Apache Calcite 函数的支持度。里面有一些现在不支持的,需要大家做的时候多多关注,特别是架构师设计时候。
![](https://static001.infoq.cn/resource/image/78/b1/782911ea9b495f4aaa8d7c69178b69b1.png)
从图中可以看出,首先要设置好数据类型,在设置数据,最后填充到管道数据集,最后做 SQL 的操作。其实这样写还是不方便的。有没有很好的解决方式,有。大家继续往下看…
![](https://static001.infoq.cn/resource/image/0b/5f/0b97556260601e0be1548cfdb94b185f.png)
Beam SQL 的扩展。Beam SQL 的 CREATE EXTERNAL TABLE 语句注册一个映射到外部存储系统的虚拟表。对于某些存储系统,CREATE EXTERNAL TABLE 在写入发生之前不会创建物理表。物理表存在后,您可以使用访问表 SELECT,JOIN 和 INSERT INTO 语句。通过虚拟表,可以动态的操作数据,最后写入到数据库就可以了。这块可以做成视图抽象的。
Create 创建一个动态表,tableName 后面是列名。TYPE 是数据来源的类型,限制支持 bigquery,pubsub,kafka,text 等。Location 下面为表的数据类型配置, 这里以 kafka 为例。
AloT PB 级实时数据,怎么构建自己的“AI 微服务”?
在 AIoT 里面,实时性数据比较大,例如视频分析,视频挖掘,合规检测,语音分析等等。130W 路的摄像头每秒写入 300 多 G 的视频,一天就是 25PB,有人说可以晚上用批方式上数据,其实 AIoT 场景跟其他的场景是不一样的,例如做智能儿童手表,我们晚上上报数据的频度可以变低,白天儿童上学放学路上可以正常上报数据。AIoT 场景下摄像头 24 小时监控的,并且宽带主杆线都换成千兆光线,其实也支持不了每秒 300G 的实时写入。我们是怎么处理呢?
![](https://static001.infoq.cn/resource/image/47/bc/4709f7c3532845616778a769633a14bc.png)
首先在设计架构方案的时候,相信很多架构师都会这样想,不想第一个去吃螃蟹,因为稳定性,安全性,及不确定性原因会导致整个项目的成败。那我们看一下 Beam 有哪些大厂在使用。
知道他们使用 Beam ,咱们了解一下他们用 Beam 做了什么?例如:
使用 Apache Beam 进行大规模流分析
使用 Apache Beam 运行定量分析
使用 Apache Beam 构建大数据管道
从迁移到 Apache Beam 进行地理数据可视化
使用 Apache Beam & tf.Transform 对 TensorFlow 管道进行预处理
卫星图像的土地利用分类
智慧城市大数据集成
平安城市及质量实时风控
电商平台双十一活动实时数据处理
国外的可以从官方网站上找到案例的原文,国内可以从新闻或者官方网站找到相应的案例。
在 AloT 场景下我们为什么会选择 Beam 呢?
数据源可以适配,因为平安城市,雪亮工程数据源千奇百怪。
能够进行数据多样处理,连接,过滤,合并,拆分。
具有清洗脏数据功能,例如警情去重误报警,合规检测等。
具有大数据集群虚拟化部署功能,可扩展性,伸缩性。
具有实时处理和离线处理能力。
1. 案列系统架构图
![](https://static001.infoq.cn/resource/image/bd/fa/bd86fbc1848251afbb401de5b567eefa.png)
这是案例的总架构图,底层是 Beam SDK,上层是抽象封装的输入输出组件,以及清洗组件,类型管理,第三方 SDK,在往上层是组件配置管理,及版本控制,最上层是 jar 可视化配置,以及 SQL 可视化,最后把 jar 通过运维一体化平台提交给执行引擎集群,当然这里有个解析器,是我们自己开发的。
2. 示例架构图
![](https://static001.infoq.cn/resource/image/1f/e7/1fc07bc946794fe558545d5a7696b1e7.png)
以下为示例架构图:
① 摄像头以及 AI 智能设备产生的报警以及抓取的信息上报到后端智能设备。
② 智能设备产生的 AI 分析结果进行通过网关集群进行传输,注意网关集群地方要做流控及雪崩控制。
③ 消息通过网关集群发送到消息中间件。注意:这边这个规则下发是针对前段的数据进行 ETL 清洗的清洗规则的下发。
④ Beam 集群接收下发规则的更新,并且根据规则进行数据清洗。
⑤ 对于文档性的数据我们实时存储到实时搜索引擎。
⑥ 需要复杂查询,统计以及报表的数据存储到 ClickHouse。
⑦ 进行 BI 套件的展示以及前端大屏幕的展示。
3. 示例代码
![](https://static001.infoq.cn/resource/image/a8/89/a8b2e40f56f0ce52ad65b27d8622d389.png)
核心示例代码,首先创建管道工厂,然后显示设置执行引擎,根据 SDKIO 进行读取 kafka 的消息。
![](https://static001.infoq.cn/resource/image/5d/1c/5dce255e747a9d490c7e0acf91112a1c.png)
序列化消息,写入 es 进行备份,因为 es 数据是 json 的写入的时候首先要考虑转换成 json 类型。这个地方我设置了一个编码,实体类的编码类型为 AvroCoder ,编码类型是每个管道都要设置的。
![](https://static001.infoq.cn/resource/image/3c/ac/3c8f135350ed5aec041917855a5621ac.png)
把 kafka 的数据转换成 row 类型,这里就是运用了管道设计中的流分支处理。
![](https://static001.infoq.cn/resource/image/01/23/01d36b57b7a213b64db8a32f8878e823.png)
最后一步是写入咱们的 clickhouse,大家可能对 clickhouse 不是很了解,这是俄罗斯的一家高科技公司研发的。查询速度非常快,比 Hive 快 279 倍,比 MySQL 快 801 倍的神器。
4. 示例效果展示
以下为写入 es 的效果。这个字段写入时候自动创建。
![](https://static001.infoq.cn/resource/image/be/df/be62d9afbad2cf843e21715d9a72fbdf.png)
![](https://static001.infoq.cn/resource/image/af/0a/af27161c346f5aeb6a73302722b6ec0a.png)
今天的分享就到这里,谢谢大家。
关于持续问题咨询:
Apache Beam 官方网站
Apache Beam 开源地址
https://github.com/apache/beam
Apache Beam Example 地址
https://github.com/xsm110/Apache-Beam-Example
user-subscribe@beam.apache.org
作者介绍:
张海涛,海康威视金融事业部架构师,国际注册云安全系统认证专家。目前负责全国金融行业 AI 大数据的基础架构工作,主导建设过云基础平台的架构设计及核心开发,并自研大数据组件获得过国家发明专利。专注安防及 AloT 云计算大数据方向,是 Apache Beam 中文社区发起人之一及 ClickHouse 开源社区的核心开发人员。
本文来自 DataFun 社区
原文链接:
评论