赞
踩
/** * 数据分析: * uid,startTime, endTime, downFlow, lag() over , flag , sum_over * 1,2020-02-18 14:20:30,2020-02-18 14:46:30,20, 2020-02-18 14:20:30 0 0 * 1,2020-02-18 14:47:20,2020-02-18 15:20:30,30, 2020-02-18 14:46:30 0 0 * 1,2020-02-18 15:37:23,2020-02-18 16:05:26,40, 2020-02-18 15:20:30 1 1 * 1,2020-02-18 16:06:27,2020-02-18 17:20:49,50, 2020-02-18 16:05:26 0 1 * 1,2020-02-18 17:21:50,2020-02-18 18:03:27,60, 2020-02-18 17:20:49 0 1 * * 根据uid分组,每个用户的数据按开始时间排序, * 判断下一行的开始时间减去上一行的结束时间大于10分钟,开一个窗口flag 设为1,否则为0 * 再开一个窗口sum_over,将flag累加 根据相同的uid,sum_over 聚合流量 */ object FlowCountTest { def main(args: Array[String]): Unit = { val sc = SparkUtils.createContext(true) val lines: RDD[String] = sc.textFile("data/flow.txt") //按uid分组 val groupd: RDD[(String, Iterable[(Long, Long, Long)])] = lines.mapPartitions(it => { val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") it.map(e => { val fields = e.split(",") val uid = fields(0) //将日期转换为时间戳 val startTime = dateFormat.parse(fields(1)).getTime val endTime = dateFormat.parse(fields(2)).getTime val downFlow = fields(3).toLong //uid作为key,方便排序 (uid, (startTime, endTime, downFlow)) }) }).groupByKey() //按开始时间排序 val sumOver: RDD[(String, (Long, Long, Long, Int))] = groupd.flatMapValues(it => { val sorted: List[(Long, Long, Long)] = it.toList.sortBy(_._1) //定义一个中间变量,接收上一行的结束时间,相当于sql的lag var temp = 0L var flag = 0 // 0 或 1 var sum = 0 // 累加flag 0 0 一组 1 1 1 一组 sorted.map(e => { val startTime = e._1 val endTime = e._2 val flow = e._3 //如果不是第一行数据,比较这一行的开始时间和temp(上一行的结束时间),大于10分钟,将flag设为1,否则为0 if (temp != 0) { //如果这一行的开始时间减去上一行的结束时间大于10分钟,开一个窗口设为1,否则为0 if ((startTime - temp) / 60000 > 10) { flag = 1 } else { flag = 0 } } //如果是第一行数据,将结束时间赋给temp temp = endTime //返回这一行的开始时间,结束时间,流量,sumOver sum += flag (startTime, endTime, flow, sum) }) }) val res: RDD[((String, Int), (Long, Long, Long))] = sumOver.map { case (uid, (startTime, endTime, flow, sum)) => { //把startTime和endTime不超过10分钟的放到一组 ((uid, sum), (flow, startTime, endTime)) } }.reduceByKey((a, b) => { //根据uid,sum 聚合流量, (a._1 + b._1, Math.min(a._2, b._2), Math.max(a._3, b._3)) }) //整理数据,把时间戳转换为日期 val result = res.mapPartitions(it => { val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") it.map { case ((uid, _), (flow, startTime, endTime)) => { (uid, dateFormat.format(new Date(startTime)), dateFormat.format(new Date(endTime)), flow) } } }).collect() println(result.toBuffer) sc.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。