当前位置:   article > 正文

Spark SQL的基本操作_sparksql中如何根据

sparksql中如何根据

简介

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方法有多种,包括SQL和Dataset API。计算结果时,将使用相同的执行引擎,而与要用来表达计算的API /语言无关。这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供最自然的方式来表达给定的转换。

Spark SQL的一种用途是执行SQL查询。Spark SQL还可以用于从现有的Hive安装中读取数据。有关如何配置此功能的更多信息,请参考Hive Tables部分。当从另一种编程语言中运行SQL时,结果将作为Dataset / DataFrame()返回。您还可以使用命令行 或通过JDBC / ODBC与SQL接口进行交互。

DataFrame只是类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset来代表DataFrame。

与RDD的对比

性能比RDD好,原因两点
  定制化内存管理
    数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制
  优化的执行计划
    查询计划通过Spark catalyst optimiser进行优化

Spark RDD操作的是纯数据,而Spark SQL在操作数据的同时还可以操作结构

代码

数据文件提取:
链接:https://pan.baidu.com/s/1yD9lSLk83GP3Ye1Zvlu2iw
提取码:l1d6
根据代码中用到的文件名称拿来使用即可


一个一个注释取消掉测试即可(本人亲测成功)

package com.jbit.sql

import java.sql.DriverManager

import org.apache.spark.sql.{SaveMode, SparkSession}
//排序计算的导入
import org.apache.spark.sql.functions._

/**
 * Spark SQL
 */
object SQLDemo {

  def main(args: Array[String]): Unit = {
    //创建spark对象(以getOrCreate()结束)
    var spark=SparkSession.builder()
      .appName("sql")
      .master("local")
      .getOrCreate();

    //读取csv格式的文件且使用首行当作标题
    //read:读取
    //option:设置读取属性(这里是以头行为标题)
    //var dataHeader = spark.read.option("header","true").csv("data/A_1.csv")

    //读取json类型的文件
    var data = spark.read.json("data/user.json")

    //显示前...条数据
    //data.show(10)

    //只显示name列的值
    //data.select("name").show()

    //也可以多列
    //data.select("name","age").show()

    //过滤
    //data.filter("age > 10").show()

    //评论数大于500的(filter中汉字列名加``)
    //dataHeader.filter(" `评论数` > 500").select("名称","评论数").show()

    //可以使用sql中的函数
   /* data.selectExpr("avg(age)").show()
    data.selectExpr("count(*)").show()
    data.selectExpr("max(age)").show()
    data.selectExpr("min(age)").show()*/

    //创建临时表
    //data.createOrReplaceTempView("user")
    //全局表
    //data.createGlobalTempView("userGlobal")
    //下面的操作除最后那个方法外都要以创建表为前提,也就是上面这条语句

    //查询临时表中的数据
    //spark.sql("select * from user").show()

    //查看全局表中的数据的两种方式(global_temp.表名)
    //与全局表的语句相关联使用
    //spark.sql("select * from global_temp.userGlobal").show()
    //spark.newSession().sql("select * from global_temp.userGlobal").show()

    //根据条件查询临时表中的数据
    //spark.sql("select * from user where age>10").show()

    //排序
    //spark.sql("select * from user order by age").show()

    //分组
    //spark.sql("select count(*),age from user group by age").show()

    //查看表结构
    //data.printSchema();

    //getFoodNum(spark)

    //f1(spark)

    //f2(spark)

    //f3(spark)

    //f4(spark)

    //f5(spark)

    //f6(spark)

    //f7(spark)

    //f8(spark)

    f9(spark)
  }

  //计算每个区中餐饮的数量
  def getFoodNum(spark:SparkSession)={

    //读取文件且按照首行分出标题
    var data=spark.read.option("header","true").csv("data/SQLaddress.csv");

    //根据标题行政区名称分组且计数,再通过orderBy来从大到小排序显示
    data.groupBy("行政区名称")
      .count()
      .orderBy(desc("count"))
      .show()

  }

  //餐饮人均价格排名
  def f1(spark:SparkSession): Unit ={
    val data=spark.read.option("header","true").csv("data/A_1.csv")

    //这条导入语句是用来处理DataFrame中不能直接使用map方法而隐式转换成RDD类型
    import spark.implicits._
    //查询所需的列
    data.select("名称","人均价格")
      .map(
        x => {
            //使用getString来获取上面select中查询中的这一行的某个列
            //第一个参数实际上获取到的就是餐饮名称(看上面select中查询的就知道)
            //第二个参数实际上获取的思路为: 人均价格中都带有元这个字,通过元拆分获取元前面的数据也就是(0)且转换成Int类型的
          (x.getString(0),x.getString(1).split("元")(0).toInt)
        }
      )
      //因为使用map后得出来的标题值为: _1| _2,所以使用toDF来转换成我们自己想要的标题
      .toDF("餐饮名称","人均价格")
      //排序
      .orderBy(desc("人均价格"))
      //显示
      .show()
  }

  //计算每个类型的平均价格
  def f2(spark:SparkSession): Unit ={
    var data=spark.read.option("header","true").csv("data/A_1.csv")

    import spark.implicits._
    val result = data.select("类别","人均价格")
      .map(
        x => {
          (x.getString(0),x.getString(1).split("元")(0).toInt)
        }
      ).toDF("类别","人均价格")
      .groupBy("类别")
      /*.count()
      .show()*/

    // 总消费
    /*result.sum("人均价格")
      .orderBy(desc("sum(人均价格)"))
      .show()*/

    // 算平均值
    /*result.avg("人均价格")
      .orderBy(desc("avg(人均价格)"))
      .show()*/

    // 每个类别中最贵的
    /*result.max("人均价格")
      .orderBy(desc("max(人均价格)"))
      .show()*/

    // 每个类别中最便宜的(升序排)
    result.min("人均价格")
      .orderBy("min(人均价格)")
      .show()

    //数量
    //result.count().show()
  }

  //针对评分进行排序,处理,以及类别处理...(转换数据格式)
  def f3(spark:SparkSession): Unit ={
    val data=spark.read.option("header","true").csv("data/A_1.csv")

    import spark.implicits._
    //转换列的数据类型
    val table=data.select("名称","评论数","人均价格","类别","商圈","口味评分","环境评分","服务评分","星级","店铺ID","网址")
        .map(
          x => {
            val comments = x.getString(1).toInt
            val price = x.getString(2).split("元")(0).toInt
            val tasteScore = x.getString(5).toDouble
            val evmScore = x.getString(6).toDouble
            val serviceScore = x.getString(7).toDouble
            (x.getString(0),comments,price,x.getString(3),x.getString(4),tasteScore,evmScore,serviceScore)
          }
        ).toDF("name","comments","price","class","address","tasteScore","evmScore","serviceScore")
      //.printSchema()
      //.show()

    //将转换好的数据类型创建临时表
    table.createOrReplaceTempView("food")

    //根据口味排名
    //spark.sql("select * from food order by tasteScore desc").show()

    //统计每个类型美食的平均价格
    //spark.sql("select class,avg(price) from food group by class").show()

    //统计每个商圈的消费平均水平
    spark.sql("select address,avg(price) from food group by address").show()

    //统计各个口味评分的餐饮数量
    spark.sql("select tasteScore,count(*) from food group by tasteScore").show()

    //统计每类美食的人气数量
    spark.sql("select class,count(*) from food group by class").show()


    /**
     * 为人均消费进行评级,并计算出每个级别的餐饮数量
     * 0-100为1级
     * 101-202为2级
     * 请计算出一级有多少个,二级有多少个
     */
    var oneLevel = 0;
    var twoLevel=0
    val classAvg=data
      .map(
      x => {
        //为一级
        if(x.getString(2).split("元")(0).toInt<=100) {
          oneLevel=oneLevel+1
        } else{
          twoLevel=twoLevel+1
        }
        (oneLevel.toInt,twoLevel.toInt)
      }
    ).toDF("oneLevel","twoLevel")

    classAvg.createOrReplaceTempView("classAvg")

    spark.sql("select oneLevel,twoLevel from classAvg order by oneLevel desc limit 1  ").show()

  }

  //自定义函数
  def f4(spark:SparkSession): Unit ={
    val data=spark.read.json("data/user.json")

    //自定义函数
    spark.udf.register("f1",(x:String) => {x+"a"})

    //结果可以看到输出的值为name+a
    data.selectExpr("f1(name)").show()

    val data2 = spark.read
      .option("header","true")
      .csv("data/A_1.csv")

    spark.udf.register("clearData",(x:String) => {
      x.split("元")(0)
    })

    //全部行
    data2.show()

    //部分行
    //第一个参数的实际意思是只对人均价格这一行进行clearData函数的处理
    //第二个参数的实际意思就是单纯的显示
    data2.selectExpr("clearData(`人均价格`)","`名称`").show()

  }

case class Person(name: String, age: Long)
  //数据集
  def f5(spark:SparkSession): Unit ={
    import spark.implicits._
    val caseClassDS = Seq(Person("Andy" , 32)).toDS()
    caseClassDS.show()

    val primitiveDS = Seq(1,2,3).toDS()
    //给里面的每个参数都加1
    primitiveDS.map(_ + 1).collect().foreach(println(_))

    val path="data/user.json"
    //读取了user.json文件中的内容并且转换成Person对象
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()

  }

  // 自定义函数
  def f6(spark: SparkSession): Unit ={

    val data = spark.read.json("data/user.json")

    spark.udf.register("f2",MyFun)

    data.createOrReplaceTempView("user")

    spark.sql("select f2(age) from user").show()

  }

  //读取mysql数据
  def f7(spark:SparkSession): Unit ={

    val data = spark.read.format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3306/seckill")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","root")
      .option("dbtable","tb_user")
      .load()

    data.show()

  }

  //读取表中的内容,如果数据库中没有对应的数据表则创建
  def f8(spark:SparkSession): Unit ={
    val data = spark.read.json("data/user.json")

    data.write.format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3306/seckill")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","root")
      .option("dbtable","user")
      .mode(SaveMode.Append)  //创建对应数据表
      .save()
  }

  //jdbc的方式
  def f9(spark:SparkSession): Unit ={
    val data = spark.read.json("data/user.json").collect()

    val connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/seckill","root","root")
    val prepare = connection.prepareStatement("insert into user values (?,?)")

    //遍历表中的数据并做处理
    data.foreach(
      x => {
        //设置提交的参数
        prepare.setObject(1,x.getLong(0))
        prepare.setObject(2,x.getString(1))

        prepare.addBatch()
      }
    )

    //组装完参数后提交SQL,执行SQL
    prepare.executeBatch()

  }

}
ata.groupBy("行政区名称")
      .count()
      .orderBy(desc("count"))
      .show()

  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
package com.jbit.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}

/**
 * 自定义聚合函数
 * 这里定义的是一个求平均值的操作
 */
object MyFun extends UserDefinedAggregateFunction{

  // 输入的一个入口,接收的是集合
  override def inputSchema: StructType = {
    //第一个参数为name
    //第二个参数为Spark的数据类型
    //通过Nil声明这是一个集合
    StructType(StructField("input",LongType) :: Nil)
  }

  // 缓存操作
  override def bufferSchema: StructType = {
    StructType(
      StructField("sum",LongType) ::
      StructField("count",LongType) :: Nil
    )
  }

  // 执行完聚合函数后返回值的类型
  override def dataType: DataType = DoubleType

  // 相同的输入是否有相同的输出
  override def deterministic: Boolean = true

  // 初始化操作
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // 数据的计算: input DF中的一行
  // buffer: 之前运算的结果缓存
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if(!input.isNullAt(0)){
      // 和
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      // 数量
      buffer(1) = buffer.getLong(1) + 1
     }
  }

  // 分区计算的合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  //最后的结果
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

学习网站:https://spark.apache.org/docs/latest/sql-getting-started.html
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/785638
推荐阅读
相关标签
  

闽ICP备14008679号