点击围观!腾讯 TAPD 助力金融行业研发提效、敏捷转型最佳实践! 了解详情
写点什么

Spark Streaming 中流式计算的困境与解决之道

  • 2017-05-17
  • 本文字数:4099 字

    阅读完需:约 13 分钟

Spark streaming  在各种流程处理框架生态中占着举足轻重的位置, 但是不可避免地也会面对网络波动带来的数据延迟的问题,所以必须要进行增量数据的累加。 在更新 Spark 应用的时候或者其他不可避免的异常宕机的时候,增量累加会带来重复消费的问题,在一些需要严格保证 exact once 的场景下, 这个时候我们就需要进行离线修复,从而保证 exact once 语义, 本文将针对这个问题,提供一些常见的解决方案和处理方式。

下图中展示了数据延迟的一个场景:

(点击放大图像)

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time 是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web 服务日志、监控 agent 的日志、移动端日志等;
  • Processing time  :Processing time 是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。

上图中 time1,time2, time3 等是我们 Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个 event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在 time3 的 batch 中包含 event time 为 2 的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。

在实时处理过程中也就产生了两个问题:

  • Spark streaming 从 Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据
  • 同一个时间的数据可能出现在多个 batch 中

针对第一个问题, 一个 batch 中包含多个时间区间的数据, 加入我们的区间粒度是 5 分钟, 那么一个 batch 钟有可能包含 0~5 时间区间中的部分数据,  也有可能包含 5~10 时间区间中的部分数据, 这个很好处理,我们先对时间进行向下 5 分钟取整,然后使用取整后的时间分为多组, 然后计算出来指标,` select time, count(*) group by  取整(time),就算出来了这个 batch 中每个时间区间中的数据。

但是对于第二个问题,就很麻烦, 图中举例, 时间区间中 2 出现在了 time2 和 time3,  我们需要在两个 batch 中计算出 2 的指标, 然后进行累计,  这个累计的过程, 你可以在内存中保存状态, 使用 Spark streaming 中的 UpdateStateByKey等算子, 但是不推荐这样使用, 这样就在你的应用中引入了状态和 Checkpoint 机制, 还有一个方法, 就是把这个状态放在持久化存储中, 比如每次都在 Redis, 或者 Hbase 中进行累计,Spark 从 Kafka 拉取日志是可以做到 至少消费一次,但是这种模式 很难保证 exact once 。

假如有下面一种情形,

(点击放大图像)

就会存在这种情况, 我们对 job1 执行 Checkpoint 操作, 然后 job1 被调度执行, 从 Kafka 拉取数据处理, 然后结果保存在 HBase 中, 保存了一半, 机器挂了, 如果重启,recover, 这时候 job1 就会被重复执行, Kafka 中的数据就会被重复消费, HBase 中的部分指标也就多加了一份,虽然我们可以使用 Spark 或者 Flink 中提供的 Watermark 功能。

(点击放大图像)

也就是维护一个窗口, 然后设置一个最大等待时间, T1 ~T4 中的数据到了最大等待时间后就会触发计算,但是这样也会有问题, 如果部分数据的延迟超过了最大等待时间,  这部分数据也就永远的丢失了。

当然如果业务可以容忍, 那么使用这个功能也是可以的,每次都使用 全量覆盖操作。

解决方案

以上我们面临的问题是 Spark streaming + Kafka 组合可以保证 at lease once ,但是很难保证 exact once, 也就是会重复消费, 我们得想办法做到去重, 计算结果 落地存储会有两种模式:

  • append 增量的模式, 也就是每次都做累加
  • complete 的模式, 也即是保证幂等性, 每次都是覆盖, 保证没有副作用

因为同一个时间的数据可能出现在多个 batch 中,所以我们在准实时计算中, 只能是 append 模式, 上文我们已经论证过了,这种模式会出现重复消费的问题。

由于机器挂了的现象是偶发的, 所以我们可以在挂掉后,  对数据进行离线修复, 也就是我们要保证有一份全量的离线数据。

这份数据我们要保证是不漏不多, 而且是按照 event time 时间区间分开的, 这样我们就可以针对出问题的时间区间, 加载这个时间区间的离线数据, 算出结果, 然后进行覆盖。这样就保证了数据的准确性。

我们落地的数据的特点是:

  • 全量的,不漏不多
  • 按照定义的时间区间分片

因为从 Kafka 中拉取存储能保证不丢, 这里我们考虑如何去重, 首先我们要对消息能有一个唯一 ID, 我们使用 Kafka 的 partition 加 offset 作为这个消息的唯一 ID, 如果存储到 HBase,  这样的话在生成一个消息的时候,我们的 ID 就不会重复,即使你重跑很多次,HBase 会自动把它去重。

如果存储到 hdfs,  我们可以每行数据前面都用 ID 作为头字段, 离线处理的时候根据这个字段先进行去重处理,这样也能保证了 exact once 语义。

输出流程

我们看下 Spark streaming 存储到 HDFS 或者 HBase 都会调用 saveAsHadoopDataset。

val writer = new SparkHadoopWriter(hadoopConf)
writer.open()
 Utils.tryWithSafeFinallyAndFailureCallbacks {
        while (iter.hasNext) {
          val record = iter.next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
        }
      }(finallyBlock = writer.close())
writer.commit()

这里根据你传入的 OutFormat 调用 getwriter。

(点击放大图像)

然后再 writer 上调用 open write close commit 方法。

这里如果是 HBase  就是调用 HBase client 的写入方法:

  • 用户提交 put 请求后,HBase 客户端会将 put 请求添加到本地 buffer 中,符合一定条件就会通过 AsyncProcess 异步批量提交。HBase 默认设置 autoflush=true,表示 put 请求直接会提交给服务器进行处理;用户可以设置 autoflush=false,这样的话 put 请求会首先放到本地 buffer,等到本地 buffer 大小超过一定阈值(默认为 2M,可以通过配置文件配置)之后才会提交。很显然,后者采用 group commit 机制提交请求,可以极大地提升写入性能,但是因为没有保护机制,如果客户端崩溃的话会导致提交的请求丢失。
  • 在提交之前,HBase 会在元数据表.meta. 中根据 rowkey 找到它们归属的 region server,这个定位的过程是通过 HConnection 的 locateRegion 方法获得的。如果是批量请求的话还会把这些 rowkey 按照 HRegionLocation 分组,每个分组可以对应一次 RPC 请求。
  • HBase 会为每个 HRegionLocation 构造一个远程 RPC 请求 MultiServerCallable,然后通过 rpcCallerFactory.newCaller() 执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。

这里如果是 HDFS 文件写入:

  • 首先根据 TaskAttemptID 构造出来一个临时写入路径,构造一个文件流
  • 写入临时写入路径
  • commit 的时候调用 commitTask 根据目标路径是否存在, 如果已经存在就删除临时文件,报错, 如果不存在就直接 rename, 把临时文件名, 改为目标文件名, 这里主要是防止多个分区写入同一个目标文件,导致的冲突。
  • 多文件分组输出

如果有一个需求,需要把数据根据不同的 key 输出到不同的文件中, 上文中,我们先根据 batch 进行分组, 然后不同分组的文件输出到不同的文件,这时候就需要用到MultipleOutputFormat

 TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
 K actualKey = generateActualKey(key, value);
 V actualValue = generateActualValue(key, value);
 RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
 if (rw == null) {
     rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
     this.recordWriters.put(finalPath, rw);
 }
 rw.write(actualKey, actualValue);

这里就是维护了一个 TreeMap, 里面每个不同的 key, 构造一个 writer,  这个 writer  是getBaseRecordWriter -> theTextOutputFormat.getRecordWriter根据临时路径构造出一个输出流, 包装为一个   LineRecordWriter 最终的 writer 就是在这个 DataOutputStream 上进行输出,

上层多文件输出根据不同的 key, 从 treeMap 上获取到不同的文件输出流, 然后进行多文件输出。

这里会存在一个问题,  同一个时间的数据可能出现在多个 batch 中, 就是会产生很多小文件,HDFS 对小文件支持很差,我们需要合并小文件,但是我们也可以直接在输出的时候进行 append 操作,就直接避免了产生小文件。

这里就需要改源码了。

(点击放大图像)

上面的类图可以清楚的显示类图的关系,  MultipleOutputFormat 的 writer 会调用子类的 getBaseRecordWriter, 我们可以在这里改写一下,  使用我们自己的 TextOutputFormatNew 的 getRecordWriterNew 方法, 在方法里面构造输出流的时候, 如果文件已经存在,就进行 append 操作。

val fileOut: FSDataOutputStream = if (HDFSFileService.existsPath(file)) {
        println("appendfile")
        fs.append(file)
      } else {
        println("createfile")
        fs.create(file, progress)
      }
def getTaskOutputPath(job: JobConf, iname: String): Path = {
    val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)
    val completePath = name + "/" + iname
    val path = new Path(completePath)
    path
  }

把构造临时路径的方法也修改了, 强制不产生临时路径, 每次都往同一个文件中进行 append, 这样就达到了目的。

小结

本文提供的解决方案, 在不修改 Spark 源码本身的前提下, 进行了一些必要的扩展,  其实本质上来讲, 就是我们假定异常状况是经常发生的, 我们就要面对它,就是要对输入流 kafka 中的原始数据进行唯一标识,保证可以去重,然后持久化。 对发生异常的时间区间, 进行数据重放,就像数据中用 redo 日志进行重放一样。

作者介绍

孙彪彪,目前在七牛云存储从事 Spark 相关工作,对 Spark 有深入的研究和实践,微信公众号:Spark 技术分享。email: 1319027852@qq.com。

2017-05-17 06:587637

评论 2 条评论

发布
用户头像
你好,讨论下落地到hdfs的场景,我也尝试过getBaseRecordWriter方法,把增加一个append模式,这样程序运行起来没什么问题,但再实际场景中其实会丢数据的,究其原因是 writer.write其实在写在try{ ... } catch() commit() 里面的,当你把write重写之后(create模式变为create/append),你的commit() 失败的情况会导致你append整个文件失败,这样就会丢数据。

提供一个新的思路:用你之前说的complete模式,生成临时目录在做转移,但这样的后果是会有很多临时小文件,hadoop负载很大,需要在做一次 压缩操作,我们系统现在就是用的这种方案。还有一个好处是,这个方案可用性会比之前的那个大大提高。
展开
2018-11-23 10:32
回复
没有更多了
发现更多内容

线程范围内共享数据

武哥聊编程

Java 多线程 28天写作

产品经理训练营第三周作业 - 利益相关方(二)

Denny-xi

产品经理 产品经理训练营

极客时间产品经理训练营第 3 次作业

待注册

极客大学产品经理训练营

第三周笔记

Ashley.

第三周作业

Ashley.

程序员如何打破35岁魔咒

数据社

一带一路上的中国品牌!AWS 助力中国新能源车企走向世界!

亚马逊云科技 (Amazon Web Services)

CSS(十一)——用CSS设置超链接样式

程序员的时光

七日更 28天写作 2月春节不断更

CSS(十二)——用CSS设置列表样式

程序员的时光

七日更 28天写作 2月春节不断更

产品训练营第三周

克比

大数据两万年

大伟

大数据 GFS

作业:游戏的利益相关者

嫉妒的耗子

利益相关者的问题

沈弋

价值投资学习笔记

JiangX

28天写作

站出来打造真正开源的 Elasticsearch

亚马逊云科技 (Amazon Web Services)

学计算机的都是傻子?《打工人的点点思考》

谙忆

产品经理训练营 Week3 作业

Mai

产品经理训练营第二章作业(二)

新盛

《期权合同》常见的一个大坑,99%中招 | 视频号28天(26)

赵新龙

28天写作

产品训练营第三周作业-利益相关者关注的问题

jpcr987i

第三周作业

Geek_971380

产品经理第三周作业

朱琴

话题讨论 | 工作之外的时间怎样分配

程序员架构进阶

时间分配 自我提升 话题讨论 2月春节不断更

入网指南:一文读懂你身边的网络

🍉 别再恐惧 IP 协议(万字长文 | 多图预警)

飞天小牛肉

面试 计算机网络 IP TCP/IP 2月春节不断更

批判性思维自修课(七)

石君

28天写作 批判性思维

LeetCode题解:69. x 的平方根,牛顿迭代法+迭代,JavaScript,详细注释

Lee Chen

算法 大前端 LeetCode

翻译:《实用的Python编程》01_00_Overview

codists

Python

K8S原生存储持续进化,Longhorn 1.1迎来ARM支持

Rancher

作业 - 第二章 产品思维和产品意识 (二)

hao hao

产品训练营 第三周作业

万顷湖天碧

产品训练营

Spark Streaming中流式计算的困境与解决之道_语言 & 开发_孙彪彪_InfoQ精选文章