写点什么

实时计算框架 Flink 在教育行业的应用实践(下)

  • 2019-11-07
  • 本文字数:3118 字

    阅读完需:约 10 分钟

实时计算框架 Flink 在教育行业的应用实践(下)

1.3.2 Spark 基于 Structured Streaming 的实现

Spark 发送数据到 Kafka,及最后的执行分析计划,与 Flink 无区别,不再展开。下面简述差异点。

1. 编写 Spark 任务分析代码

(1)构建 SparkSession


如果需要使用 Spark 的 Structured Streaming 组件,首先需要创建 SparkSession 实例,代码如下所示:


val sparkConf = new SparkConf()  .setAppName("StreamingAnalysis")  .set("spark.local.dir", "F:\\temp")  .set("spark.default.parallelism", "3")  .set("spark.sql.shuffle.partitions", "3")  .set("spark.executor.instances", "3")
val spark = SparkSession .builder .config(sparkConf) .getOrCreate()
复制代码


(2)从 Kafka 读取答题数据


接下来,从 Kafka 中实时读取答题数,并生成 streaming-DataSet 实例,代码如下所示:


val inputDataFrame1 = spark  .readStream  .format("kafka")  .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")  .option("subscribe", "test_topic_learning_1")  .load()
复制代码


(3)进行 JSON 解析


从 Kafka 读取到数据后,进行 JSON 解析,并封装到 Answer 实例中,代码如下所示:


val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val answerDS = keyValueDataset1.map(t => { val gson = new Gson() val answer = gson.fromJson(t._2, classOf[Answer]) answer})
复制代码


其中 Answer 为 Scala 样例类,代码结构如下所示:


case class Answer(student_id: String,                  textbook_id: String,                  grade_id: String,                  subject_id: String,                  chapter_id: String,                  question_id: String,                  score: Int,                  answer_time: String,                  ts: Timestamp) extends Serializable
复制代码


(4)创建临时视图


创建临时视图代码如下所示:


answerDS.createTempView("t_answer")
复制代码


(5)进行任务分析


仅以需求 1(统计题目被作答频次)为例,编写代码如下所示:


  • 实时:统计题目被作答频次


//实时:统计题目被作答频次val result1 = spark.sql(  """SELECT    |  question_id, COUNT(1) AS frequency    |FROM    |  t_answer    |GROUP BY    |  question_id  """.stripMargin).toJSON
复制代码


(6)实时输出分析结果


仅以需求 1 为例,输出到 Kafka 的代码如下所示:


result1.writeStream .outputMode("update") .trigger(Trigger.ProcessingTime(0)) .format("kafka") .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") .option("topic", "test_topic_learning_2") .option("checkpointLocation", "./checkpoint_chapter11_1") .start()
复制代码

1.3.3 使用 UFlink SQL 加速开发

通过上文可以发现,无论基于 Flink 还是 Spark 通过编写代码实现数据分析任务时,都需要编写大量的代码,并且在生产集群上运行时,需要打包程序,然后提交打包后生成的 Jar 文件到集群上运行。


为了简化开发者的工作量,不少开发者开始致力于 SQL 模块的封装,希望能够实现只写 SQL 语句,就完成类似上述的需求。UFlink SQL 即是 UCloud 为简化计算模型、降低用户使用实时计算 UFlink 产品门槛而推出的一套符合 SQL 语义的开发套件。通过 UFlink SQL 模块可以快速完成这一工作,实践如下。

1. 创建 UKafka 集群

在 UCloud 控制台 UKafka 创建页,选择配置并设置相关阈值,创建 UKafka 集群。



提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。

2. 创建 UFlink 集群

  • 在 UCloud 控制台 UFlink 创建页,选择配置和运行模式,创建一个 Flink 集群。



  • 完成创建


3. 编写 SQL 语句

完成之后,只需要在工作空间中创建如下形式的 SQL 语句,即可完成上述 3 个需求分析任务。


(1)创建数据源表


创建数据源表,本质上就是为 Flink 当前上下文环境执行 addSource 操作,SQL 语句如下:


CREATE TABLE t_answer(    student_id VARCHAR,    textbook_id VARCHAR,    grade_id VARCHAR,    subject_id VARCHAR,    chapter_id VARCHAR,    question_id VARCHAR,    score INT,    answer_time VARCHAR,    ts TIMESTAMP )WITH(    type ='kafka11',    bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',    zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',    topic ='test_topic_learning_1',    groupId = 'group_consumer_learning_test01',    parallelism ='3' );
复制代码


(2)创建结果表


创建结果表,本质上就是为 Flink 当前上下文环境执行 addSink 操作,SQL 语句如下:


CREATE TABLE t_result1(    question_id VARCHAR,    frequency INT)WITH(    type ='kafka11',    bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',    zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',    topic ='test_topic_learning_2',    parallelism ='3');
CREATE TABLE t_result2( grade_id VARCHAR, frequency INT)WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_3', parallelism ='3');
CREATE TABLE t_result3( subject_id VARCHAR, question_id VARCHAR, frequency INT)WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_4', parallelism ='3');
复制代码


(3)执行查询计划


最后,执行查询计划,并向结果表中插入查询结果,SQL 语句形式如下:


INSERT INTO    t_result1  SELECT      question_id, COUNT(1) AS frequency    FROM      t_answer    GROUP BY      question_id;
INSERT INTO t_result2 SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id;
INSERT INTO t_result3 SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;
复制代码


SQL 语句编写完毕后,将其直接粘贴到 UFlink 前端页面对话框中,并提交任务,即可快速完成上述 3 个需求。如下图所示:


1.3.4. UFlink SQL 支持多流 JOIN

Flink、Spark 目前都支持多流 JOIN,即 stream-stream join,并且也都支持 Watermark 处理延迟数据,以上特性均可以在 SQL 中体现,得益于此,UFlink SQL 也同样支持纯 SQL 环境下进行 JOIN 操作、维表 JOIN 操作、自定义函数操作、JSON 数组解析、嵌套 JSON 解析等。更多细节欢迎大家参考 UFlink SQL 相关案例展示https://docs.ucloud.cn/analysis/uflink/dev/sql

1.4 总结

UFlink 基于 Apache Flink 构建,除 100%兼容开源外,也在不断推出 UFlink SQL 等模块,从而提高开发效率,降低使用门槛,在性能、可靠性、易用性上为用户创造价值。 今年 8 月新推出的 Flink 1.9.0,大规模变更了 Flink 架构,能够更好地处理批、流任务,同时引入全新的 SQL 类型系统和更强大的 SQL 式任务编程。UFlink 预计将于 10 月底支持 Flink 1.9.0,敬请期待。


本文转载自公众号 UCloud 技术(ID:ucloud_tech)。


原文链接:


https://mp.weixin.qq.com/s/JFcANUK_Vfa7ZMXnn7sruQ


2019-11-07 23:441001

评论

发布
暂无评论
发现更多内容

哈尔滨三级等保测评:合规运营的必由之路

等保测评

构建终极家庭实验室NUC集群 - 第三部分:Docker自动化与媒体堆栈

qife122

Docker 媒体服务器

获取电商平台电子面单 API 打印配置接口:从准备到落地的全流程指南

快递鸟

决战大促之夜:订单洪峰下的“数据速递”如何不宕机?

谷云科技RestCloud

数据处理 数据传输 数据集成平台 ipaas 订单同步

MyEMS:深挖能耗数据,解锁降本与碳管双重 “密码”

开源能源管理系统

开源 开源能源管理系统

AI百舸争流时代,华为如何帮助行业破浪前行?

脑极体

AI

大庆企业为何必须做等保测评?4 大核心原因解析

等保测评

告别 “一刀切” 管理!MyEMS 为不同行业定制专属能源优化方案

开源能源管理系统

开源 开源能源管理系统

MyEMS 如何让企业碳足迹 “可视化”?

开源能源管理系统

开源 开源能源管理系统

免费≠将就!真正能打的招聘系统长这样!

AI得贤招聘官

全国文旅AI整活儿,意外暴露了百度搜索的AIGC创意能力

脑极体

AI

赋值语句

Miracle

社区伙伴活动推荐|半个 AI 圈的百位大咖都来啦!10.17-10.18 上海,密集观点碰撞+超炫 AI 互动!

RTE开发者社区

Huxe 推出主动式 AI 音频服务,无感内容消费;OpenAI 推出 ChatGPT Pulse:主动提供个性化信息丨日报

RTE开发者社区

【RFID工具智能货架选购指南】适合仓库管理的品牌有哪些?

斯科信息

斯科信息 深科物联 RFID工具货架 RFID智能货架

定义工业生产新范式!网易灵动发布全球首款全域智能无人装载机“灵载”

网易伏羲

网易伏羲 具身智能 网易灵动 无人装载机 装载机器人

思维导图用什么软件做?10个主流的导图工具集

职场工具箱

人工智能 思维导图 在线白板 AIGC 思维导图软件

黑龙江等保测评安全技术服务:构建网络安全防线的多面盾牌

等保测评

Gartner报告:可观测性平台魔力象限

乘云数字DataBuff

可观测性 数据治理 IT运维

小企业切入AI 一体机市场还有机会吗

慢点科技SlowTech

飞跃海峡:鲲鹏凿开算力的“米迪运河”

脑极体

AI

社交媒体数据价值爆发:探讨Bright Data、Oxylabs、ThorData海外影音数据采集的潜能 原创

不觉心动

数据分析 数据采集

司马阅全线升级,从“AI文档工具”迈向“AI文档智能体平台”的关键跨越

新消费日报

反向海淘系统核心解析

tbapi

淘宝代购系统 1688代采系统 反向海淘系统 外贸独立站搭建 反向海淘系统搭建

工业管理 项目管理经验总结(14)

万里无云万里天

项目管理 工厂运维

实用的改进

Miracle

大数据-111 Flink 安装部署 On YARN 部署全流程详解:环境变量、配置与资源申请

武子康

Java 大数据 flink spark 分布式

解锁 Windows Server 2025 日志的深入可见性与洞察

运维有小邓

日志管理 日志审计 #日志分析

MongoDB到关系型数据库:JSON字段如何高效转换?

谷云科技RestCloud

MySQL 数据库 mongodb ETL 数据集成平台

可灵活定制的切片规则,才是RAG能够精准的核心关键

UniverAI智宇苍穹

rag 企业级AI基础设施 RAG应用 UniverAI 智宇苍穹

实时计算框架 Flink 在教育行业的应用实践(下)_文化 & 方法_刘景泽_InfoQ精选文章