写点什么

实时计算框架 Flink 在教育行业的应用实践(上)

  • 2019-11-07
  • 本文字数:3176 字

    阅读完需:约 10 分钟

实时计算框架 Flink 在教育行业的应用实践(上)

如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如 Flink 等来保障。例如,在 TB 级别数据量的数据库中,通过 SQL 语句或相关 API 直接对原始数据进行大规模关联、聚合操作,是无法做到在极短的时间内通过接口反馈到前端进行展示的。若想实现大规模数据的“即席查询”,就须用实时计算框架构建实时数仓来实现。


本文通过一个教育行业的应用案例,剖析业务系统对实时计算的需求场景,并分析了 Flink 和 Spark 两种实现方式的异同,最后通过运用 UCloud UFlink 产品中封装的 SQL 模块,来加速开发效率,更快地完成需求。

1.1 业务场景简述

在这个 K12 教育的业务系统中,学生不仅局限于纸质的练习册进行练习,还可以通过各类移动终端进行练习。基于移动终端,可以更方便地收集学生的学习数据,然后通过大数据分析,量化学习状态,快速定位薄弱知识点,进行查缺补漏。


在这套业务系统中,学生在手机 App 中对老师布置的作业进行答题训练,每次答题训练提交的数据格式如下表所示:



例如,传入到后台的单条答题记录数据格式如下:


{  "student_id": "学生ID_16",  "textbook_id": "教材ID_1",  "grade_id": "年级ID_1",  "subject_id": "科目ID_2_语文",  "chapter_id": "章节ID_chapter_2",  "question_id": "题目ID_100",  "score": 2,  "answer_time": "2019-09-11 12:44:01",  "ts": "Sep 11, 2019 12:44:01 PM"}
复制代码


然后,基于上述实时流入的数据,需要实现如下的分析任务:


  • 实时统计每个题目被作答频次

  • 按照年级实时统计题目被作答频次

  • 按照科目实时统计每个科目下题目的作答频次

1.2 技术方案选型

针对上述几个需求点,设计了如下的方案。首先会将数据实时发送到 Kafka 中,然后再通过实时计算框架从 Kafka 中读取数据,并进行分析计算,最后将计算结果重新输出到 Kafka 另外的主题中,以方便下游框架使用聚合好的结果。


下游框架从 Kafka 中拿到聚合好的数据,并实时录入到 OLTP 的业务库中(例如:MySQL、UDW、HBase、ES 等),以便于接口将想要的结果实时反馈给前端。


中间的实时计算框架,则在 Flink 和 Spark 中选择。2018 年 08 月 08 日,Flink 1.6.0 推出,支持状态过期管理(FLINK-9510, FLINK-9938)、支持 RocksDB、在 SQL 客户端中支持 UDXF 函数,大大加强了 SQL 处理功能,同时还支持 DML 语句、支持基于多种时间类型的事件处理、Kafka Table Sink 等功能。随后推出的 Flink 1.6.x 系列版本中,进行了大量优化。这些使得 Flink 成为一个很好的选择。


早先 Spark 要解决此类需求,是通过 Spark Streaming 组件实现。为此需要先生成 RDD,然后通过 RDD 算子进行分析,或者将 RDD 转换为 DataSet\DataFrame、创建临时视图,并通过 SQL 语法或者 DSL 语法进行分析。相比之下显得不够便捷和高效。后来 Spark 2.0.0 新增了 Structured Streaming 组件,具有了更快的流式处理能力,可达到和 Flink 接近的效果。


架构如下图所示:



本篇将省略下游框架的操作,重点介绍 Flink 框架进行任务计算的过程(虚线框中的内容),并简述 Spark 的实现方法,便于读者理解其异同。

1.3 实时计算在学情分析系统中的具体实现

1.3.1 Flink 实践方案

1. 发送数据到 Kafka

后台服务通过 Flume 或后台接口触发的方式调用 Kafka 生产者 API,实时将数据发送到 Kafka 指定主题中。


例如发送数据如下所示:


{"student_id":"学生ID_16","textbook_id":"教材ID_1","grade_id":"年级ID_1","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_2","question_id":"题目ID_100","score":2,"answer_time":"2019-09-11 12:44:01","ts":"Sep 11, 2019 12:44:01 PM"}………
复制代码


提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。

2. 编写 Flink 任务分析代码

使用 Flink 处理上述需求,需要将实时数据转换为 DataStream 实例,并通过 DataStream 算子进行任务分析,另外,如果想使用 SQL 语法或者 DSL 语法进行任务分析,则需要将 DataStream 转换为 Table 实例,并注册临时视图。


(1)构建 Flink env


env(StreamExecutionEnvironment) 是 Flink 当前上下文对象,用于后续生成 DataStream。代码如下所示:


val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(3)
复制代码


(2)从 Kafka 读取答题数据


在 Flink 中读取 Kafka 数据需要指定 KafkaSource,代码如下所示:


val props = new Properties()props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")props.setProperty("group.id", "group_consumer_learning_test01")
val flinkKafkaSource = new FlinkKafkaConsumer011[]("test_topic_learning_1", new SimpleStringSchema(), props)val eventStream = env.addSource[](flinkKafkaSource)
复制代码


(3)进行 JSON 解析


这里通过 map 算子实现 JSON 解析,代码示例如下:


val answerDS = eventStream.map(s => {  val gson = new Gson()  val answer = gson.fromJson(s, classOf[Answer])  answer})
复制代码


(4)注册临时视图


创建临时视图的目的,是为了在稍后可以基于 SQL 语法来进行数据分析,降低开发工作量。需要先获取 TableEnv 实例,再将 DataStream 实例转换为 Table 实例,最后将其注册为临时视图。代码如下所示:


val tableEnv = StreamTableEnvironment.create(env)val table = tableEnv.fromDataStream(answerDS)tableEnv.registerTable("t_answer", table)
复制代码


(5)进行任务分析


接下来,便可以通过 SQL 语句来进行数据分析任务了,3 个需求对应的分析代码如下所示:


//实时:统计题目被作答频次val result1 = tableEnv.sqlQuery(  """SELECT    |  question_id, COUNT(1) AS frequency    |FROM    |  t_answer    |GROUP BY    |  question_id  """.stripMargin)
//实时:按照年级统计每个题目被作答的频次val result2 = tableEnv.sqlQuery( """SELECT | grade_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | grade_id """.stripMargin)
//实时:统计不同科目下,每个题目被作答的频次val result3 = tableEnv.sqlQuery( """SELECT | subject_id, question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | subject_id, question_id """.stripMargin)
复制代码


此时得到的 result1、result2、result3 均为 Table 实例。


(6)实时输出分析结果


接下来,将不同需求的统计结果分别输出到不同的 Kafka 主题中即可。


在 Flink 中,输出数据之前,需要先将 Table 实例转换为 DataStream 实例,然后通过 addSink 算子添加 KafkaSink 即可。


因为涉及到聚合操作,Table 实例需要通过 RetractStream 来转换为 DataStream 实例。


该部分代码如下所示:


tableEnv.toRetractStream[](result1)  .filter(_._1)  .map(_._2)  .map(new Gson().toJson(_))  .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092",    "test_topic_learning_2",    new SimpleStringSchema()))
tableEnv.toRetractStream[](result2) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_3", new SimpleStringSchema()))
tableEnv.toRetractStream[](result3) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_4", new SimpleStringSchema()))
复制代码


(7)执行分析计划


Flink 支持多流任务同时运行,执行分析计划代码如下所示:


env.execute("Flink StreamingAnalysis")
复制代码


至此,编译并运行项目后,即可看到实时的统计结果,如下图所示,从左至右的 3 个窗体中,分别代表对应需求的输出结果。



2019-11-07 23:441246

评论

发布
暂无评论
发现更多内容

重识Flutter 用于解决复杂滑动视窗问题的Slivers - part1

编程的平行世界

flutter 前端 an'droid

嘉为蓝鲸携手麒麟软件共建国产化一站式DevOps解决方案

嘉为蓝鲸

DevOps 自动化运维 嘉为蓝鲸

百度APP iOS端内存优化-原理篇

百度Geek说

ios 内存 企业号 2 月 PK 榜

我的快速调优线上服务器CPU利用率通用办法,震惊面试官

KINDLING

Java cpu 服务器 性能调优 ebpf

Apipost自动化测试功能概述

不想敲代码

自动化测试 测试自动化 apipost

Portraiture4最新简体中文li磨皮滤镜插件

茶色酒

Portraiture Portraiture4

极客时间运维进阶训练营第十三周作业

9527

ChatGPT集成之前,让我们复习一下即将过时的知识

newbe36524

搜索引擎; ChatGPT

支付对接常用的加密方式介绍以及java代码实现

京东科技开发者

Java 安全 哈希算法 加密算法 非对称加密算法

MASA Stack 1.0 发布会讲稿 —— 产品篇

MASA技术团队

.net 云原生 MASA MASA Blazor

OpenInfra峰会议程已公布,特色主题演讲,百余场专题会议等你来参与!

极客天地

100 行 shell 写个 Docker

vivo互联网技术

Docker Shell

【Rust学习】内存安全探秘:变量的所有权、引用与借用

京东科技开发者

spring rust slice 企业号 2 月 PK 榜 可变引用

前端图片最优化压缩方案

凉城

前端 图片处理 图片压缩 前端图片压缩

小游戏内测|小游戏脱离微信运行在其它 App

Onegun

微信小程序 小游戏 小游戏开发 微信小程序-游戏

Boom 3D免费电脑环绕音乐软件2023最新版下载

茶色酒

Boom 3D

有关TCP协议,这是我看过讲的最清楚的一篇文章了!

程序员小毕

程序员 TCP 程序人生 计算机网络 架构师

GaussDB(DWS)现网案例:collation报错

华为云开发者联盟

数据库 后端 华为云 企业号 2 月 PK 榜 华为云开发者联盟

嘉为科技蝉联信创工委会“卓越贡献成员”荣誉称号

嘉为蓝鲸

自动化运维 嘉为蓝鲸

《数字经济全景白皮书》出海篇:选对路径下好棋,热点出海行业如何实现增长?

易观分析

数字化 经济 出海

十年老程序员:再见了Navicat,以后多数据库管理就看这款SQL工具

雨果

sql navicat 数据库管理工具

高校数据库/SQL教学用什么样的SQL工具?管理更方便,学习更轻松

雨果

数据库管理工具 :MySQL 数据库 SQL开发工具

对线面试官:浅聊一下 Java 虚拟机栈?

王磊

java面试

第六周作业-拆分电商系统为微服务

不爱学习的程序猿

关于小程序游戏变现方式你还知道哪些?

没有用户名丶

前端开发 小程序游戏

兴业证券打造更“自然”的数字人,火山语音提供技术支持

科技热闻

成熟的自动化运维平台是怎样练成的?

嘉为蓝鲸

自动化运维 嘉为蓝鲸

【活动报名】re:Invent - AI 应用助力企业构建数字战略

亚马逊云科技 (Amazon Web Services)

Java高手速成 | 单例模式实现方式——枚举

TiAmo

单例模式 枚举 Java 开发

全新视觉,升维体验!全栈可观测中心嘉为鲸眼产品全新体验升级

嘉为蓝鲸

可观测 自动化运维 嘉为蓝鲸

状态机的概念与设计

timerring

FPGA

实时计算框架 Flink 在教育行业的应用实践(上)_文化 & 方法_刘景泽_InfoQ精选文章