当前位置:   article > 正文

大数据Spark实战第四集 spark优化和使用 Spark Streaming_spark gpu stream

spark gpu stream

Tungten 和 Hydrogen:Spark 性能提升与优化计划

在前面的课时中,我们学习了 Spark 的用法和原理,今天这个课时主要介绍 Spark 两个比较重要的优化提升项目,从这两个项目中可以看出 Spark 的优化思路。

这节课与前面的课时有所不同,主要介绍一些比较细的优化思路,其中很多也与开发数据库的思路不谋而合,你可以换换脑筋,无法完全理解也没关系,可以作为阅读一些论文、参与开源社区讨论的基础。

Tungsten 项目

Tungsten 项目产生的原因是由于固态硬盘和万兆交换机的普及和应用,I/O 性能的大幅提升使得 CPU 和内存成了大数据处理中的新瓶颈。例如一个中等规模的集群(50~100台),在某些大型作业执行过程中,网络 I/O 和硬盘 I/O 经常会接近其性能理论值,而 CPU 的使用率却很难长期维持在一个很高水平。基于此,Spark 开发团队希望开发一个新的 Spark 核心执行引擎来尽可能地压榨出 CPU 和内存的性能极限。2015 年,Tungsten 项目诞生了。

1.内存管理

Tungsten 旨在利用应用程序语义显式管理内存,消除 JVM 对象模型和垃圾回收的开销。Spark 选择 JVM 来负责内存管理,JVM 的垃圾回收器(Garbage Collector,GC)会不停地监控某个对象是否还有活跃的引用,如果没有,垃圾回收器会回收该对象并释放为其分配的内存。而对对象的引用通常存在于堆中的某些对象里或者作为变量存放于栈里,前者存活时间较长,后者存活时间较短。

另外,JVM 对象开销一向是很大的,例如字符串采用 UTF-8 编码,还有一些对象 header 等信息,这样就容易引起内存不足,降低 CPU 访问数据的吞吐量。JVM 的垃圾回收器是一个非常复杂的组件,同时它的设计思路和面对的场景通常都是针对在线事务处理(OLTP)系统,如交易系统,而 Spark 的使用场景则偏向于在线分析处理(OLAP)系统和离线计算系统,这两种场景对于性能需求差别非常大,因此利用 JVM 的垃圾回收器来应对 Spark 面对的场景,必然无法令人满意。

Tungsten 的目的就是摆脱 JVM 的垃圾回收器,自己管理内存。尽管在过去十几年中,对于那些普通用途的字节码,JVM 的垃圾回收器在预测对象生命周期方面取得了很好的效果,但是 Spark 比谁都清楚哪些数据需要留在内存中,哪些需要从内存中移除,这种情况下选择 JVM 管理内存,无疑不是最好的选择,这也是利用应用语义显式管理内存的意义所在,因此,Tungsten 绕过了 JVM 提供的安全内存托管系统,而使用了 sun.misc.Unsafe 包中的类,它允许 Tungsten 自主管理其内存。使用 Unsafe 类构建的数据结构在存储和访问性能上也大大优于 JVM 对象模型。

2.缓存感知计算

现代计算机系统使用 64 位地址指针指向 64 位内存块。而 Tungsten 也总是使用 8 字节的数据集来和 64 位内存块对齐。在 CPU 内核和内存之间,有一个 L1、L2 和 L3 高速分层存储,它们随着 CPU 数量增加而增加。通常,L3 在所有核心之间共享。如果你的 CPU 内核要求将某个主存储器地址加载到 CPU 内核的寄存器(寄存器是 CPU 内核中的一个存储区),那么首先会在 L1~L3 缓存中检查是否包含请求的内存地址。我们将与这种地址相关联的数据称为存储器页。如果是这种情况,则略过主存储器访问,并且从 L1,L2 或 L3 高速缓存中直接加载该页。否则,该页从主存储器加载,会导致更高的延迟。延迟太高,CPU 内核会等待(或执行其他工作)多个 CPU 时钟周期,直到主存储器页被传送到 CPU 内核的寄存器中。此外,该页也被放入所有高速缓存中,并且如果它们是满的,则从高速缓存中删除较不频繁访问的存储器页。因此我们得出两个结论:

  • 在计算过程中多次访问存储页,缓存才有意义。
  • 由于缓存远小于主存,它们只包含主存储器页的子集,因此,为了从缓存中受益,需要一个暂时结束的访问模式,因为如果在计算任务的末期才访问同一个页面,那么它可能已经从缓存中被去除掉了。

基于上述结论,缓存淘汰和预取策略十分关键。当然,现代计算机系统不仅使用最近最少使用算法(Least Recently Use,LRU)从缓存中删除缓存的存储页,还会保留下那些虽然缓存时间长但很有可能被再次请求的存储页。另外,现代 CPU 还会预测将来的存储页请求,从而将该存储页预取至缓存中。不管怎样,应始终避免随机存储访问模式,通常越顺序存储访问执行得越快。

那么我们应该如何避免随机存储访问模式呢?让我们来看看 java.util.HashMap 。顾名思义,键(key)对象的散列值(value)会被用来将对象分组到桶中。散列(hash)的副作用是:哪怕键值差别非常细微,散列值也会不一样,并会导致被分组到相应的桶中。每个桶可以被看成指向存储在映射表中的链表指针(pointer)。这些指针指向的是随机内存区域。因此,顺序扫描是不可能的,如下图所示:

图片1.png

你可能会发现这些指针指向的对象都位于主存储(Java 堆)的随机区域中。为了提升顺序扫描性能,Tungsten 采取了不同的办法:指针不仅存储目标值内存地址,还会保存键本身。在前面,我们已经了解了 UnsafeRow 的概念,8 字节的存储区域用来保存两个整型值,例如,键和指向值的指针。这种存储布局如下图所示。

图片2.png

这样,就可以运行具有顺序存储访问模式的排序算法(如快速排序)。当排序时,键和指针的组合存储区域会被到处移动,存储值的地方却不会变。虽然这些值可以随机分布在存储器中,但是键和指针的组合存储区域被以顺序布局,如下图所示。

图片3.png

3.代码生成

下面这段代码的逻辑很简单,可以理解为做了一个向量的内积,i、j 都来自某个向量。

val i = 23
val j = 5
var z = i*x+j*y
  • 1
  • 2
  • 3

假设 x 和 y 都来自表中的某一行。现在,假设将表达式应用到表中的每一行中,而这个表有数十亿行,JVM 只能执行这个表达式数十亿次,这是一个非常大的开销。因此 Tungsten 实际做的是将这个表达式转换为字节码,并将其发送到执行者线程中。
你可能知道,每个类在 JVM 上执行的都是字节码,这是针对不同微处理器架构的机器代码的一个中间层,这是 Java 的特点之一。因此 JVM 的工作流如下:

1、Java 源代码被编译为字节码;

2、Java 字节码被 JVM 翻译;

3、JVM 将字节码转换成特定平台的机器指令,并将其发送到目标 CPU。

目前,还没有人想过在运行时直接生成字节码,这就是代码生成想要实现的。Tungsten 分析将要被执行的任务生成由人编写的,在 JVM 上执行的特定的高性能字节码,而不是依赖预编译组件。

Tungsten 还有助于加速序列化与反序列化对象,JVM 提供的原生框架性能较差。而分布式数据处理框架的性能瓶颈通常在 Shuffle 阶段,在这个阶段中,数据通过网络传输,对象的序列化与反序列化是主要瓶颈(而不是 I/O 带宽),它同时也增加了 CPU 负担。因此提高这里的性能有助于消除计算瓶颈。

4、Catalyst 优化器

Catalyst 优化器是 Spark SQL 的重要组成部分,它是一个函数式可扩展的查询优化器,贯穿于查询计划的生成到最后执行计划的生成,对 Spark SQL 优化起到了至关重要的作用,在Tungsten 中,Catalyst 优化器也得到了优化和提升。在关系型数据库系统中,通常认为查询优化器是其最为复杂的核心组件,Spark SQL 也是如此,在 Catalyst 优化器的帮助下,Spark 开发者只需要编写简单的 SQL 就能驱动非常复杂的查询作业,并能获得最佳性能表现。

那么 Catalyst 优化器是如何工作的?下面这张图展示了优化器核心组件和顺序调优的过程:

图片4.png

首先,无论是 DataFrame API、Dataset API 还是 Spark SQL,它们都会被转换为 ULEP(Unresolved Logical Execution Plan,未解析的逻辑执行计划)。ULEP 本质上就是一棵 SQL 语法树,生成了 ULEP 后还不能直接执行,而是通过一系列工作对 ULEP 进行处理。当ULEP 在数据目录(Catalog)中补齐了字段类型、列名等时,就会成为 RLEP(Resolved Logical Execution Plan,解析好的逻辑执行计划)。

RLEP 会经过多次转换生成 Optimised Logical Plan(优化的逻辑计划),该计划不会包含如何计算的描述,而只包含必须被计算的内容。根据一些策略,优化的 LEP 会转化为 PEP(Physical Execution Plans,物理执行计划)。PEP 是完全解析的执行计划。这意味着一个 PEP 包含生成期望结果的详细指令。生成 PEP 的策略会对连接算法进行优化。此外,对那些在一个 RDD 上执行的多个操作,会根据规则简化为一个复杂操作。在生成了很多 PEP 后(它们都会返回相同的结果),最好的选择是基于启发式算法来最小化执行时间。最后,执行操作会作用于 RDD 上。

在数据源支持的情况下,某些操作可以被下压到数据源,如过滤(谓词)或者属性选择(投影)。谓词下压的主要思路是,某些抽象语法树的部分不由 Spark 来执行,而是由数据源本身来执行,这样就减少了 Spark 与数据库的数据传输。

从 Tungsten 可以看出,使用 DataFrame、Dataset 和 Spark SQL 处理数据,可以看成是一种从底层高度优化的 RDD 执行方案。 这种优化是全方位的,不仅仅体现在执行计划上,还体现存储、计算方式上。

Hydrogen 项目

Hydrogen 项目与 Tungsten 项目一样,都是对 Spark 有巨大提升的前沿探索项目,Hydrogen 项目从 Spark 2.3 开始,历经 Spark 2.4 以及 Spark 3.0。Hydrogen 项目出现的背景是,目前机器学习框架与深度学习框架开始井喷,而 Spark 的野心在于一统整个数据科学领域,所以也乐见其成,Spark 对这些框架的态度是“拥抱机器学习生态系统,并将其视为一等公民”,由此,Spark 需要将涉及数据预处理以及模型训练等整个流程深度地与这些机器学习、深度学习框架进行集成,这也是 Hydrogen 项目的目标。

为了实现这个目标,也就是高效地支持绝大部分机器学习框架,Spark 面临两大挑战:数据交换与执行模型。我们来看看 Hydrogen 项目是如何解决的。

1、数据交换

数据交换指的是在 Spark 与机器学习框架之间高吞吐地传输数据。Spark 提出了一种用户自定义函数(UDF),用来执行用户任意的代码。这种 UDF 通常用来与机器学习框架进行集成,例如使用 TensorFlow 对测试数据进行预测。UDF 支持各种语言,如 Scala、Python、R 等,UDF 可以很方便地使 Spark 与机器学习框架进行集成,用户可以在 UDF 中写一段代码来调用机器学习库。在使用 UDF 时,我们可以采用一次一行的方式执行,如下图所示:

图片5.png

上图中包含一个简单的 Python UDF,对输入进行 +1 操作,它将对每一行的第一列进行 +1 操作,数据首先被 Spark 一次一行地读取,并在 Spark 中进行列切分,将第一列发送给 Python 进程,Python 进程接收到输入以后,对输入进行 +1 操作,并返回给 Spark,Spark 得到结果并将其和原来的两列拼接成新的一行,也就是上图中右边的那一行。Spark 一共要执行 3 次操作,直到所有数据读取完毕。如果我们深入分析这种交换方式,会发现这种交换方式的性能非常糟糕,原因是大部分的时间花在了 Spark 将数据传输给 Python,Python 又把数据传输给 Spark 上,据统计,92% 的 CPU 周期被浪费了。这当然不是我们想要的,来看看下一种交换方式:向量化的数据传输,如下图所示:

图片6.png

与一次一行的数据交换方式不同,我们采取了列式存储的小批量传输,也就是说,数据本来就是按列存储,如 ORC 或者 Parquet 这种格式,而非按行存储。Spark 会选取第一列(需要进行 +1 的列)的一个切片发送给 Python 进程,而 Python 收到的则会是一个 numpy 数组或者 panda 序列,在 UDF 中我们可以直接通过向量化操作对向量进行 +1 操作,这种计算无疑是高效的,例如 numpy 底层的数组操作由 C 语言编写,效率较原生 Python 大大提升。Spark 得到结果会按照固有的列式存储格式发送给下游。

向量化的数据交换方式在两个环节性能都有提升,其一是与 Python 进程的数据传输,其二是 UDF 的执行效率,根据 Databricks 的测试结果,整体效率较一次一行的数据交换方式有 3~240 倍的提升,效果极其明显。

2、执行模型

执行模型要解决的是,一旦 Spark 与机器学习框架进行深度融合,就会导致它与计算模型之间天生的不相容性。如果不解决这个问题,那么“一等公民”始终是一句空话。Spark 的计算模型是高度并行的,作业被划分为任务,任务与任务之间相互独立,没有依赖,如下图左边所示。

而常见的分布式机器学习框架的执行模型通常是统一调度,互相协调的,这是为了优化通信,在模型训练过程中,任务之间通常会有高吞吐和大带宽的数据交互,如下图右边所示。

图片7.png

这两种模式看起来没有什么冲突,但是一旦某个任务失败,Spark 只需重新执行该任务即可,但分布式机器学习框架通常会执行所有相关的任务。在 Hydrogen 项目的第 2 部分,Spark 在一个更高的层次提出了一种带有同步栅的执行模型(及其配套的 API),统一了 Spark 与机器学习框架的执行模型,如下图所示。

图片8.png

在这种模型中,Spark 将整个作业切分成 3 个 Stage,其中虚线表示的就是同步栅,在每个 Stage 中,并行的方式可以不同,以 Stage 2 的并行方式为例,一旦某个任务失败,将会重新执行所有任务。这种执行模型很好地融合了 Spark 与机器学习框架。

从 Hydrogen 项目的这两个部分来看,Hydrogen 项目的关键词是融合,数据交换从数据边界的层面进行了融合,而执行引擎在执行逻辑上将两种不同的分布式计算理念进行了融合,从上图中可以看出数据交换是执行引擎的基础,Stage 之间的数据交换就是利用了 Hydrogen 的数据交换的能力。

在 Spark 后面版本的迭代中,Hydrogen 项目的主要内容就是 SPARK-24579。SPARK-24579 的主要内容是标准化 Spark 和人工智能、深度学习框架,如 TensorFlow、MXNet 之间的数据交换过程,并优化其传输性能。SPARK-24579 的出发点在于,目前大数据与人工智能的结合是很多业务与应用成功的关键,而这两个领域的顶级开源社区也多次尝试整合,但由于 Spark SQL、DataFrame、Structured Streaming 的日趋成熟,Spark 仍然是大数据社区的首选,因此人工智能框架如何与 Spark 进行集成是整合的关键。当然,目前已经存在一些解决方案,如 TensorFlowOnSpark、TensorFrames 等,但是还没有一种标准化传输方案,所以性能优化只能根据具体情况来实现,SPARK-24579 所探讨的正是如何降低整个过程的复杂性:标准化 Spark 和人工智能、深度学习框架之间的数据交换接口。这样,人工智能、深度学习框架就可以利用 Spark 从任何地方加载数据,而无须花费额外的精力来构建复杂的数据解决方案。

在 JIRA 上我们还可以通过 Hydrogen 项目的标签对 issue 进行过滤,目前有 3 个没有关闭的史诗级 issue,除了SPARK-24579、SPARK-24374,还有一个 SPARK-24615,如下图所示,该 issue 也是 Hydrogen 项目的一个重要改进,将会为 Spark 添加原生的 GPU 调度支持。

image (1).png

目前,GPU 已经广泛应用于分布式深度学习与训练加速,但通常用户需要用 Spark 加载大量数据,最新的 Spark 版本已经在 YARN 和 Kubernetes 中支持 GPU 了,虽然如此,但是 Spark 本身并不知道它们暴露的 GPU,所以 Spark 用户无法正常请求和调度,SPARK-24615 将会为这类训练加速任务添加调度支持,该 issue 的目标如下。

  • 让 Spark 3.0 在 Standalone 模式、YARN 模式和 Kubernetes 模式中具有 GPU 感知能力。
  • 保证普通作业的调度性能。

未来,该 issue 希望达到的目标如下。

  • GPU 计算卡的细粒度调度。
  • 将 GPU 计算卡和它的内存看成一个不可分割的单元。
  • 支持 TPU。
  • 支持 Mesos。

该 issue 想要做到的是在资源层面实现 Spark 和人工智能框架的融合和统一,同步栅模型则在执行层面上实现 Spark 和人工智能框架的融合和统一。

小结

本课时主要介绍了 Spark 开发过程中的两个重要大型优化项目,这两个项目很大程度上体现了 Spark 设计者的思路,并影响着 Spark 未来的发展方向,其中可以看到,向量化执行和代码生成都是数据库引擎的优化技术,说明 Spark 借鉴了这一部分思想为己用。

本课时的内容偏理论,所以这里给你留一个任务,你可以去 Spark Jira 看板上跟踪 Hydrogen 项目的最新进展以及 Spark 未来的发展方向,链接如下:

https://issues.apache.org/jira/projects/SPARK/

你不要小看了这个过程,未来你也可以向社区提出自己所需的 issue,提出 issue 也是参与社区的一种重要形式。


实战:探索葡萄牙银行电话调查的结果

本课时我们进入实战课程的演练:探索葡萄牙银行电话调查的结果。本课时主要用真实数据构建了一个数据探索场景,这既不是一个项目也不是一个应用,只是一个探索的过程。

这个过程在实际应用中是非常常见的,无论是对分析师还是工程师来说,对数据的探索都是必要的,为此,Spark 也创新地开发了 Spark Shell 应用,让编译型语言 Scala 用起来像脚本语言一样,提高了实践效率。

数据(下载链接为:https://pan.baidu.com/s/1up25t-HQF16Sx4-naC0rJw,密码为 jzke)来源于葡萄牙银行电话调查的结果,本课时会通过一些分析手段逐步对数据展开探索,直到用户得到想要的信息。本课时的内容完全是实践,没有理论,希望你动手一起做。由于案例中用到了 Dataset API,故采用 Scala 版本。

下面这段代码读取了葡萄牙银行通过电话访问进行市场调查得到的数据集,并统计了数据条数:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.functions._
 
case class Call(age: Double, job: String, marital: String, edu: String, 
credit_default: String, housing: String, loan: String, 
contact: String, month: String, day: String, 
dur: Double, campaign: Double, pdays: Double, 
prev: Double,pout: String, emp_var_rate: Double, 
cons_price_idx: Double, cons_conf_idx: Double, euribor3m: Double, 
nr_employed: Double, deposit: String)
 
//葡萄牙银行通过电话访问进行市场调查得到数据集,以下为21个字段
//受访者年龄
val age = StructField("age", DataTypes.IntegerType)
//受访者职业
val job = StructField("job", DataTypes.StringType)
//婚姻状态
val marital = StructField("marital", DataTypes.StringType)
//受教育程度
val edu = StructField("edu", DataTypes.StringType)
//是否信贷违约
val credit_default = StructField("credit_default", DataTypes.StringType)
//是否有房屋贷款
val housing = StructField("housing", DataTypes.StringType)
//是否有个人贷款
val loan = StructField("loan", DataTypes.StringType)
//联系类型(移动电话或座机)
val contact = StructField("contact", DataTypes.StringType)
//当天访谈的月份
val month = StructField("month", DataTypes.StringType)
//当天访谈时间的是星期几
val day = StructField("day", DataTypes.StringType)
//最后一次电话联系持续时间
val dur = StructField("dur", DataTypes.DoubleType)
//此次访谈的电话联系的次数
val campaign = StructField("campaign", DataTypes.DoubleType)
//距离早前访谈最后一次电话联系的天数
val pdays = StructField("pdays", DataTypes.DoubleType)
//早前访谈电话联系的次数
val prev = StructField("prev", DataTypes.DoubleType)
//早前访谈的结果,成功或失败
val pout = StructField("pout", DataTypes.StringType)
//就业变化率(季度指标)
val emp_var_rate = StructField("emp_var_rate", DataTypes.DoubleType)
//消费者物价指数(月度指标)
val cons_price_idx = StructField("cons_price_idx", DataTypes.DoubleType)
//消费者信心指数(月度指标)
val cons_conf_idx = StructField("cons_conf_idx", DataTypes.DoubleType)
//欧元银行间3月拆借率
val euribor3m = StructField("euribor3m", DataTypes.DoubleType)
//员工数量(季度指标)
val nr_employed = StructField("nr_employed", DataTypes.DoubleType)
//目标变量,是否会定期存款
val deposit = StructField("deposit", DataTypes.StringType)
 
val fields = Array(age, job, marital, 
edu, credit_default, housing, 
loan, contact, month, 
day, dur, campaign, 
pdays, prev, pout, 
emp_var_rate, cons_price_idx, cons_conf_idx, 
euribor3m, nr_employed, deposit)
 
val schema = StructType(fields)
 
val spark = SparkSession
.builder()
.appName("data exploration")
.master("local")
.getOrCreate()
import spark.implicits._
 
//该数据集中的记录有些字段没用采集到数据为unknown
val df = spark
.read
.schema(schema)
.option("sep", ";")
.option("header", true)
.csv("./bank/bank-additional-full.csv")
 
println(df.count())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

运行之后的结果为:41188。接下来我们再来根据婚姻情况统计各类人群的数量和缺失值的数量。

//该数据集将bank-additional-full.csv中原本是unknown的字段置为null
val dm = spark
.read
.schema(schema)
.option("sep", ";")
.option("header", true)
.csv("./bank/bank-additional-full-missing.csv")
//根据婚姻情况统计各类人群的数量和缺失值的数量
dm.groupBy("marital").count().show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

结果为:

Drawing 0.png

现在我们再根据职业统计各类人群的数量和缺失值的数量:

//根据职业统计各类人群的数量和缺失值的数量
dm.groupBy("job").count().show()
  • 1
  • 2

结果为:

Drawing 1.png

接下来根据教育情况统计各类人群的数量和缺失值的数量:

//根据教育情况统计各类人群的数量和缺失值的数量
dm.groupBy("edu").count().show()
  • 1
  • 2

结果为:

Drawing 2.png

下面我们选取数值类字段作为数据子集,进行描述性统计:

//选数值类字段作为数据子集,进行描述性统计(包括频次统计,平均值,标准差,最小值,最大值)
val dsSubset=dm.select("age","dur","campaign","prev","deposit").cache()
//通过描述性统计,可以对数据进行快速地检查。比如,频次统计可以检查数据的有效行数,年龄的平均值和范围可以判断数据样本是不是符合预期。通过均值和方差可以对数据进行更深入地分析,比如,假设数据服从正态分布,年龄的均值和标准差表明了受访者的年龄大多在30~50 之间。
dsSubset.describe().show()
  • 1
  • 2
  • 3
  • 4

结果为:

Drawing 3.png

下面这段代码判断了变量间的相关性:

//判断变量间相关性,计算变量间的协方差和相关系数,协方差表示两变量的变化方向相同或相反。age和dur的协方差为-2.3391469421265874,表示随着受访者的年龄增加,上一次访问时长减少。
println(dsSubset.stat.cov("age","dur"))
  • 1
  • 2

结果为:-2.3391469421265874。

接下来计算相关系数:

//相关系数(Pearson系数)表示变量间的相关程度。age和dur的相关系数为-8.657050101409117E-4,呈较弱的负相关性。
println(dsSubset.stat.corr("age","dur"))
  • 1
  • 2

结果为:8.657050101409117E-4。

下面计算每个年龄段的婚姻状态分布:

//交叉表,通过交叉表可以知道在每个年龄段的婚姻状态分布
 ds.stat.crosstab("age","marital").orderBy("age_marital").show(20)
  • 1
  • 2

结果为:

Drawing 4.png

下面这段代码展示了所有受访人的学历背景出现频率超过 0.3 的学历:

//所有受访人的学历背景出现频率超过0.3的学历
println(ds.stat.freqItems(Seq("edu"),0.3).collect()(0))
  • 1
  • 2

结果为:
[WrappedArray(high.school, university.degree, professional.course)]。

下面计算受访用户年龄的分位数:

//四分位数,第三个参数0.0表示相对误差
df.stat.approxQuantile("age",Array(0.25,0.5,0.75),0.0)
.foreach(println)
  • 1
  • 2
  • 3

结果为:

Drawing 5.png

接下来则需要根据定期存款意愿将客户分组,并进一步进行分析:

//聚合函数分析
//根据定期存款意愿将客户分组,并统计各组客户的客户总数,此次访谈的电话联系的平均次数,最后一次电话联系的平均持续时间,早前访谈电话联系的平均次数
dsSubset
.groupBy("deposit")
.agg(count("age").name("Total customers"),
round(avg("campaign"),2).name("Avgcalls(curr)"),
round(avg("dur"),2).name("Avg dur"),	round(avg("prev"),2).name("AvgCalls(prev)")).withColumnRenamed("value","TDSubscribed?")
.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果为:

Drawing 6.png

这段代码根据年龄将客户分组,并进一步进行分析:

//根据年龄将客户分组,并统计各组客户的客户总数,此次访谈的电话联系的平均次数,最后一次电话联系的平均持续时间,早前访谈电话联系的平均次数
dsSubset
.groupBy("age")
.agg(count("age").name("Total customers"),
round(avg("campaign"),2).name("Avgcalls(curr)"),
round(avg("dur"),2).name("Avg dur"),
round(avg("prev"),2).name("AvgCalls(prev)")).orderBy("age")
.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果为:

Drawing 7.png

小结

数据分析师通过上面这个过程,就可以对数据大致的质量、分布、基本统计信息有了一个基本的印象,这样探索的目的也就达到了。在很多情况下,一些分析师在数据量不大的情况下也喜欢用 Spark 来分析数据,这得益于 DataFrame API 与 Datasets API 的简洁与功能完善。这份数据中,有趣的地方还有很多,你不妨用 Python DataFrame API 与 Spark SQL 尽情探索吧。


流处理:什么是流处理?以及你必须考虑的消息送达保证问题

本课时是流处理模块的第一课时,通过前面的模块实战,相信你对 Spark 数据处理能力已经有了一个感性的认识。在本模块,将着重介绍另一类处理场景:流处理与相应的解决办法。

本课时的主要内容有:

  • 什么是流处理;
  • 消息送达保证。

什么是流处理

在前面说到,Spark 为大数据处理提供了一整套解决方案,当然流处理也在其中。大数据的 4V 特征之一就是“Velocity(速度)”,它说明数据产生和流动的速度与以往不可同日而语,但数据所蕴含的价值却会随着时间的流逝而迅速降低,如监测预警、实时反欺诈、实时风险管理、网络攻击、计算广告等业务场景,如何快速而精确地捕获数据中所蕴含的价值是流处理面临的挑战。

流处理的概念从一诞生起,就得到了业界的热捧。从最开始雅虎 S4,到 Apache Storm、Spark Streaming,再到目前的 Structured Streaming、Flink、Apex,开源社区起到了决定性作用。纵观整个流处理的发展历程,似乎并没有一种技术像 Spark、MapReduce 那样在各自的时期独霸业界,更多的是用户根据自己的需求采用不同的技术,这或多或少都对功能和性能进行了一定取舍,这也是流处理技术非常有意思的一点,详细的内容我们将在后面进行介绍。

在开始介绍流处理相关概念以及具体技术前,先用一个例子让你对流处理有一个感性认识。还是以单词计数为例,输入的数据不再是 HDFS 上的文本文件,而来自网络套接字的数据流。

代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
 
object SparkStreamingWordCount {
  • 1
  • 2
  • 3
  • 4
  • 5

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    .setMaster(“local[2]”)
    .setAppName(“SparkStreamingWordCount”)

    val ssc = new StreamingContext(conf, Seconds(1))

    // Spark Streming将套接字作为其输入源
    val lines = ssc.socketTextStream(“localhost”9999)
    // 按照空格切分生成单词集合
    val words = lines.flatMap(.split(" "))
    val pairs = words.map(word => (word, 1))
    // 按单词计数
    val wordCounts = pairs.reduceByKey(
 + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

如上所示,我们以 local 方式启动一个 Spark Streaming 作业,监听 9999 端口的输入,并按单词进行计数。运行此作业,会发现 Spark Streaming 已经在等待 9999 端口的输入了,如下图所示:

image.png

接下来还需要对 9999 端口进行输入,我们选用 netcat 命令(nc,在 Windows 环境需要额外安装),打开命令行执行下面的命令:

Windows: nc -l -p 9999
Linux:   nc -lk 9999
  • 1
  • 2

然后我们就可以在命令行界面下输入 hello world,这时 Spark Streaming 会实时输出:

image (1).png

这样,一个简单的流处理作业就完成了。注意,在以 local 方式提交时必须用 local[n] 的形式,且 n 需要大于 2。当 n = 1 时,Spark Streaming 的 Receiver 会消耗掉所有的计算资源,从而无法开始真正的业务处理。

下面是同样逻辑的 Python 版代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "SparkStreamingWordCountPython")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消息送达保证

在上一个例子中,数据(消息)在计算节点随着处理过程在不停流动,计算节点从它的上游收到数据,经过处理又发向下游计算节点,在这个过程中,如何保证当前节点一定能收到上游计算节点发送的处理结果是一个非常重要的问题,因为它直接影响了流处理结果的正确性。我们也称其为消息送达保证(delivery guarantee)问题,对于消息送达保证,业界一般有以下 3 种语义。

  • 至少送达一次(at least once),下游节点一定会收到一次上游节点发过来的消息,但也可能会接收到重复的消息。
  • 至多送达一次(at most once),下游节点不一定会收到上游节点发来的消息。这意味着,有可能上游节点发送的消息,下游节点丢失了,但上游节点不会重发。
  • 恰好送达一次(exact once),下游节点一定会且只会收到一次上游节点发来的消息。这当然是所有用户最希望的,但对于某些用户来说可能不是必需的。

对于这 3 种解决方案,当然流处理技术应该以恰好送达一次为目标进行设计,因为如果没有这种消息送达保证,那么对于支付场景、计算广告场景(点击计费)等来说就无法保证结果的正确性,这对于业务方来说是不可接受的。但也有很多时候,业务场景可能只需要至少送达一次的保证,那么这个时候就需要进行取舍。

在谈论消息送达保证这个话题时,其实不能孤立地看待它,我们可以将其看成两个问题:即发送消息的可靠性保障和消费消息的可靠性保障。前者指的其实是可靠发送,而后者指的是可靠接收并处理。想要达到“恰好送达一次”的效果,需要这两者同时满足。很多系统声称自己提供“恰好一次”的解决方案,但当我们仔细研究其原理时,会发现并不准确,因为它们没有解释消费者(下游计算节点,消息接收者)或者生产者(上游计算节点,消息发送者)在发送或接收失败时,还如何能保证消息“恰好一次”地传递。

在大数据架构中,经常会使用到消息队列,从某种意义上说,消息队列是一种最简单的流处理系统,先以 Kafka 为例,来理解下消息送达保证的实现。Kafka 的数据流如下图所示。

18.png

如前所述,我们将其分为两个阶段:生产者生产消息,消费者消费消息。只有这两个阶段都满足了“恰好一次”的语义,整个过程才能满足“恰好一次”的语义。我们先从生产者的角度来看,目前的 Kafka 采用异步方式发送消息,当消息被提交后,Kafka 会异步将其发送给 Broker,发送成功后会回调发送 ACK,如果没有则重试,直到收到 ACK 为止,消息会有一个主键,在Broker 端会做幂等处理,不会导致数据出现重复,这样就算在提交过程中,出现了网络故障,导致消息不能及时发送,最终还是能保证发送消息“恰好一次”的语义效果。我们现在再从消费者的角度来看,所有副本都保存有相同的日志以及偏移量(offset),假设由消费者控制偏移量在日志中的位置。当消费者读取了几条数据时,有下面两种情况:

  1. 读取消息,然后在日志中保存偏移量位置,最后处理消息。但有可能消费者保存了偏移量位置之后,在处理消息输出之前崩溃了。在这种情况下,接管处理的进程会在已保存的位置开始,即使该位置之前有几个消息尚未处理。这就达到了“至多一次”的效果,在消费者处理失败消息的情况下,不进行处理。
  2. 读取消息,处理消息,最后保存消息的位置。在这种情况下,可能消费进程处理消息之后,在保存偏移位置之前崩溃了。当新的进程重新接管时,将接收已经被处理的前几个消息。这就达到了“至少一次”的效果。

如果想实现“恰好一次”的语义,那么需要做的其实是将输出处理消费数据的结果和修改偏移量这两个操作放在一个事务里,就可以保证“恰好一次”语义,最完美的解决方案是采取经典的两段式提交,但很多消费者不支持两段式提交,并且两段式提交会影响性能表现。有一个变通的办法就是将消费者的输出与消费者的偏移量存储在一个地方,这就避免了分布式事务,当然这会引起一些不必要的麻烦,更常见的会使用幂等输出,来实现“恰好一次”的效果。

这里假设 Kafka Broker 永远是可用的,不会有丢数据的风险,考虑到日志的副本机制,这个假设是合理的。

上图其实展示了一个最简单的流处理实例,也说明了其实如果要很完美地做到“恰好一次”,需要从两个方面努力:发送消息可靠性和接收处理消息可靠性。而接收消息可靠性要做到“恰好一次”,往往又需要对消息的状态进行持久化(此处指的是偏移量)。

对于上图的数据流程,站在消费者的角度,其实是一个输入-处理-输出的过程,这里其实涉及 3 个部分:数据源(数据输入)、计算框架(处理)和数据存储(数据输出),而当我们说到 Storm、Spark Streaming 实现“恰好一次”消息送达语义时,都只是停留在计算框架这个层面,但是一旦涉及整个流程,那么问题就必须重新审视了,这也提出了一个新的问题:端到端(end-to-end)的消息送达保证,而只有解决了端到端的消息送达保证,才是真正解决了“恰好一次的消息送达语义”。

小结

在企业和组织里,流处理的场景越来越普遍。本课时通过一个小例子介绍了流处理,最后引出了一个流处理中非常重要的问题:消息一致性,端到端的一致性通常来说涉及输入-处理-输出的全过程。在后面的课时,我们可以看到 Spark 是如何解决这个问题的。

最后给你留一个思考题:

如果希望统计一段时间内(比如说 5s)每个单词的数量,上面的程序应该怎么写?


批处理还是流处理:Spark Streaming 抽象、架构与使用方法

Spark Streaming 是 Spark 0.7 推出的流处理库,代表 Spark 正式进入流处理领域,距今已有快 6 年的时间。在这段时间中,随着 Spark 不断完善,Spark Streaming 在业界已得到广泛应用,应该算是目前最主要的流处理解决方案之一。随着 Spark 2.2 的 Structured Streaming 正式推出,Spark 下一代流处理技术已经呼之欲出。但是由于目前的客观情况,Structured Streaming 的成熟度还不太高,大量的流处理应用不可能也没有必要马上迁移至 Structured Streaming,所以 Spark Streaming 在今后一段时间还将继续活跃。另外,它的架构和它的抽象也值得我们学习和深思。在本节我们将会学习 Spark Streaming,主要内容有:

  • Spark Streaming 关键抽象与架构;

  • 转换算子。

关键抽象与架构

要想深入理解 Spark Streaming,首先还是要了解 Spark Streaming 的关键抽象 DStream,DStream 意指 Discretized Stream(离散化流),它大体上来说是一个 RDD 流(序列),其元素(RDD)可以理解为从输入流生成的批。在流处理的过程中,用户其实就是在对 DStream 进行各种变换,最后再输出。如下图所示,可以看出 Spark Streaming 的输入是连续的,经过 Spark Streaming 接收后,会变成一个 RDD 序列,之后的处理逻辑是基于 DStream 来操作的。

image

DStream 的生成依据是按照时间间隔切分,该时间间隔的数据会生成一个微批(mini-batch),即一个 RDD,所以该间隔也被称为批次间隔,DStream 里面持有对所有产生的 RDD 的引用,虽然 RDD 和 DStream 非常像,在种类上基本都是一一对应的,如 UnionDStream 与 UnionRDD,但是 DStream 还是和 RDD 有本质不同,如下图所示。

image

具体表现为:

RDD = DStream @ batch T
DStream = RDD range (tn-1,tn)
  • 1
  • 2
  • 1
  • 2

RDD 是 DStream 中某个批次的数据,而 DStream 代表了一段时间所产生的 RDD。所以通过这种方式,Spark Streaming 把对连续流的处理,变成了对批序列 DStream 的处理。我们会在 StreamingContext 设置批次间隔大小,一般大于 200ms。

如果我们仔细思考的话,会发现,这种抽象将连续的流看成了某一时刻静止的批,所以在提到 Spark Streaming 中的 RDD 时,一定要注意它还有个时间维度,最准确的说法是某段时间内的 RDD。

最后,可以看到 Spark Streaming 对流的抽象本质上还是流,只是处理是基于批来处理的。这与 Structured Streaming 来说是不同的,我们在后面会讲到。

Spark Streaming 在架构上与 Spark 离线计算架构非常相似,主要分为 Driver 与 Executor,同样可以运行在 Yarn、Mesos 上,也能够以 standalone 和 local 模式运行。它们之间的关系仍旧是 Driver 负责调度,Executor 执行任务。如下图所示。

image

在 Driver 中,有几个关键模块,SparkStreamingContext 、DStreamGraph、JobScheduler、Checkpoint、ReceiverTracker,下面我将就这几个模块分别介绍。

  • SparkStreamingContext:SparkStreamingContext是一开始在用户代码中初始化完成的。它主要的工作是对作业进行一些配置,例如 DStream 切分的批次间隔(Duration),以及与其他模块进行交互,如 DStreamGraph 和 JobScheduler 等。

  • DStreamGraph:既然 Spark Streaming 最后还是对批的处理,那么批处理中根据计算逻辑生成的 RDD DAG 也是存在的,它由 DStreamGraph 生成。DStreamGraph 维护了输入 DStream 与输出 DStream 的实例,还会通过 generateJobs 方法生成一个作业集合(RDD DAG),它会由 JobScheduler 调度启动执行任务。

  • JobScheduler:JobScheduler 顾名思义是 Spark Streaming 的作业调度器,在创建 SparkStreamingContext 的同时,JobScheduler 也会作为它的一部分被创建,所有的任务都是最后由它调度 Executor 来执行。

  • Checkpoint:在 Spark 中,任何一个 RDD 丢失,都可以通过依赖关系重新计算得到, Checkpoint 是 Spark Streaming 容错机制的核心,会定时对已算好的中间结果以及其他中间状态进行存储,避免了依赖链过长的问题。这样就算某个 DStream 丢失了,也不用从头开始计算,只需从最近的依赖关系开始计算即可。

  • ReceiverTracker:ReceiverTracker 通过 Executor 上的 ReceiverSupvisor 来管理所有的 Receiver。主要功能是把需要计算的数据发送给 Executor。当 Executor 接收完毕后,也会将数据块的元数据上报给 ReceiverTracker。

从上面这几个组件的关系上来说,SparkStreamingContext 负责与其他组件交互,DStreamGraph 与 JobScheduler 负责调度,Checkpoint 负责容错,ReceiverTracker 负责与 Executor 进行数据交互。

Executor 是具体的任务执行者,其中重要的组件有 Receiver、ReceiverSupvisor、ReceiveredBlockHandler,ReceiverTracker 会和 Executor 通信,启动 ReceiverSupvisor 实例,ReceiverSupvisor 会马上启动 Receiver 开始接收数据。Receiver 接收到数据后,用 ReceiverdBlockHandler 以块的方式写到 Executor 的磁盘或者内存,对应的实现是 BlockManagerBasedBlockHandler 和 WriteAheadLogBasedBlockHandler,前者是根据 Executor 的 StorageLevel 写到相应的存储层,后者会先进行预写日志(Write Ahead Log),其中,后者能对流式数据源提供更好的容错性。数据接收完毕后,会根据调度开始计算任务。

Spark Streaming 的作业初始化与提交和 Spark SQL 作业有些不同,我们还是通过初始化 SparkSession 的方式得到 StreamingContext 的引用,再对其设置一个关键参数:批次间隔后,就可以进行数据接收和数据处理的动作。

无状态的转换算子

基于上面的抽象,对流进行处理与批处理就没什么不同了,我们只着眼于此刻正在处理的这个时间范围内的 RDD,所以数据处理方式与批处理并没有什么不同,算子也与批处理没多大区别,算子作用与数据流中的每个 RDD,这类算子我们称之为无状态算子,如下图所示:

image

我们在使用无状态算子时,仍然要注意,每次处理的结果都隐含着“这是...时间范围内数据的处理结果”的含义。

这类算子与前面介绍的转换算子没什么不同,如 map、mapPartitions、reduceByKey、reduce、flatmap、glom、filter、repartition、union 等等,这里就不重复描述了。

有状态的转换算子

在实际工作场景中,默认的时间间隔很难满足流处理的业务需要,比如想对 DStream 中的某几个 RDD 进行操作,或者是想保存一些中间结果做增量计算,就需要运用到另一类转换算子:有状态的转换算子。

有状态的转换算子主要分为两种,一种是基于时间窗口,另一种是基于整个时间跨度。本课时将对其进行介绍。

基于时间窗口的概念其实早就深植于 Spark Streaming 中,我们在设置批次间隔时间(如 1 s)时,本质上就是设置了一个时间窗口,在用户代码中的计算逻辑其实是作用在每一个在该批次间隔中形成的 RDD 上,上一个 RDD 和这一个 RDD 的计算结果不会互相影响。当我们需要对若干批次的数据处理结果进行聚合的时候,就需要设置一个更大的时间窗口,如下图所示。

image

时间窗口是由批次间隔组成的有限时间跨度,基于窗口的操作对窗口中所有数据进行处理。此外,窗口是一个逻辑的概念,它可以进行滑动,图中所示的窗口跨度为 3,滑动步长为 2,滑动意味着每隔多少时间,窗口会被触发一次,每批次数据与窗口的对应关系为一对多,意味着某个批次的数据可以存在于多个窗口中。这里注意窗口间隔与滑动步长都必须是 DStream 批次间隔的整数倍。

基于窗口的转换算子主要有 slice、window、countByWindow、reduceByWindow、reduceByKeyAndWindow、countByValueAndWindow 等。

slice 算子
  • def slice(interval: Interval): Seq[RDD[T]] 和 def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]:slice 算子返回该时间跨度内的 RDD 集合,批次间隔可以用 Interval 进行定义,也可以用起始时间与结束时间来定义,注意,开始时间与结束时间需要是批次间隔的倍数,否则系统会自动进行取整。该算子相当于在整个 DStream 流中截取了一段。

window 算子
  • window(windowDuration: Duration): DStream[T] 和 window(windowDuration: Duration, slideDuration: Duration): DStream[T]:window 算子定义了窗口的属性,如跨度(windowDuration)和滑动步长(slideDuration),并返回一个新的 DStream,默认的滑动步长为批次间隔。当我们通过 window 算子定义了滑动窗口以后,可以用使用 join 算子进行连接操作,例:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
	 
	 
object SparkStreamingJoin {
&nbsp;&nbsp; <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">main</span></span>(args: <span class="hljs-type">Array</span>[<span class="hljs-type">String</span>]): <span class="hljs-type">Unit</span> = {

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> spark = <span class="hljs-type">SparkSession</span>
&nbsp;&nbsp;&nbsp;&nbsp; .builder
&nbsp;&nbsp;&nbsp;&nbsp; .master(<span class="hljs-string">"local[2]"</span>)
&nbsp;&nbsp;&nbsp;&nbsp; .appName(<span class="hljs-string">"SparkStreamingJoin"</span>)
&nbsp;&nbsp;&nbsp;&nbsp; .getOrCreate()

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> sc = spark.sparkContext

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> ssc = <span class="hljs-keyword">new</span> <span class="hljs-type">StreamingContext</span>(sc, batchDuration = <span class="hljs-type">Seconds</span>(<span class="hljs-number">2</span>))

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> leftData = sc.parallelize(<span class="hljs-number">0</span> to <span class="hljs-number">3</span>)

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> leftStream = <span class="hljs-keyword">new</span> <span class="hljs-type">ConstantInputDStream</span>(ssc, leftData)
&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> rightData = sc.parallelize(<span class="hljs-number">0</span> to <span class="hljs-number">2</span>)
&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> rightStream = <span class="hljs-keyword">new</span> <span class="hljs-type">ConstantInputDStream</span>(ssc, rightData)

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-comment">// 连接的DStream窗口需要有相同的Duration或者其中一个DStream的Duration是另一个的整数倍</span>
&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> rightWindow = rightStream.map(f =&gt; (f,f)).window(<span class="hljs-type">Seconds</span>(<span class="hljs-number">2</span>),<span class="hljs-type">Seconds</span>(<span class="hljs-number">4</span>))

&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> leftWindow = leftStream.map(f =&gt; (f,f)).window(<span class="hljs-type">Seconds</span>(<span class="hljs-number">6</span>),<span class="hljs-type">Seconds</span>(<span class="hljs-number">4</span>))

&nbsp;&nbsp;&nbsp;&nbsp; leftWindow.join(rightWindow).print()

&nbsp;&nbsp;&nbsp;&nbsp; ssc.start()

&nbsp;&nbsp;&nbsp;&nbsp; ssc.awaitTermination()

&nbsp;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

}

这里的 join 要求两个窗口的滑动步长必须一致。

window 算子很重要的用法是与无状态算子配合使用,使其结果满足需要的时间跨度限制。

reduceByWindow 算子

●     def reduceByWindow(reduceFunc: (T, T) => T,windowDuration: Duration,slideDuration: Duration): DStream[T]和def reduceByWindow(reduceFunc: (T, T) => T,invReduceFunc: (T, T) => T,windowDuration: Duration,slideDuration: Duration): DStream[T]:按照 reduceFunc 的逻辑对滑动窗口中的数据进行聚合。

后一个 reduceByWindow 是前一个的重载版本,不同之处在于增加了反函数(invReduceFunc)作为参数。反函数存在的作用在于优化那些增量计算的逻辑,如下图所示,假设 reduceFunc 的作用是对窗口内数据进行累计求和,那么在没有 invReduceFunc 的情况下,计算逻辑是 DStream 每个批次的 RDD 先按照 reduceFunc 的逻辑做一次 reduce,然后在达到窗口触发条件时再做一次同样逻辑的 reduce 操作,但是我们可以发现两个窗口互相重叠的时间区间的数据(此处为 RDD@time3)在之前的窗口已经聚合过了,是没有必要再重新计算的,而反函数版本的 reduceByWindow 则针对此处做了优化,反函数的根本作用在于求重复计算部分的值(此处为 RDD@time3)。

image

反函数的参数 (a, b) 所代表的含义为:a 为之前的时间窗口(此处为 window@time3)聚合的结果,b 为之前的时间窗口与当前时间窗口没有重叠部分的聚合结果(此处为 RDD@time1 与 RDD@time2),那么按照反函数的作用,正确反函数应该为 (a,b) => a - b,Spark Streaming 最后再将反函数的计算结果(RDD@time3)与当前时间窗口剩余的数据(此处为 RDD@time4 与 RDD@time5)进行聚合,得到当前窗口的聚合结果(此处为 window@time5)。不难发现,在时间窗口本身跨度很大,且两个时间窗口互相重叠的部分也很大时,反函数版本的 reduceByWindow在计算时性能会大大优于普通版本的 reduceByWindow。

reduceByKeyAndWindow算子
  • def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]、def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration,slideDuration: Duration,partitioner: Partitioner): DStream[(K, V)]和def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int = ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]:通过 reduceFunc 的化简逻辑,reduceByKey 的算子会根据 K 对窗口的数据进行分组聚合,返回化简结果。同时,还可以指定 reduce 任务的个数与 Shuffle 的逻辑。另外 reduceByBeyAndWindow 也有反函数的版本。

countByWindow 算子
  • def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]:countByWindow 算子计算滑动窗口中数据的数量。

下面我们来看看该算子的实现:

	def countByWindow(windowDuration: Duration,slideDuration: Duration): DStream[Long] = ssc.withScope {
	 this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
	}
  • 1
  • 2
  • 3

可以看到 countByWindow 先将每行数据转换成 1,最后再用 reduceByWindow 进行累计求和,它默认就采取了反函数的版本,这也是官方推荐的。

countByValueAndWindow 算子
  • def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]:对每个滑动窗口的数据执行 countByValue 的操作。底层实现也是调用了 reduceByKeyAndWindow 的反函数版本。

介绍了基于窗口的转换算子,我们发现基于窗口的转换操作还是有其局限性,当我们想要对某个键的状态进行整个时间段追踪时,基于窗口就不是那么方便了。

所以我们还需要另外一种有状态的转换操作:mapWithState 与 updateStateByKey,mapWithState 是 Spark 1.6 以后的新特性,官方宣称性能是 updateStateByKey 的十倍,可以认为是 updateStateByKey 的升级版。这两种算子类似于定义一个全局累加器,每个批次的数据处理结果都会将其更新,这样就能得到整个时间段下该 key 的状态值(中间结果)。以 wordcount 为例:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.sql.SparkSession
	 
object StatefulNetworkWordCount {
&nbsp; <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">main</span></span>(args: <span class="hljs-type">Array</span>[<span class="hljs-type">String</span>]) {

&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> spark = <span class="hljs-type">SparkSession</span>
&nbsp;&nbsp;&nbsp;&nbsp; .builder
&nbsp;&nbsp;&nbsp;&nbsp; .master(<span class="hljs-string">"local[2]"</span>)
&nbsp;&nbsp;&nbsp;&nbsp; .appName(<span class="hljs-string">"StatefulNetworkWordCount"</span>)
&nbsp;&nbsp;&nbsp;&nbsp; .getOrCreate()

&nbsp; &nbsp;&nbsp;<span class="hljs-keyword">val</span> sc = spark.sparkContext

&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> ssc = <span class="hljs-keyword">new</span> <span class="hljs-type">StreamingContext</span>(sc, <span class="hljs-type">Seconds</span>(<span class="hljs-number">1</span>))
&nbsp;&nbsp;&nbsp; ssc.checkpoint(<span class="hljs-string">"."</span>)
&nbsp;
&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> initialRDD = ssc.sparkContext.parallelize(<span class="hljs-type">List</span>((<span class="hljs-string">"hello"</span>, <span class="hljs-number">1</span>), (<span class="hljs-string">"world"</span>, <span class="hljs-number">1</span>)))
&nbsp;
&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> lines = ssc.socketTextStream(args(<span class="hljs-number">0</span>), args(<span class="hljs-number">1</span>).toInt)
&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> words = lines.flatMap(_.split(<span class="hljs-string">" "</span>))
&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> wordDstream = words.map(x =&gt; (x, <span class="hljs-number">1</span>))
&nbsp;
&nbsp;&nbsp;&nbsp; <span class="hljs-comment">// 该函数定义了状态更新的逻辑</span>
&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> mappingFunc = (word: <span class="hljs-type">String</span>, one: <span class="hljs-type">Option</span>[<span class="hljs-type">Int</span>], state: <span class="hljs-type">State</span>[<span class="hljs-type">Int</span>]) =&gt; {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> sum = one.getOrElse(<span class="hljs-number">0</span>) + state.getOption.getOrElse(<span class="hljs-number">0</span>)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> output = (word, sum)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; state.update(sum)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; output
&nbsp;&nbsp;&nbsp; }
&nbsp;
&nbsp;&nbsp;&nbsp; <span class="hljs-keyword">val</span> stateDstream = wordDstream.mapWithState(
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <span class="hljs-type">StateSpec</span>.function(mappingFunc).initialState(initialRDD))

&nbsp;&nbsp;&nbsp; stateDstream.print()

&nbsp;&nbsp;&nbsp; ssc.start()
&nbsp;&nbsp;&nbsp; ssc.awaitTermination()

&nbsp; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

}

例子中的核心就是 mappingFunc 的函数,定义了状态更新的逻辑。updateStateByKey 的用法大同小异,也是通过定义状态更新函数来体现状态的变化。用法如下:

	……
	val updateFunc = (values: Seq[Int], state: Option[Int]) => {
	val currentCount = values.sum
	val previousCount = state.getOrElse(0)
	Some(currentCount + previousCount)
	}
	 
	val newUpdateFunc = (iterator: Iterator[(StringSeq[Int], Option[Int])]) => {
	iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
	}
	 
	val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,  new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这两种实现同样功能的算子在性能上差异巨大的原因在于,updateStateByKey 的原理是将上次计算结果与新批次数据采用 cogroup 操作再进行聚合,而 mapWithState 则是通过维护一个中间状态表,存储上一次计算的结果与当前批次的计算结果,所以直接进行聚合处理即可。

小结

本课时介绍了 Spark Streaming 的关键抽象与架构,Spark Streaming 沿用了 RDD 原有的抽象,将流处理变成了连续的微批处理。如果把原有的数据处理看成是一维的,那么流处理无疑是二维的:它加入了一个很重要的时间维度,并且处理的需求往往与时间维度紧密相关,这就使得虽然 Spark Streaming 仍然使用了 RDD 的抽象,但转换算子分为了有状态和无状态之分。

最后给你留一个思考题:

如何每 2 分钟统计一次最近 5 分钟出现过的每个单词的数量?


如何在生产环境中使用 Spark Streaming

在上一课时中,我们学习了 Spark Streaming 的抽象、架构以及数据处理方式,但是流处理的输入是动态的数据源,假设在出错是常态的情况下,如何在动态的数据流中仍然兼顾恰好一次的消息送达保证(结果正确性),是生产环境中必须考虑的问题。

本课时的主要内容有:

  • 输入和输出

  • 容错与结果正确性

输入和输出

Spark Streaming 作为一个流处理系统,对接了很多输入源,除了一些实验性质的输入源,如ConstantInputDStream(每批次都为常数集合)、socketTextStream(监听套接字作为输入)、textFileStream(本地文件作为输入,常常用来监控文件夹),在生产环境中用得最多的还是 Kafka 这类消息队列。在本课时中,我们选用 Kafka 0.8 版本,介绍 Spark Streaming 与 Kafka 集成,这是在生产环境中最常见的一种情况。

在 Spark 中,为连接 Kafka 提供了两种方式,即基于 Receiver 的方式和 Kafka Direct API。这两种方式在使用上大同小异,但原理却截然不同,先来看看基于 Receiver 的方式:

...
val kafkaParams = Map[StringObject](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "groupId",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
 
val topics = Array("topicA""topicB")
val messages = KafkaUtils.createDirectStream[StringString](
  ssc,
  PreferConsistent,
  Subscribe[StringString](topics, kafkaParams)
)
 
messages.map(record => (record.key, record.value))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

用这种方式来与 Kafka 集成,配置中设置了 enable.auto.commit 为 true,表明自己不需要维护 offset,而是由 Kafka 自己来维护(在 Kafka 0.10 后,默认的 offset 存储位置改为了 Kafka,实际上就是 Kafka 的一个 topic),Kafka 消费者会周期性地(默认为 5s)去修改偏移量。这种方式接收的数据都保存在 Receiver 中,一旦出现意外,数据就有可能丢失,要想避免丢失的情况,就必须采用 WAL(Write Ahead Log,预写日志)机制,在数据写入内存前先进行持久化。

现在我们来试想一种情况,数据从 Kafka 取出后,进行了 WAL,在这个时候,Driver 与 Executor 因为某种原因宕机,这时最新偏移量还没来得及提交,那么在 Driver 恢复后,会从记录的偏移量继续消费数据并处理 WAL 的数据,这样一来,被 WAL 持久化的数据就会被重复计算一次。因此,开启了 WAL 后,这样的容错机制最多只能实现“至少一次”的消息送达语义。而且开启 WAL 后,增加了 I/O 开销,降低了 Spark Streaming 的吞吐量,还会产生冗余存储。这个过程如下图所示。

11.png

如果业务场景对“恰好一次”的消息送达语义有着强烈的需求,那么基于 Receiver 的方式是无法满足的,基于 Spark Streaming 提供的 Direct API 形式,克服了这一缺点。Direct API 是 Spark 1.3 后增加的新特性,相比基于 Receiver 方法的“间接”,这种方式更加“直接”。

在这种方式中,消费者会定期轮询 Kafka,得到在每个 topic 中每个分区的最新偏移量,根据这个偏移量来确定每个批次的范围,这个信息会记录在 Checkpoint 中,当作业启动时,会根据这个范围用消费者 API 直接获取数据。这样的话,就相当于把 Kafka 变成了一个文件系统,而 offset 的范围就是文件地址,Direct API 用这种方式将流式数据源变成了静态数据源,再利用 Spark 本身的 DAG 容错机制,使所有计算失败的数据均可溯源,从而实现了“恰好一次”的消息送达语义。**请注意,Direct API 不需要采用WAL预写日志机制,因为所有数据都相当于在 Kafka 中被持久化了,作业恢复后直接从 Kafka 读取即可,**如下图所示:

12.png

这种方式带来的优点显而易见,不仅克服了 WAL 带来的效率缺陷,还简化了并行性,使用 Direct API,Spark Streaming 会为每个 Kafka 的分区创建对应的 RDD 分区,这样就不需要使用 ssc.union() 方法来进行合并了,这也便于理解和调优。另外,这样的架构还保证了输入-处理阶段的“恰好一次”的消息送达语义,这就类似于消息的“回放”,虽然目前 Kafka 本身不支持消息回放,但用这种方式间接地实现了消息回放的功能。下面我们来看一个使用 Direct API 的完整例子:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.spark.streaming.kafka010.CanCommitOffsets
import org.apache.spark.sql.SparkSession
 
object SparkStreamingKafkaDirexct {
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

  def main(args: Array[String]) {

    val spark = SparkSession
    .builder
    .master(“local[2]”)
    .appName(“SparkStreamingKafkaDirexct”)
    .getOrCreate()

    val sc = spark.sparkContext

    val ssc = new StreamingContext(sc, batchDuration = Seconds(2))

    // Kafka的topic
    val topics = args(2)

    val topicsSet: Set[String] = topics.split(“,”).toSet

    // Kafka配置参数
    val kafkaParams: Map[StringObject] = Map[StringString](
      “metadata.broker.list” -> “kafka01:9092,kafka02:9092,kafka03:9092”,
      “group.id” -> “apple_sample”,
      “serializer.class” -> “kafka.serializer.StringEncoder”
      // 自动将偏移重置为最新的偏移,如果是第一次启动程序,应该为smallest,从头开始读
      “auto.offset.reset” -> “latest”
    ) 

    // 用Kafka Direct API直接读数据
    val messages = KafkaUtils.createDirectStream[StringString](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.SubscribeStringString
     )

    // 在该批次数据处理完之后,将该offset提交给Kafka,“…”代表用户自己定义的处理逻辑
    messages.map(…).foreachRDD(mess => { 
       // 获取offset集合
       val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
         asInstanceOf[CanCommitOffsets].commitAsync(offsetsList)
       }
    )

    ssc.start()
    ssc.awaitTermination()

  }

}

从上面这段代码中我们可以发现,首先关闭了自动提交偏移量,改由手动维护。然后再从最新的偏移量开始生成 RDD,经过各种转换算子处理后输出结果,最后用 commitAsync 异步向 Kafka 提交最新的偏移量。一旦使用了 Direct API,用户需要追踪到结果数据输出完成后,再提交偏移量的改动,否则会造成不确定的影响。使用这种方式,无法在事务层面保证处理-输出这个阶段做到“恰好一次”,因此只能采用输出幂等的方式来达到同样的效果。

如果想要在事务的层面,让处理-输出这个阶段做到“恰好一次”,那么可以将 Kafka 的偏移量与最终结果存储在同一个数据库实例上,这就需要修改代码,一开始,需要从外部数据库上获取最新的偏移量:

...
//从外部数据库获取偏移量
val fromOffsets: Map[TopicPartitionLong] = setFromOffsets(offsetList)
...
// 用最新的offset得到初始化RDD
val messages = KafkaUtils.createDirectStream[StringString](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[StringString](topicsSet, kafkaParams, fromOffsets))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在最后输出的操作里,由于偏移量与最终数据处理结果要保存到同一个数据库,因此可以利用外部数据库的事务特性,完成最后的工作:

messages.map(…).foreachRDD(mess => {
   // 获取offset集合
   val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
   // 将修改offset与输出最后结果作为一个事务提交
   //    transaction{
   //       yourUpdateTheOffset(offsetsList)
   //       yourOutputToDatabase(mess)
   //    }
}
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

这样一来,Spark Streaming 才算是真正实现了端到端的消息送达保证。

在实际开发中,将偏移量和输出结果存储到同一个外部数据库的方式用得并不多,因为这会使业务数据与消息数据耦合在一起,结构不够优雅,反而幂等输出更加流行。

最后,来看看 Spark Streaming 的输出操作。Spark Streaming 也是懒加载模式,同样需要类似于 RDD 的行动算子才能真正开始运行,在 Spark Streaming 中,我们称其为输出算子,一共有下面这几种。

  • print():打印 DStream 中每个批次的前十个元素。

  • saveAsTextFiles(prefix, [suffix]):将 DStream 中的内容保存为文本文件。

  • saveAsObjectFiles(prefix, [suffix]):将 DStream 中的内容保存为 Java 序列化对象的 SequenceFile。

  • saveAsHadoopFiles(prefix, [suffix]):将 DStream 中的内容保存为 Hadoop 序列化格式(Writable)的文件,可以指定 K、V 类型。

  • foreachRDD(func):该算子是 Spark Streaming 独有的,与 transform 算子类似,都是直接可以操作 RDD,我们可以利用该算子来做一些处理工作,例如生成 Parquet 文件写入 HDFS、将数据插入到外部数据库中,如 HBase、Elasticsearch。

容错与结果正确性

介绍了 Spark Streaming 的架构、用法之后,在本课时中,将会讨论 Spark Streaming 的容错机制,以及结果的正确性保证。要想 Spark Streaming 应用能够全天候无间断地运行,需要利用 Spark 自带的 Checkpoint 容错机制。Checkpoint 会在 Spark Streaming 运行过程中,周期性地保存一些作业相关信息,这样才能让 Spark Streaming 作业从故障(例如系统故障、JVM 崩溃等)中恢复。值得注意的是,作为 Checkpoint 的存储系统,是必须保证高可用的,常见的如 HDFS 就很可靠,更优的选择则是 Alluxio。

Checkpoint 主要保存了以下两类信息,其中元数据检查点主要用来恢复 Driver 程序,数据检查点主要用来恢复 Executor 程序。下面我们来分别介绍一下这两类信息:

  1. 元数据检查点。元数据主要包括。

  • 配置:创建该 Spark Streaming 应用的配置。

  • DStream 算子:Spark Streaming 作业中定义的算子。

  • 未完成的批次:那些还在作业队列里未完成的批次。

Checkpoint 会周期性地将这些信息保存至外部可靠存储(如 HDFS、Alluxio)。

  1. 数据检查点。

将中间生成的 RDD 保存到可靠的外部存储中。我们在上一节中讨论过,如果要使用状态管理的算子,如 updateStateByKey、mapWithState 等,就必须开启 Checkpoint 机制,因为这类算子必须保存中间结果,以供下次计算使用。另外,我们知道 Spark 本身的容错机制是依靠 RDD DAG 的依赖关系通过计算恢复的,但是这也会造成依赖链过长、恢复时间过长的问题,因此我们必须周期性地存储中间结果(状态)至可靠的外部存储来缩短依赖链。我们也可以手动调用 DStream 的 checkpoint 算子进行缓存。如下:

val ds: DStream[Int] = ...
val cds: DStream[Int] = ds.checkpoint(Seconds(5))
  • 1
  • 2

Checkpoint 机制会按照设置的间隔对 DStream 进行持久化。如果需要启用 Checkpoint 机制,需要对代码做如下改动:

val checkpointDirectory = "/your_cp_path" 
 
def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("Checkpoint")
      val ssc = new StreamingContext(conf,Seconds(1))
      val lines = ssc.socketTextStream("localhost",9999) 
      ssc.checkpoint(checkpointDirectory)
      ssc
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

val context = StreamingContext.getOrCreate(checkpointDirectory, 
functionToCreateContext _)

StreamingContext 需要以 getOrCreate 的方式初始化。这样就能保证,如果从故障中恢复,会获取到上一个检查点的信息。

如果数据源是文件,那么上面的方法可以保证完全不丢数据,因为所有的状态都可以根据持久化的数据源复现出来,但如果是流式数据源,想要保证不丢数据是很困难的。因为当 Driver 出故障的时候,有可能接收的数据会丢失,并且不能找回。为了解决这个问题,Spark 1.2 之后引入了预写日志(WAL),Spark Streaming WAL 指的是接收到数据后,在数据处理之前,先对数据进行持久化,完成这个工作的是 BlockManagerBasedBlockHandler 类的实现类WriteAheadLog BasedBlockHandler 。如果开启了 WAL,那么数据会先进行持久化再写到 Executor 的内存中。这样即使内存数据丢失了,在 Driver 恢复后,丢失的数据还是会被处理,这就实现了“至少一次”的消息送达语义。打开 WAL 的方式为:设置spark.streaming.receiver.writeAheadLog.enable 为 true。这种方法其实是对 Receiver 的容错。

那么 Checkpoint 就以这种形式完成了 Driver、Executor 和 Receiver 的容错。下面我们来讨论一下 Spark Streaming 计算结果的正确性。

先来看看消息送达保证,Spark Streaming 框架本身实现“恰好一次”的消息送达语义比较容易,因为 Spark Streaming 本质上还是进行的批处理,所以它只需在批的层面通过 BatchId 追踪数据处理情况,这和 Spark 是完全一致的,因此它完全能够保证一个批只被处理一次,当一个批没有被成功处理时,肯定就是发生了故障,这时 Checkpoint 机制能够保证从最近持久化的中间结果与待执行的计算任务(DStreamGraph)开始重新计算,保证数据只被处理一次,从而得到正确的结果,该阶段可以认为是处理阶段的消息送达保证

在流处理场景下,容错问题与结果正确性问题不能孤立地来看待,而是需要考虑在出现故障的情况下如何能够保证结果的正确性。

小结

在本课时中,将流式处理流程抽象为输入-处理-输出,而基于这个流程,又将流程拆分为三个部分:

  • 输入-处理

  • 处理

  • 处理-输出

而这每个部分,都需要假设错误会经常发生的情况下,还要保证“恰好一次”的消息送达保证,才是真正的端到端的消息送达保证,这也是生产环境中必须考虑的问题。 本课时从三个部分出发,给出了 Spark Streaming 的答案,其中处理-输出这个过程,通常会以幂等的方式解决,这也是在生产环境中非常常用的做法。

最后给你留一个思考题:

用偏移量与最终数据处理结果保存到同一个数据库,这么做的缺点是什么?


本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/827660
推荐阅读
相关标签
  

闽ICP备14008679号