赞
踩
原创/朱季谦
本文适合入门Spark RDD的计算处理。
在日常工作当中,经常遇到基于Spark去读取存储在HDFS中的批量文件数据进行统计分析的案例,这些文件一般以csv或者txt文件格式存在。例如,存在这样一份消费者行为数据,字段包括消费者姓名,年龄,性别,月薪,消费偏好,消费领域,购物平台,支付方式,单次购买商品数量,优惠券获取情况,购物动机。
基于这份消费者行为数据,往往会有以下一些分析目标:
针对这些需求,就可以使用Spark来读取文件后,进一步分析处理统计。
接下来,就是针对以上分析目标,设计一番Spark代码计算逻辑,由此可入门学习下Spark RDD常用用法。
获取一份具备以下字段的csv随机假样本,总共5246条数据,包括“消费者姓名,年龄,性别,月薪,消费偏好,消费领域,购物平台,支付方式,单次购买商品数量,优惠券获取情况,购物动机”。
- Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
- Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
- Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
- Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
- ......
将样本存放到项目目录为src/main/resources/consumerdata.csv,然后新建一个Scala的object类,创建一个main方法, 模拟从HDSF读取数据,然后通过.map(_.split(","))将csv文件每一行切割成一个数组形式的RDD
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local").setAppName("consumer")
- val ss = SparkSession.builder().config(conf).getOrCreate()
- val filePath: String = "src/main/resources/consumerdata.csv"
- val consumerRDD = ss.sparkContext.textFile(filePath).map(_.split(","))
可以写一段代码打印看一下consumerRDD结构——
- consumerRDD.foreach(x => {
- x.foreach(y => print(y +" "))
- println()
- })
打印结果如下——
这个RDD相当于把每一行当作里一个Array[]数组,第一行的Array0是消费者姓名,即Amy Harris,Array1是年龄,即39,以此类推。
消费者姓名 | 年龄 | 性别 | 月薪 | 消费偏好 | 消费领域 | 购物平台 | 支付方式 | 单次购买商品数量 | 优惠券获取情况 | 购物动机 |
---|---|---|---|---|---|---|---|---|---|---|
Amy Harris | 39 | 男 | 18561 | 性价比 | 家居用品 | 天猫 | 微信支付 | 10 | 折扣优惠 | 品牌忠诚 |
Lori Willis | 33 | 女 | 14071 | 功能性 | 家居用品 | 苏宁易购 | 货到付款 | 1 | 折扣优惠 | 日常使用 |
。。。 |
获取到该RDD后,就可以进行下一步的统计分析了。
这行代码意思,x.apply(7)表示取每一行的第八个字段,相当数组Array[7],第八个字段是【支付方式】。
因此就可以按照以上格式,对文本数据里的每一个字段做相应分析,后文其他计算逻辑也是类似。
consumerRDD.map(x => (x.apply(7),1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
打印结果如下:
x.apply(5)表示取每一行的第六个字段,相当数组Array[5],第六个字段是【购物平台】。
同前文的【统计消费者支付方式偏好分布】一样,通过map(x=>(x.apply(5),1))生成(购物平台,1)格式的RDD,然后再通过reduceByKey算子针对相同的key做统计,最后倒序排序。
consumerRDD.map(x => (x.apply(5), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
打印结果——
x.apply(4)表示取每一行的第五个字段,相当数组Array[4],第五个字段是【消费领域】。
consumerRDD.map(x => (x.apply(4), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
打印结果:
x.apply(10)表示取每一行的第十个字段,相当数组Array[10],第10个字段是【购物动机】。
consumerRDD.map(x => (x.apply(10), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
打印结果——
该需求通过将RDD映射成DataFrame数据集,方便用SQL语法处理,按照年龄区间分区,分别为"0-20","21-30","31-40"
......这个分区字符串名,就相当key,value表示落在该分区的用户数量。这时,就可以分组做聚合统计了,统计出各个年龄段的消费者数量。
- //取出consumerRDD每一行数组需要的字段
- val rowRDD = consumerRDD.map{
- x => Row(x.apply(0),x.apply(1).toInt,x.apply(2),x.apply(3).toInt,x.apply(4),x.apply(5),x.apply(6),x.apply(7),x.apply(8).toInt,x.apply(9),x.apply(10))
- }
-
- //设置字段映射
- val schema = StructType(Seq(
- StructField("consumerName", StringType),
- StructField("age", IntegerType),
- StructField("gender", StringType),
- StructField("monthlyIncome", IntegerType),
- StructField("consumptionPreference", StringType),
- StructField("consumptionArea", StringType),
- StructField("shoppingPlatform", StringType),
- StructField("paymentMethod", StringType),
- StructField("quantityOfItemsPurchased", IntegerType),
- StructField("couponAcquisitionStatus", StringType),
- StructField("shoppingMotivation", StringType)
-
- ))
- val df = ss.createDataFrame(rowRDD, schema).toDF()
- //按年龄分布计算
- val agedf = df.withColumn("age_range",
- when(col("age").between(0, 20), "0-20")
- .when(col("age").between(21, 30), "21-30")
- .when(col("age").between(31, 40), "31-40")
- .when(col("age").between(41, 50), "41-50")
- .when(col("age").between(51, 60), "51-60")
- .when(col("age").between(61, 70), "61-70")
- .when(col("age").between(81, 90), "81-90")
- .when(col("age").between(91, 100), "91-100")
- .otherwise("Unknow")
- )
- //分组统计
- val result = agedf.groupBy("age_range").agg(count("consumerName").alias("Count")).sort(desc("Count"))
- result.show()

打印结果:
类似年龄分布的操作。
- val sexResult = agedf.groupBy("gender").agg(count("consumerName").alias("Count")).sort(desc("Count"))
- sexResult.show()
打印结果:
除了以上的统计分析案例之外,还有优惠券获取情况和购物动机的关系、消费领域方式等统计,可以进一步拓展分析。
本文基于分析消费者行为数据,可以入门学习到,Spark如何读取样本文件,通过map(_.split(","))处理样本成一个数组格式的RDD,基于该RDD,可以进一步通过map、reduceByKey、groupBy等算子做处理与统计,最后获取该样本的信息价值。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。