当前位置:   article > 正文

Spark3.x-实战之双流join(窗口和redis实现方式和模板代码)_spark range join

spark range join

简介

      在spark实时项目的时候难免会遇到双流join的情况,这里我们根据它的一些实现和代码做简单的说明

join的前提知识

  1. object Test {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf().setAppName("join").setMaster("local[*]")
  4. val sc = new SparkContext(conf)
  5. val rdd1 = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c")))
  6. val rdd2 = sc.makeRDD(List((1, "a1"), (1, "a2"), (4, "d1")))
  7. println("join: " + rdd1.join(rdd2).collect().toList)
  8. println("leftOuterJoin: " + rdd1.leftOuterJoin(rdd2).collect().toList)
  9. println("rightOuterJoin: " + rdd1.rightOuterJoin(rdd2).collect().toList)
  10. println("fullOuterJoin: " + rdd1.fullOuterJoin(rdd2).collect().toList)
  11. }
  12. }

结果 

  1. join: List((1,(a,a1)), (1,(a,a2)))
  2. leftOuterJoin: List((1,(a,Some(a1))), (1,(a,Some(a2))), (2,(b,None)), (3,(c,None)))
  3. rightOuterJoin: List((1,(Some(a),a1)), (1,(Some(a),a2)), (4,(None,d1)))
  4. fullOuterJoin: List((1,(Some(a),Some(a1))), (1,(Some(a),Some(a2))), (2,(Some(b),None)), (3,(Some(c),None)), (4,(None,Some(d1))))

方式一(窗口实现)

简介

 

  • 这里的思想主要是滑动窗口的原理,但是就是如果数据不在同一批次,那么可以把划窗尽量的调大一点,尽量让他们在同一批次,那么就可以进行join操作了,但是这种方法会有数据丢失的情况

具体代码例子

核心代码

  1. //开窗 指定窗口大小和滑动步长
  2. val orderInfoWindowDstream: DStream[OrderInfo] =
  3. orderInfoDstream.window(Seconds(50), Seconds(5))
  4. val orderDetailWindowDstream: DStream[OrderDetail] =
  5. orderDetailDstream.window(Seconds(50), Seconds(5))
  6. // join
  7. val orderInfoWithKeyDstream: DStream[(Long, OrderInfo)] =
  8. orderInfoWindowDstream.map(
  9. orderInfo=>{
  10. (orderInfo.id,orderInfo)
  11. }
  12. )
  13. val orderDetailWithKeyDstream: DStream[(Long, OrderDetail)] =
  14. orderDetailWindowDstream.map(
  15. orderDetail=>{
  16. (orderDetail.order_id,orderDetail)
  17. }
  18. )
  19. val joinedDstream: DStream[(Long, (OrderInfo, OrderDetail))] =
  20. orderInfoWithKeyDstream.join(orderDetailWithKeyDstream,4)

方式二(redis缓存的方式实现)

简介

这种方式就是数据来的时候有三种情况,由于不想消息丢失使用的join是fulljoin

  • 订单表到了,订单明细也到了
  • 订单表到了,订单明细表没有到
  • 订单表没有到,订单明细表到了的情况

如果是订单表到了,订单明细也到了,那么这个时候就是join成功,但是为了处理后面还有没有到的数据,那么想处理的话,就把订单表缓存到redis里面

如果订单明细表先到,就保存到redis里面等待订单表的到来,如果订单表到了,就join然后在redis里面删除自己

模板代码示例

前提实现kafka精准一次消费

redis里面保存的数据例子

 代码实现

  1. object DwsStuScoreJoin {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. //这里的partition的数目要和kafka的分区数一致
  5. conf.setAppName(this.getClass.getSimpleName).setMaster("local[4]")
  6. val ssc = new StreamingContext(conf, Seconds(1))
  7. // 分别读取两条流
  8. val stuCourseTopicName = "dwd_tb_stu_course"
  9. val stuCourseGroupName = "dwd_tb_stu_course"
  10. val stuTopicName = "dwd_tb_stu"
  11. val stuGroupName = "dwd_tb_stu"
  12. //得到dwd_tb_stu_course的DStream
  13. val stuCourseOffset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(stuCourseTopicName, stuCourseGroupName)
  14. var stuCourseKafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
  15. if (stuCourseOffset != null && stuCourseOffset.size > 0) {
  16. stuCourseKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByMapTopicPartition(stuCourseTopicName, stuCourseOffset, ssc)
  17. } else {
  18. stuCourseKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByDefault(stuCourseTopicName, ssc)
  19. }
  20. var stuCourseOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
  21. val stuCourseTransformDStream: DStream[ConsumerRecord[String, String]] = stuCourseKafkaInputDStream.transform(
  22. rdd => {
  23. val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
  24. stuCourseOffsetRanges = ranges.offsetRanges
  25. rdd
  26. }
  27. )
  28. val stuCourseKafkaValue: DStream[String] = stuCourseTransformDStream.map(_.value())
  29. //得到dwd_tb_stu的DStream
  30. val stuOffset: mutable.Map[TopicPartition, Long] = OffsetManagerUtil.getOfferSet(stuTopicName, stuGroupName)
  31. var stuKafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = null
  32. if (stuOffset != null && stuOffset.size > 0) {
  33. stuKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByMapTopicPartition(stuTopicName, stuOffset, ssc)
  34. } else {
  35. stuKafkaInputDStream = MyKafkaUtilDwsStuScore.getInputDStreamByDefault(stuTopicName, ssc)
  36. }
  37. var stuOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
  38. val stuTransformDStream: DStream[ConsumerRecord[String, String]] = stuKafkaInputDStream.transform(
  39. rdd => {
  40. val ranges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
  41. stuOffsetRanges = ranges.offsetRanges
  42. rdd
  43. }
  44. )
  45. val stuKafkaValue: DStream[String] = stuTransformDStream.map(_.value())
  46. // stuCourseKafkaValue.foreachRDD(rdd => {
  47. // rdd.foreach(item => {
  48. // println(item)
  49. {"courseId":"1","courseName":"数学","id":"069428","score":"120","stuId":"6be3f5","tearchName":"小李老师"}
  50. // })
  51. // // 如果都操作完了这里就是保存偏移量
  52. // OffsetManagerUtil.saveOffset(stuCourseTopicName, stuCourseGroupName, stuCourseOffsetRanges)
  53. // // 手动提交kafka的偏移量
  54. // stuCourseKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuCourseOffsetRanges)
  55. //
  56. // })
  57. //
  58. // stuKafkaValue.foreachRDD(rdd => {
  59. // rdd.foreach(item => {
  60. // println(item)
  61. {"aearName":"华东地区","areaCode":"0001","id":"6be3f5","stuName":"小同学"}
  62. // })
  63. //
  64. // // 如果都操作完了这里就是保存偏移量
  65. // OffsetManagerUtil.saveOffset(stuTopicName, stuGroupName, stuOffsetRanges)
  66. // // 手动提交kafka的偏移量
  67. // stuKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuOffsetRanges)
  68. // })
  69. // 上面能够得到两条流的数据
  70. //第一条流是stuCourseKafkaValue:{"courseId":"1","courseName":"数学","id":"069428","score":"120","stuId":"6be3f5","tearchName":"小李老师"}
  71. //第二条是stuKafkaValue{"aearName":"华东地区","areaCode":"0001","id":"6be3f5","stuName":"小同学"}
  72. // 下面是使用双流join的操作
  73. // 这里有三种情况使用redis缓存join,假设学生一次可以输入多条成绩信息的情况
  74. // 1. 如果stu到了,stuCourse也到了
  75. // 2. 如果stu到了,stuCourse没有到
  76. // 3. 如果stu没到 ,stuCourse 到了
  77. // 注意由于用学生关联成绩那么不管stuCourse到了,还是没有到都会在redis缓存防止有晚到的数据
  78. // 如果stuCourse缓存在redis里面,如果stu到了那么就把他删除
  79. // 1.先把数据变成key,value结构才能join
  80. val stuIdAndStu: DStream[(String, TbStu)] = stuKafkaValue.map(item => {
  81. val stu: TbStu = JSON.toJavaObject(JSON.parseObject(item), classOf[TbStu])
  82. (stu.id, stu)
  83. })
  84. val stuIdAndCourse: DStream[(String, TbStuCourse)] = stuCourseKafkaValue.map(item => {
  85. val course: TbStuCourse = JSON.toJavaObject(JSON.parseObject(item), classOf[TbStuCourse])
  86. (course.stuId, course)
  87. })
  88. //这里得到的是fullJoin的结果
  89. val fullJoin: DStream[(String, (Option[TbStu], Option[TbStuCourse]))] = stuIdAndStu.fullOuterJoin(stuIdAndCourse)
  90. val resStuWide: DStream[StuWide] = fullJoin.mapPartitions(iter => {
  91. val jedis: Jedis = OffsetManagerUtil.jedisPool.getResource
  92. val res: ListBuffer[StuWide] = ListBuffer[StuWide]()
  93. for ((stuId, (stu, course)) <- iter) {
  94. if (stu.isDefined) {
  95. //stu来了
  96. if (course.isDefined) {
  97. //如果course来了
  98. val resItem: StuWide = StuAndStuScore.getStuWide(stu.get, course.get)
  99. res.append(resItem)
  100. }
  101. //由于course来还是没有来stu保存在redis里面,目的就是等待晚来的数据
  102. //这里选用存储stu的数据格式为string,keuy=FullJoin:Stu:stuid
  103. val stuKey = s"FullJoin:Stu:${stu.get.id}"
  104. //把json数据传入进去,默认保存一天,根据自己的情况来定
  105. val stuJsonCache: String = JSON.toJSONString(stu.get, JSON.DEFAULT_GENERATE_FEATURE)
  106. jedis.setex(stuKey, 3600 * 24, stuJsonCache)
  107. //stu先到还要看下缓存里面有没有之前到的course
  108. val couKey = s"FullJoin:Course:${stu.get.id}"
  109. val courseCacheDatas: util.Set[String] = jedis.smembers(couKey)
  110. val scala: mutable.Set[String] = courseCacheDatas.asScala
  111. for (elem <- scala) {
  112. val courseItem: TbStuCourse = JSON.toJavaObject(JSON.parseObject(elem), classOf[TbStuCourse])
  113. //这里在把数据加入进去
  114. val stuRes: TbStu = stu.get
  115. val wide: StuWide = StuAndStuScore.getStuWide(stuRes, courseItem)
  116. res.append(wide)
  117. }
  118. //删除掉处理完的course数据
  119. jedis.del(couKey)
  120. } else {
  121. //如果stu没有来,我们选用set存储分数
  122. val courseKey = s"FullJoin:Course:${course.get.stuId}"
  123. val courseJsonCache: String = JSON.toJSONString(course.get, JSON.DEFAULT_GENERATE_FEATURE)
  124. jedis.sadd(courseKey, courseJsonCache)
  125. }
  126. }
  127. //关闭资源
  128. jedis.close()
  129. res.iterator
  130. })
  131. resStuWide.foreachRDD(rdd => {
  132. rdd.foreach(item=>{
  133. println("===========")
  134. println(item)
  135. val stuWide: String = JSON.toJSONString(item, JSON.DEFAULT_GENERATE_FEATURE)
  136. MyKafkaSinkUtil.send("dws_tb_stuwide",stuWide)
  137. })
  138. // 如果都操作完了这里就是保存偏移量
  139. OffsetManagerUtil.saveOffset(stuCourseTopicName, stuCourseGroupName, stuCourseOffsetRanges)
  140. // 手动提交kafka的偏移量
  141. stuCourseKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuCourseOffsetRanges)
  142. // 如果都操作完了这里就是保存偏移量
  143. OffsetManagerUtil.saveOffset(stuTopicName, stuGroupName, stuOffsetRanges)
  144. // 手动提交kafka的偏移量
  145. stuKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(stuOffsetRanges)
  146. })
  147. ssc.start()
  148. ssc.awaitTermination()
  149. }
  150. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/771936
推荐阅读
相关标签
  

闽ICP备14008679号