写点什么

实时计算框架 Flink 在教育行业的应用实践(下)

  • 2019-11-07
  • 本文字数:3118 字

    阅读完需:约 10 分钟

实时计算框架 Flink 在教育行业的应用实践(下)

1.3.2 Spark 基于 Structured Streaming 的实现

Spark 发送数据到 Kafka,及最后的执行分析计划,与 Flink 无区别,不再展开。下面简述差异点。

1. 编写 Spark 任务分析代码

(1)构建 SparkSession


如果需要使用 Spark 的 Structured Streaming 组件,首先需要创建 SparkSession 实例,代码如下所示:


val sparkConf = new SparkConf()  .setAppName("StreamingAnalysis")  .set("spark.local.dir", "F:\\temp")  .set("spark.default.parallelism", "3")  .set("spark.sql.shuffle.partitions", "3")  .set("spark.executor.instances", "3")
val spark = SparkSession .builder .config(sparkConf) .getOrCreate()
复制代码


(2)从 Kafka 读取答题数据


接下来,从 Kafka 中实时读取答题数,并生成 streaming-DataSet 实例,代码如下所示:


val inputDataFrame1 = spark  .readStream  .format("kafka")  .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")  .option("subscribe", "test_topic_learning_1")  .load()
复制代码


(3)进行 JSON 解析


从 Kafka 读取到数据后,进行 JSON 解析,并封装到 Answer 实例中,代码如下所示:


val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val answerDS = keyValueDataset1.map(t => { val gson = new Gson() val answer = gson.fromJson(t._2, classOf[Answer]) answer})
复制代码


其中 Answer 为 Scala 样例类,代码结构如下所示:


case class Answer(student_id: String,                  textbook_id: String,                  grade_id: String,                  subject_id: String,                  chapter_id: String,                  question_id: String,                  score: Int,                  answer_time: String,                  ts: Timestamp) extends Serializable
复制代码


(4)创建临时视图


创建临时视图代码如下所示:


answerDS.createTempView("t_answer")
复制代码


(5)进行任务分析


仅以需求 1(统计题目被作答频次)为例,编写代码如下所示:


  • 实时:统计题目被作答频次


//实时:统计题目被作答频次val result1 = spark.sql(  """SELECT    |  question_id, COUNT(1) AS frequency    |FROM    |  t_answer    |GROUP BY    |  question_id  """.stripMargin).toJSON
复制代码


(6)实时输出分析结果


仅以需求 1 为例,输出到 Kafka 的代码如下所示:


result1.writeStream .outputMode("update") .trigger(Trigger.ProcessingTime(0)) .format("kafka") .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") .option("topic", "test_topic_learning_2") .option("checkpointLocation", "./checkpoint_chapter11_1") .start()
复制代码

1.3.3 使用 UFlink SQL 加速开发

通过上文可以发现,无论基于 Flink 还是 Spark 通过编写代码实现数据分析任务时,都需要编写大量的代码,并且在生产集群上运行时,需要打包程序,然后提交打包后生成的 Jar 文件到集群上运行。


为了简化开发者的工作量,不少开发者开始致力于 SQL 模块的封装,希望能够实现只写 SQL 语句,就完成类似上述的需求。UFlink SQL 即是 UCloud 为简化计算模型、降低用户使用实时计算 UFlink 产品门槛而推出的一套符合 SQL 语义的开发套件。通过 UFlink SQL 模块可以快速完成这一工作,实践如下。

1. 创建 UKafka 集群

在 UCloud 控制台 UKafka 创建页,选择配置并设置相关阈值,创建 UKafka 集群。



提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。

2. 创建 UFlink 集群

  • 在 UCloud 控制台 UFlink 创建页,选择配置和运行模式,创建一个 Flink 集群。



  • 完成创建


3. 编写 SQL 语句

完成之后,只需要在工作空间中创建如下形式的 SQL 语句,即可完成上述 3 个需求分析任务。


(1)创建数据源表


创建数据源表,本质上就是为 Flink 当前上下文环境执行 addSource 操作,SQL 语句如下:


CREATE TABLE t_answer(    student_id VARCHAR,    textbook_id VARCHAR,    grade_id VARCHAR,    subject_id VARCHAR,    chapter_id VARCHAR,    question_id VARCHAR,    score INT,    answer_time VARCHAR,    ts TIMESTAMP )WITH(    type ='kafka11',    bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',    zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',    topic ='test_topic_learning_1',    groupId = 'group_consumer_learning_test01',    parallelism ='3' );
复制代码


(2)创建结果表


创建结果表,本质上就是为 Flink 当前上下文环境执行 addSink 操作,SQL 语句如下:


CREATE TABLE t_result1(    question_id VARCHAR,    frequency INT)WITH(    type ='kafka11',    bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',    zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',    topic ='test_topic_learning_2',    parallelism ='3');
CREATE TABLE t_result2( grade_id VARCHAR, frequency INT)WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_3', parallelism ='3');
CREATE TABLE t_result3( subject_id VARCHAR, question_id VARCHAR, frequency INT)WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_4', parallelism ='3');
复制代码


(3)执行查询计划


最后,执行查询计划,并向结果表中插入查询结果,SQL 语句形式如下:


INSERT INTO    t_result1  SELECT      question_id, COUNT(1) AS frequency    FROM      t_answer    GROUP BY      question_id;
INSERT INTO t_result2 SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id;
INSERT INTO t_result3 SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;
复制代码


SQL 语句编写完毕后,将其直接粘贴到 UFlink 前端页面对话框中,并提交任务,即可快速完成上述 3 个需求。如下图所示:


1.3.4. UFlink SQL 支持多流 JOIN

Flink、Spark 目前都支持多流 JOIN,即 stream-stream join,并且也都支持 Watermark 处理延迟数据,以上特性均可以在 SQL 中体现,得益于此,UFlink SQL 也同样支持纯 SQL 环境下进行 JOIN 操作、维表 JOIN 操作、自定义函数操作、JSON 数组解析、嵌套 JSON 解析等。更多细节欢迎大家参考 UFlink SQL 相关案例展示https://docs.ucloud.cn/analysis/uflink/dev/sql

1.4 总结

UFlink 基于 Apache Flink 构建,除 100%兼容开源外,也在不断推出 UFlink SQL 等模块,从而提高开发效率,降低使用门槛,在性能、可靠性、易用性上为用户创造价值。 今年 8 月新推出的 Flink 1.9.0,大规模变更了 Flink 架构,能够更好地处理批、流任务,同时引入全新的 SQL 类型系统和更强大的 SQL 式任务编程。UFlink 预计将于 10 月底支持 Flink 1.9.0,敬请期待。


本文转载自公众号 UCloud 技术(ID:ucloud_tech)。


原文链接:


https://mp.weixin.qq.com/s/JFcANUK_Vfa7ZMXnn7sruQ


2019-11-07 23:44901

评论

发布
暂无评论
发现更多内容

GreptimeDB 设计原则 — 云原生时序数据库,解决海量数据管理挑战

Greptime 格睿科技

数据库 分布式数据库 时序数据库 云原生数据库

屏幕调节亮度:Lunar pro 最新激活版下载

真大的脸盆

Mac Mac 软件 屏幕亮度调节

小米基于 Flink 的实时数仓建设实践

Apache Flink

大数据 flink 实时计算

浅谈EOS区块链性能测试

BSN研习社

通过FP&A实践,释放企业深度价值

智达方通

全面预算管理 财务规划和分析 FP&A

软件测试/测试开发丨Pytest参数化用例学习笔记

测试人

程序员 软件测试 自动化测试 测试开发 pytest

人脸识别图像技术的发展与挑战

数据堂

数据隐私为先:EMQX Cloud BYOC 架构解析

EMQ映云科技

物联网 云服务 mqtt

软件测试/测试开发丨Pytest测试框架学习笔记

测试人

程序员 软件测试 pytest

推进产业发展健全服务体系,中国信通院数字员工评测工作正式启动

王吉伟频道

RPA 机器人流程自动化 信通院 数字员工 数字员工评测

天翼云胡志强:依靠科技创新驱动高质量发展之路

说山水

Amazon CodeWhisperer 初体验

Coder9527

全国信安标委“标准周”在昆明召开,腾讯安全受邀分享标准实践经验

说山水

官宣!Databend 和 XSKY星辰天合达成合作

Databend

数据结构校验得心应手:Apifox 最佳实践

Liam

程序员 开发 Apifox API 接口工具

又双叒叕种草了新家装风格?AI帮你家居换装

华为云开发者联盟

人工智能 华为云 华为云开发者联盟 企业号 6 月 PK 榜

「焱融科技」获中关村国际前沿科技创新大赛·大数据与云计算领域 TOP10

焱融科技

高性能 #文件存储 #分布式存储

2个场景实例讲解GaussDB(DWS)基表统计信息估算不准的处理方案

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 6 月 PK 榜

数字赋农:数字农业新时代,致富之路宽又阔!

加入高科技仿生人

低代码 智慧农业 数字赋能 科技兴农

羽山数据SaaS平台新增供应商API自动上架功能

羽山数据

自动 自主研发 上架

Spring Boot 启动注解分析

江南一点雨

Java spring springboot

5月《中国数据库行业分析报告》正式发布,首发时序、实时数据库两大【全球产业图谱】

墨天轮

数据库 tdengine 时序数据库 国产数据库 实时数据库

中移链资源管理介绍

BSN研习社

执行计划缓存,Prepared Statement性能跃升的秘密

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 6 月 PK 榜

海汽集团:业财共享服务中心建设推进集团数字治理

用友BIP

财务共享

巨梦征文 | 2023年第一期征文大赛| 报名请评论本文章

巨梦科技

可持续发展的企业数智化底座究竟是什么样的?

用友BIP

白皮书 数智底座 数智平台 数智平台白皮书

圣邦股份:品类持续深挖,高端加速推进,模拟龙头稳健发展

华秋电子

分享几个索引创建的小 Tips

江南一点雨

MySQL

LeetCode:2665. 计数器 II,闭包详解

Lee Chen

LeetCode

又裁员25%?!金三银四好像消失了

引迈信息

程序员 面试 低代码 金三银四

实时计算框架 Flink 在教育行业的应用实践(下)_文化 & 方法_刘景泽_InfoQ精选文章