赞
踩
解决方案
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vzgMEIVN-1658840308113)(C:/Users/18446/AppData/Roaming/Typora/typora-user-images/image-20220726183305228.png)]
使用时机
spark streaming的两种数据源
反压原理
Spark Streaming的反压机制中的几个重要组件
RateController
override def onBatchCompleted(batchCompleted:
StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
// 处理结束时间
processingEnd <-
batchCompleted.batchInfo.processingEndTime
// 处理时间,即`processingEndTime` - `processingStartTime`
workDelay <- batchCompleted.batchInfo.processingDelay
// 在调度队列中的等待时间,即`processingStartTime` -
`submissionTime`
waitDelay <- batchCompleted.batchInfo.schedulingDelay
// 当前批次处理的记录数
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay,
waitDelay)
}
RateEstimator
private def computeAndPublish(time: Long, elems: Long,
workDelay: Long, waitDelay: Long): Unit = Future[Unit] {
// 根据处理时间、调度时间、当前Batch记录数,预估新速率
val newRate = rateEstimator.compute(time, elems, workDelay,
waitDelay)
newRate.foreach { s =>
// 设置新速率
rateLimit.set(s.toLong)
// 发布新速率
publish(getLatestRate())
}
}
RateLimiter
令牌桶机制
数据流进入下一个阶段之前需要消耗令牌桶中的令牌,如果令牌桶中没有令牌,数据就暂时不能进入下一个阶段,只有令牌桶中有令牌时才可以进入下一个阶段
反压相关参数设置
参数名称 | 默认值 | 说明 |
---|---|---|
spark.streaming.backpressure.enabled | false | 是否启用反压机制 |
spark.streaming.backpressure.initialRate | 无 | 初始最大接收速率。只适用于Receiver Stream,不适用于Direct Stream。 |
spark.streaming.backpressure.rateEstimator | pid | 速率控制器,Spark 默认只支持此控制器,可自定义。 |
spark.streaming.backpressure.pid.proportional | 1.0 | 只能为非负值。当前速率与最后一批速率之间的差值对总控制信号贡献的权重。用默认值即可。 |
spark.streaming.backpressure.pid.integral | 0.2 | 只能为非负值。比例误差累积对总控制信号贡献的权重。用默认值即可 |
spark.streaming.backpressure.pid.derived | 0 | 只能为非负值。比例误差变化对总控制信号贡献的权重。用默认值即可 |
spark.streaming.backpressure.pid.minRate | 100 | 只能为正数,最小速率 |
//启用反压
conf.set("spark.streaming.backpressure.enabled","true")
//最小摄入条数控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//最大摄入条数控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")
注意
反压机制真正起作用时需要至少处理一个批:
如何保证反压机制真正起作用前应用不会崩溃:
要保证反压机制真正起作用前应用不会崩溃,需要控制每个批次最大摄入速率。
若为Direct Stream,如Kafka Direct Stream,则可以通过spark.streaming.kafka.maxRatePerPartition参数来控制。
此参数代表了 每秒每个分区最大摄入的数据条数。
该参数也代表了整个应用生命周期中的最大速率,即使是背压调整的最大值也不会超过
该参数。
Spark 容错分为:Driver级别的容错和Executor级别的容错
如果在 Spark 作业中允许丢弃某些数据, 那么可以考虑将可能导致数据倾斜的 key 进行过滤,滤除可能导致数据倾斜的 key 对应的数据,这样,在 Spark 作业中就不会发生数据倾斜了。
理解
当使用了类似于 groupByKey、reduceByKey 这样的算子时,可以考虑使用随机
key 实现双重聚合,如图所示:
实现步骤
具体实现步骤
这个方案的核心实现思路就是进行两阶段聚合。
适用场景
理解 reduce join
对于一个较小的RDD的处理思路
实现思路
实现步骤
适用场景
不适合场景分析
对于RDD中大量的key导致的数据倾斜用扩容解决
解决思路
实现步骤
局限性
使用方案七对方案六进一步优化分析
当 RDD 中有几个 key 导致数据倾斜时,方案六不再适用,而方案七又非常消耗资源,此时可以引入方案七的思想完善方案六
对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个 key。
然后将这几个 key 对应的数据从原来的 RDD 中拆分出来, 形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的大部分 key形成另外一个 RDD。
接着将需要 join 的另一个 RDD, 也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀, 不会导致倾斜的大部分 key 也形成另外一个 RDD。
再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份, 分散到多个 task 中去进行 join 了。
而另外两个普通的 RDD 就照常 join 即可。
最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果
Kylin介绍
Kylin是一个框架的集合
Apache Kylin 的主要特点包括支持 SQL 接口、支持超大数据集、亚秒级响应、可伸缩性、高吞吐率、
BI 工具集成等
因为使用了 Cube 预计算技术,在理论上, Kylin 可以支撑的数据集大小没有上限,仅受限于存储系
统和分布式计算系统的承载能力,并且查询速度不会随数据集的增大而减慢。 Kylin 在数据集规模上的
局限性主要在于维度的个数和基数。它们一般由数据模型来决定,不会随着数据规模的增长而线性增
长,这也意味着 Kylin 对未来数据的增长有着更强的适应能力、
预计算
Apache Kylin 拥有优异的查询响应速度,这点得益于预计算,很多复杂的计算,比如连接、聚合,在
离线的预计算过程中就已经完成,这大大降低了查询时刻所需要的计算量,提高了响应速度。
Apache Kylin 提供了丰富的 API ,以与现有的 BI 工具集成,具体包括如下内容。
ODBC 接口:与 Tableau 、 Excel 、 Power BI 等工具集成。
JDBC 接口:与 Saiku 、 BIRT 等 Java 工具集成。
Rest API :与 JavaScript 、 Web 网页集成。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。