当前位置:   article > 正文

案例 统计用户上网流量,如果两次上网的时间小于10分钟,合并到一起 --spark程序实现_上网时间为10小时(含)以内,基本网费20元;spark

上网时间为10小时(含)以内,基本网费20元;spark
/**
 * 数据分析:
 * uid,startTime, endTime, downFlow,  lag() over  ,  flag  ,  sum_over
 * 1,2020-02-18 14:20:30,2020-02-18 14:46:30,20, 2020-02-18 14:20:30  0  0
 * 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30, 2020-02-18 14:46:30  0  0
 * 1,2020-02-18 15:37:23,2020-02-18 16:05:26,40, 2020-02-18 15:20:30  1  1
 * 1,2020-02-18 16:06:27,2020-02-18 17:20:49,50, 2020-02-18 16:05:26  0  1
 * 1,2020-02-18 17:21:50,2020-02-18 18:03:27,60, 2020-02-18 17:20:49  0  1
 *
 * 根据uid分组,每个用户的数据按开始时间排序,
 * 判断下一行的开始时间减去上一行的结束时间大于10分钟,开一个窗口flag 设为1,否则为0
 * 再开一个窗口sum_over,将flag累加  根据相同的uid,sum_over 聚合流量
 */
object FlowCountTest {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtils.createContext(true)
    val lines: RDD[String] = sc.textFile("data/flow.txt")

    //按uid分组
    val groupd: RDD[(String, Iterable[(Long, Long, Long)])] = lines.mapPartitions(it => {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      it.map(e => {
        val fields = e.split(",")
        val uid = fields(0)
        //将日期转换为时间戳
        val startTime = dateFormat.parse(fields(1)).getTime
        val endTime = dateFormat.parse(fields(2)).getTime
        val downFlow = fields(3).toLong

        //uid作为key,方便排序
        (uid, (startTime, endTime, downFlow))
      })
    }).groupByKey()


    //按开始时间排序
    val sumOver: RDD[(String, (Long, Long, Long, Int))] = groupd.flatMapValues(it => {
      val sorted: List[(Long, Long, Long)] = it.toList.sortBy(_._1)
      //定义一个中间变量,接收上一行的结束时间,相当于sql的lag
      var temp = 0L
      var flag = 0  // 0 或 1
      var sum = 0   // 累加flag   0 0 一组  1  1  1 一组
      sorted.map(e => {
        val startTime = e._1
        val endTime = e._2
        val flow = e._3
        //如果不是第一行数据,比较这一行的开始时间和temp(上一行的结束时间),大于10分钟,将flag设为1,否则为0
        if (temp != 0) {
          //如果这一行的开始时间减去上一行的结束时间大于10分钟,开一个窗口设为1,否则为0
          if ((startTime - temp) / 60000 > 10) {
            flag = 1
          } else {
            flag = 0
          }
        }

        //如果是第一行数据,将结束时间赋给temp
        temp = endTime
        //返回这一行的开始时间,结束时间,流量,sumOver
        sum += flag
        (startTime, endTime, flow, sum)
      })
    })

    val res: RDD[((String, Int), (Long, Long, Long))] = sumOver.map {
      case (uid, (startTime, endTime, flow, sum)) => {
        //把startTime和endTime不超过10分钟的放到一组
        ((uid, sum), (flow, startTime, endTime))
      }
    }.reduceByKey((a, b) => {
      //根据uid,sum 聚合流量,
      (a._1 + b._1, Math.min(a._2, b._2), Math.max(a._3, b._3))
    })

    //整理数据,把时间戳转换为日期
    val result = res.mapPartitions(it => {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      it.map {
        case ((uid, _), (flow, startTime, endTime)) => {
          (uid, dateFormat.format(new Date(startTime)), dateFormat.format(new Date(endTime)), flow)
        }
      }
    }).collect()

    println(result.toBuffer)

    sc.stop()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/745225
推荐阅读
相关标签
  

闽ICP备14008679号