AICon上海|与字节、阿里、腾讯等企业共同探索Agent 时代的落地应用 了解详情
写点什么

通过 Java 来学习 Apache Beam

作者:Fabio Hiroki

  • 2022-06-28
  • 本文字数:5945 字

    阅读完需:约 20 分钟

通过Java来学习Apache Beam

在本文中,我们将介绍 Apache Beam,这是一个强大的批处理和流式处理开源项目,eBay 等大公司用它来集成流式处理管道,Mozilla 用它来在系统之间安全地移动数据。

概览

Apache Beam 是一种处理数据的编程模型,支持批处理和流式处理。


你可以使用它提供的 Java、Python 和 Go SDK 开发管道,然后选择运行管道的后端。

Apache Beam 的优势


Beam 的编程模型


  • 内置的 IO 连接器

  • Apache Beam 连接器可用于从几种类型的存储中轻松提取和加载数据。

  • 主要连接器类型有:

  • 基于文件的(例如 Apache Parquet、Apache Thrift);

  • 文件系统(例如 Hadoop、谷歌云存储、Amazon S3);

  • 消息传递(例如 Apache Kafka、Google Pub/Sub、Amazon SQS);

  • 数据库(例如 Apache Cassandra、Elastic Search、MongoDB)。

  • 作为一个 OSS 项目,对新连接器的支持在不断增长(例如 InfluxDB、Neo4J)。

  • 可移植性:

  • Beam 提供了几个运行管道的 Runner,你可以根据自己的场景选择最合适的,并避免供应商锁定。

  • 分布式处理后端,如 Apache Flink、Apache Spark 或 Google Cloud Dataflow 可以作为 Runner。

  • 分布式并行处理:

  • 默认情况下,数据集的每一项都是独立处理的,因此可以通过并行运行实现优化。

  • 开发人员不需要手动分配负载,因为 Beam 为它提供了一个抽象。

Beam 的编程模型

Beam 编程模型的关键概念:


  • PCollection:表示数据的集合,如从文本中提取的数字或单词数组。

  • PTransform:一个转换函数,接收并返回一个 PCollection,例如所有数字的和。

  • 管道:管理 PTransform 和 PCollection 之间的交互。

  • PipelineRunner:指定管道应该在哪里以及如何执行。

快速入门


一个基本的管道操作包括 3 个步骤:读取、处理和写入转换结果。这里的每一个步骤都是用 Beam 提供的 SDK 进行编程式定义的。


在本节中,我们将使用 Java SDK 创建管道。你可以创建一个本地应用程序(使用 Gradle 或 Maven 构建),也可以使用在线沙盒。示例将使用本地 Runner,因为这样使用 JUnit 断言验证结果会更容易些。

Java 本地依赖

  • beam-sdk-java-core:包含所有的 Beam 模型类。

  • beam-runners-direct-java:默认情况下 Beam SDK 将直接使用本地 Runner,也就是说管道将在本地机器上运行。

乘 2 操作

在第一个例子中,管道将接收到一个数字数组,并将每个元素乘以 2。


第一步是创建管道实例,它将接收输入数组并执行转换函数。因为我们使用 JUnit 运行 Beam,所以可以很容易地创建 TestPipeline 并将其作为测试类的一个字段。如果你更喜欢通过 main 方法来运行,需要设置管道配置参数


@Rulepublic final transient TestPipeline pipeline = TestPipeline.create();
复制代码


现在,我们可以创建作为管道输入的 PCollection。它是一个直接在内存中实例化的数组,但它也可以从支持 Beam 的任何地方读取。


PCollection<Integer> numbers =                pipeline.apply(Create.of(1, 2, 3, 4, 5));
复制代码


然后我们应用我们的转换函数,将每个元素乘以 2。


PCollection<Integer> output = numbers.apply(                MapElements.into(TypeDescriptors.integers())                      .via((Integer number) -> number * 2)      );
复制代码


为了验证结果,我们可以写一个断言。


PAssert.that(output)                .containsInAnyOrder(2, 4, 6, 8, 10);
复制代码


注意,结果不排序,因为 Beam 将每一个元素作为独立的项进行并行处理。


测试到这里就完成了,我们通过调用下面的方法运行管道:


pipeline.run();
复制代码

Reduce 操作

Reduce 操作将多个输入元素进行聚合,产生一个较小的集合,通常只包含一个元素。

MapReduce


现在我们来扩展上面的示例,将所有项乘以 2 后求和,产生一个 MapReduce 转换操作。


每一个 PCollection 转换都会产生一个新的 PCollection 实例,这意味着我们可以使用 apply 方法将转换链接起来。对于这个示例,将在每个元素乘以 2 后使用 Sum 操作:


PCollection<Integer> numbers =            pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output = numbers .apply( MapElements.into(TypeDescriptors.integers()) .via((Integer number) -> number * 2)) .apply(Sum.integersGlobally());
PAssert.that(output) .containsInAnyOrder(30);
pipeline.run();
复制代码

FlatMap 操作

FlatMap 先对每个输入元素应用映射,返回一个新集合,从而产生一个集合的集合。然后再应用 Flat 操作将所有嵌套的集合合并,最终生成一个集合。


下一个示例将把字符串数组转换成包含唯一性单词的数组。


首先,我们声明将作为管道输入的单词列表:


final String[] WORDS_ARRAY = new String[] {          "hi bob", "hello alice", "hi sue"};
final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
复制代码


然后,我们使用上面的列表创建输入 PCollection:


PCollection<String> input = pipeline.apply(Create.of(WORDS));
复制代码


现在,我们进行 FlatMap 转换,它将拆分每个嵌套数组中的单词,并将结果合并成一个列表:


PCollection<String> output = input.apply(      FlatMapElements.into(TypeDescriptors.strings())            .via((String line) -> Arrays.asList(line.split(" "))));
PAssert.that(output) .containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");
pipeline.run();
复制代码

Group 操作

数据处理的一个常见的任务是根据特定的键进行聚合或计数。我们将计算上一个例子中每个单词出现的次数。


在有了扁平的字符串数组之后,我们可以链接另一个 PTransform:


PCollection<KV<String, Long>> output = input            .apply(                  FlatMapElements.into(TypeDescriptors.strings())                      .via((String line) -> Arrays.asList(line.split(" ")))            )            .apply(Count.<String>perElement());
复制代码

产生结果:

PAssert.that(output).containsInAnyOrder(       KV.of("hi", 2L),       KV.of("hello", 1L),       KV.of("alice", 1L),       KV.of("sue", 1L),       KV.of("bob", 1L));
复制代码

从文件中读取

Beam 的一个原则是可以从任何地方读取数据,所以我们来看看在实际当中如何使用文本文件作为数据源。


下面的示例将读取包含“An advanced unified programming model”文本的文件“words.txt”。然后转换函数将返回一个包含每一个单词的 PCollection。


PCollection<String> input =      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))));
PAssert.that(output) .containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run();
复制代码

将结果写入文件

从前面的输入示例可以看到,Beam 提供了多个内置的输出连接器。在下面的例子中,我们将计算文本文件“words.txt”(只包含一个句子“An advanced unified programming model")中出现的每个单词的数量,输出结果将写入一个文本文件。


PCollection<String> input =      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input .apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))) ) .apply(Count.<String>perElement());;
PAssert.that(output) .containsInAnyOrder( KV.of("An", 1L), KV.of("advanced", 1L), KV.of("unified", 1L), KV.of("programming", 1L), KV.of("model", 1L) );
output .apply( MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue())) .apply(TextIO.write().to("./src/main/resources/wordscount"));
pipeline.run();
复制代码


默认情况下,文件写入也针对并行性进行了优化,这意味着 Beam 将决定保存结果的最佳分片(文件)数量。这些文件位于 src/main/resources 文件夹中,文件名包含了前缀“wordcount”、碎片序号和碎片总数。


在我的笔记本电脑上运行它生成了 4 个分片:


第一个分片(文件名:wordscount-00001-of-00003):


An 1advanced 1
复制代码


第二个分片(文件名:wordscount-00002-of-00003):


unified 1model 1
复制代码


第三个分片(文件名:wordscount-00003-of-00003):


programming 1
复制代码


最后一个分片是空的,因为所有的单词都已经被处理完了。

扩展 Beam

我们可以通过编写自定义转换函数来扩展 Beam。自定义转换器将提高代码的可维护性,并消除重复工作。


基本上,我们需要创建一个 PTransform 的子类,将输入和输出的类型声明为 Java 泛型。然后重写 expand 方法,加入我们的逻辑,它将接受单个字符串并返回包含每个单词的 PCollection。


public class WordsFileParser extends PTransform<PCollection<String>, PCollection<String>> {
@Override public PCollection<String> expand(PCollection<String> input) { return input .apply(FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split(" "))) ); } }
复制代码


用 WordsFileParser 来重构测试场景就变成了:


public class FileIOTest {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Test public void testReadInputFromFile() { PCollection<String> input = pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply( new WordsFileParser() );
PAssert.that(output) .containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run(); }
@Test public void testWriteOutputToFile() { PCollection<String> input = pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input .apply(new WordsFileParser()) .apply(Count.<String>perElement());
PAssert.that(output) .containsInAnyOrder( KV.of("An", 1L), KV.of("advanced", 1L), KV.of("unified", 1L), KV.of("programming", 1L), KV.of("model", 1L) );
output .apply( MapElements.into(TypeDescriptors.strings()) .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue())) .apply(TextIO.write().to ("./src/main/resources/wordscount"));
pipeline.run(); }}
复制代码


结果变成了更清晰和更模块化的管道。

时间窗口


Beam 的时间窗口


流式处理中一个常见的问题是将传入的数据按照一定的时间间隔进行分组,特别是在处理大量数据时。在这种情况下,分析每小时或每天的聚合数据比分析数据集的每个元素更有用。


在下面的例子中,我们将假设我们身处金融科技领域,我们正在接收包含金额和交易时间的事件,我们希望获取每天的交易总额。


Beam 提供了一种用时间戳来装饰每个 PCollection 元素的方法。我们可以通过这种方式创建一个代表 5 笔交易的 PCollection:


  • 金额 10 和 20 是在 2022 年 02 月 01 日转账的;

  • 金额 30、40 和 50 是在 2022 年 02 月 05 日转账的。


PCollection<Integer> transactions =      pipeline.apply(            Create.timestamped(                  TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),                  TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),                  TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),                  TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),                  TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))               )       );
复制代码


接下来,我们将应用两个转换函数:


  • 使用一天的时间窗口对交易进行分组;

  • 把每组的数量加起来。


PCollection<Integer> output =      Transactions             .apply(Window.into(FixedWindows.of(Duration.standardDays(1))))             .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
复制代码


在第一个时间窗口(2022-02-01)中,预计总金额为 30(10+20),而在第二个窗口(2022-02-05)中,我们应该看到总金额为 120(30+40+50)。


PAssert.that(output)                   .inWindow(new IntervalWindow(                       Instant.parse("2022-02-01T00:00:00+00:00"),                       Instant.parse("2022-02-02T00:00:00+00:00")))                 .containsInAnyOrder(30);
PAssert.that(output) .inWindow(new IntervalWindow( Instant.parse("2022-02-05T00:00:00+00:00"), Instant.parse("2022-02-06T00:00:00+00:00"))) .containsInAnyOrder(120);
复制代码


每个 IntervalWindow 实例需要匹配所选时间段的确切开始和结束时间戳,因此所选时间必须是“00:00:00”。

总结

Beam 是一个强大的经过实战检验的数据框架,支持批处理和流式处理。我们使用 Java SDK 进行了 Map、Reduce、Group 和时间窗口等操作。


Beam 非常适合那些执行并行任务的开发人员,可以简化大规模数据处理的机制。


它的连接器、SDK 和对各种 Runner 的支持为我们带来了灵活性,你只要选择一个原生 Runner,如 Google Cloud Dataflow,就可以实现计算资源的自动化管理。


作者简介


Fabio Hiroki 是一位在 Mollie 公司从事金融服务的软件工程师。


原文链接


Introduction to Apache Beam Using Java

2022-06-28 09:015335

评论

发布
暂无评论
发现更多内容

世界读书日:我想推荐这几本书

宇宙之一粟

书籍推荐 书单 4月月更

k8s client-go源码分析 informer源码分析(1)-概要分析

良凯尔

Kubernetes 容器 云原生 Client-go

初探 Lambda Powertools TypeScript

亚马逊云科技 (Amazon Web Services)

typescript Serverless Lambda AWS

我是如何用 Amazon Serverless 创建一个门铃的

亚马逊云科技 (Amazon Web Services)

Serverless Lambda AWS showdev

自动化的艺术

俞凡

架构 大厂实践 PayPal

kotlin 如何解决 java 开发痛点,让程序员 happier

爱好编程进阶

Java 面试 后端开发

krpano全景之vtour文件夹和tour

爱好编程进阶

Java 面试 后端开发

顶级元宇宙游戏Plato Farm,近期动作不断利好频频

小哈区块

2021年秋招,薪资排行NO

爱好编程进阶

Java 面试 后端开发

AtomicIntegerArray源码分析与感悟

爱好编程进阶

Java 面试 后端开发

Java语言特点

爱好编程进阶

Java 面试 后端开发

采用百度飞桨EasyDL完成指定目标识别

DS小龙哥

4月月更

Plato Farm-以柏拉图为目标的农场元宇宙游戏

西柚子

22道Java Spring Boot高频面试题

爱好编程进阶

Java 面试 后端开发

Netty 核心源码解读 —— ServerBootstrap 篇

爱好编程进阶

Java 面试 后端开发

Java 线程池原理分析

爱好编程进阶

Java 面试 后端开发

Java单例模式实现,一次性学完整,面试加分项

爱好编程进阶

Java 面试 后端开发

Java岗大厂面试百日冲刺 - 日积月累,每日三题【Day39

爱好编程进阶

Java 面试 后端开发

Java 结合实例学会使用 静态代理、JDK动态代理、CGLIB动态代理

爱好编程进阶

Java 面试 后端开发

JVM+分布式+算法

爱好编程进阶

Java 面试 后端开发

在docker上编译openjdk8

程序员欣宸

Java JVM 4月月更

将新增和编辑的数据同步更新到列表

岛上码农

flutter ios开发 安卓开发 4月月更 跨平台开发

Choreographer全解析

爱好编程进阶

Java 面试 后端开发

解决方案架构师的小锦囊 - 架构图的 5 种类型

亚马逊云科技 (Amazon Web Services)

技术 职业 亚马逊云科技

Java泛型机制详解;这些你都知道吗?

爱好编程进阶

Java 面试 后端开发

Netty学习之旅------高仿Dubbo服务调用模型、私有协议实现、编码解码器使用实践

爱好编程进阶

Java 面试 后端开发

MySQL-InnoDB-事务

爱好编程进阶

Java 面试 后端开发

redis优化系列(二)Redis主从原理、主从常用配置

乌龟哥哥

4月月更

[Day23]-[数据结构]手写LRU

方勇(gopher)

LeetCode LRU 数据结构算法

解锁OpenHarmony技术日!年度盛会,即将揭幕!

OpenHarmony

大会 OpenHarmony

通过Java来学习Apache Beam_语言 & 开发_InfoQ精选文章