赞
踩
Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方法有多种,包括SQL和Dataset API。计算结果时,将使用相同的执行引擎,而与要用来表达计算的API /语言无关。这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供最自然的方式来表达给定的转换。
Spark SQL的一种用途是执行SQL查询。Spark SQL还可以用于从现有的Hive安装中读取数据。有关如何配置此功能的更多信息,请参考Hive Tables部分。当从另一种编程语言中运行SQL时,结果将作为Dataset / DataFrame()返回。您还可以使用命令行 或通过JDBC / ODBC与SQL接口进行交互。
DataFrame只是类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset来代表DataFrame。
性能比RDD好,原因两点
定制化内存管理
数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制
优化的执行计划
查询计划通过Spark catalyst optimiser进行优化
Spark RDD操作的是纯数据,而Spark SQL在操作数据的同时还可以操作结构
数据文件提取:
链接:https://pan.baidu.com/s/1yD9lSLk83GP3Ye1Zvlu2iw
提取码:l1d6
根据代码中用到的文件名称拿来使用即可
一个一个注释取消掉测试即可(本人亲测成功)
package com.jbit.sql import java.sql.DriverManager import org.apache.spark.sql.{SaveMode, SparkSession} //排序计算的导入 import org.apache.spark.sql.functions._ /** * Spark SQL */ object SQLDemo { def main(args: Array[String]): Unit = { //创建spark对象(以getOrCreate()结束) var spark=SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); //读取csv格式的文件且使用首行当作标题 //read:读取 //option:设置读取属性(这里是以头行为标题) //var dataHeader = spark.read.option("header","true").csv("data/A_1.csv") //读取json类型的文件 var data = spark.read.json("data/user.json") //显示前...条数据 //data.show(10) //只显示name列的值 //data.select("name").show() //也可以多列 //data.select("name","age").show() //过滤 //data.filter("age > 10").show() //评论数大于500的(filter中汉字列名加``) //dataHeader.filter(" `评论数` > 500").select("名称","评论数").show() //可以使用sql中的函数 /* data.selectExpr("avg(age)").show() data.selectExpr("count(*)").show() data.selectExpr("max(age)").show() data.selectExpr("min(age)").show()*/ //创建临时表 //data.createOrReplaceTempView("user") //全局表 //data.createGlobalTempView("userGlobal") //下面的操作除最后那个方法外都要以创建表为前提,也就是上面这条语句 //查询临时表中的数据 //spark.sql("select * from user").show() //查看全局表中的数据的两种方式(global_temp.表名) //与全局表的语句相关联使用 //spark.sql("select * from global_temp.userGlobal").show() //spark.newSession().sql("select * from global_temp.userGlobal").show() //根据条件查询临时表中的数据 //spark.sql("select * from user where age>10").show() //排序 //spark.sql("select * from user order by age").show() //分组 //spark.sql("select count(*),age from user group by age").show() //查看表结构 //data.printSchema(); //getFoodNum(spark) //f1(spark) //f2(spark) //f3(spark) //f4(spark) //f5(spark) //f6(spark) //f7(spark) //f8(spark) f9(spark) } //计算每个区中餐饮的数量 def getFoodNum(spark:SparkSession)={ //读取文件且按照首行分出标题 var data=spark.read.option("header","true").csv("data/SQLaddress.csv"); //根据标题行政区名称分组且计数,再通过orderBy来从大到小排序显示 data.groupBy("行政区名称") .count() .orderBy(desc("count")) .show() } //餐饮人均价格排名 def f1(spark:SparkSession): Unit ={ val data=spark.read.option("header","true").csv("data/A_1.csv") //这条导入语句是用来处理DataFrame中不能直接使用map方法而隐式转换成RDD类型 import spark.implicits._ //查询所需的列 data.select("名称","人均价格") .map( x => { //使用getString来获取上面select中查询中的这一行的某个列 //第一个参数实际上获取到的就是餐饮名称(看上面select中查询的就知道) //第二个参数实际上获取的思路为: 人均价格中都带有元这个字,通过元拆分获取元前面的数据也就是(0)且转换成Int类型的 (x.getString(0),x.getString(1).split("元")(0).toInt) } ) //因为使用map后得出来的标题值为: _1| _2,所以使用toDF来转换成我们自己想要的标题 .toDF("餐饮名称","人均价格") //排序 .orderBy(desc("人均价格")) //显示 .show() } //计算每个类型的平均价格 def f2(spark:SparkSession): Unit ={ var data=spark.read.option("header","true").csv("data/A_1.csv") import spark.implicits._ val result = data.select("类别","人均价格") .map( x => { (x.getString(0),x.getString(1).split("元")(0).toInt) } ).toDF("类别","人均价格") .groupBy("类别") /*.count() .show()*/ // 总消费 /*result.sum("人均价格") .orderBy(desc("sum(人均价格)")) .show()*/ // 算平均值 /*result.avg("人均价格") .orderBy(desc("avg(人均价格)")) .show()*/ // 每个类别中最贵的 /*result.max("人均价格") .orderBy(desc("max(人均价格)")) .show()*/ // 每个类别中最便宜的(升序排) result.min("人均价格") .orderBy("min(人均价格)") .show() //数量 //result.count().show() } //针对评分进行排序,处理,以及类别处理...(转换数据格式) def f3(spark:SparkSession): Unit ={ val data=spark.read.option("header","true").csv("data/A_1.csv") import spark.implicits._ //转换列的数据类型 val table=data.select("名称","评论数","人均价格","类别","商圈","口味评分","环境评分","服务评分","星级","店铺ID","网址") .map( x => { val comments = x.getString(1).toInt val price = x.getString(2).split("元")(0).toInt val tasteScore = x.getString(5).toDouble val evmScore = x.getString(6).toDouble val serviceScore = x.getString(7).toDouble (x.getString(0),comments,price,x.getString(3),x.getString(4),tasteScore,evmScore,serviceScore) } ).toDF("name","comments","price","class","address","tasteScore","evmScore","serviceScore") //.printSchema() //.show() //将转换好的数据类型创建临时表 table.createOrReplaceTempView("food") //根据口味排名 //spark.sql("select * from food order by tasteScore desc").show() //统计每个类型美食的平均价格 //spark.sql("select class,avg(price) from food group by class").show() //统计每个商圈的消费平均水平 spark.sql("select address,avg(price) from food group by address").show() //统计各个口味评分的餐饮数量 spark.sql("select tasteScore,count(*) from food group by tasteScore").show() //统计每类美食的人气数量 spark.sql("select class,count(*) from food group by class").show() /** * 为人均消费进行评级,并计算出每个级别的餐饮数量 * 0-100为1级 * 101-202为2级 * 请计算出一级有多少个,二级有多少个 */ var oneLevel = 0; var twoLevel=0 val classAvg=data .map( x => { //为一级 if(x.getString(2).split("元")(0).toInt<=100) { oneLevel=oneLevel+1 } else{ twoLevel=twoLevel+1 } (oneLevel.toInt,twoLevel.toInt) } ).toDF("oneLevel","twoLevel") classAvg.createOrReplaceTempView("classAvg") spark.sql("select oneLevel,twoLevel from classAvg order by oneLevel desc limit 1 ").show() } //自定义函数 def f4(spark:SparkSession): Unit ={ val data=spark.read.json("data/user.json") //自定义函数 spark.udf.register("f1",(x:String) => {x+"a"}) //结果可以看到输出的值为name+a data.selectExpr("f1(name)").show() val data2 = spark.read .option("header","true") .csv("data/A_1.csv") spark.udf.register("clearData",(x:String) => { x.split("元")(0) }) //全部行 data2.show() //部分行 //第一个参数的实际意思是只对人均价格这一行进行clearData函数的处理 //第二个参数的实际意思就是单纯的显示 data2.selectExpr("clearData(`人均价格`)","`名称`").show() } case class Person(name: String, age: Long) //数据集 def f5(spark:SparkSession): Unit ={ import spark.implicits._ val caseClassDS = Seq(Person("Andy" , 32)).toDS() caseClassDS.show() val primitiveDS = Seq(1,2,3).toDS() //给里面的每个参数都加1 primitiveDS.map(_ + 1).collect().foreach(println(_)) val path="data/user.json" //读取了user.json文件中的内容并且转换成Person对象 val peopleDS = spark.read.json(path).as[Person] peopleDS.show() } // 自定义函数 def f6(spark: SparkSession): Unit ={ val data = spark.read.json("data/user.json") spark.udf.register("f2",MyFun) data.createOrReplaceTempView("user") spark.sql("select f2(age) from user").show() } //读取mysql数据 def f7(spark:SparkSession): Unit ={ val data = spark.read.format("jdbc") .option("url","jdbc:mysql://127.0.0.1:3306/seckill") .option("driver","com.mysql.jdbc.Driver") .option("user","root") .option("password","root") .option("dbtable","tb_user") .load() data.show() } //读取表中的内容,如果数据库中没有对应的数据表则创建 def f8(spark:SparkSession): Unit ={ val data = spark.read.json("data/user.json") data.write.format("jdbc") .option("url","jdbc:mysql://127.0.0.1:3306/seckill") .option("driver","com.mysql.jdbc.Driver") .option("user","root") .option("password","root") .option("dbtable","user") .mode(SaveMode.Append) //创建对应数据表 .save() } //jdbc的方式 def f9(spark:SparkSession): Unit ={ val data = spark.read.json("data/user.json").collect() val connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/seckill","root","root") val prepare = connection.prepareStatement("insert into user values (?,?)") //遍历表中的数据并做处理 data.foreach( x => { //设置提交的参数 prepare.setObject(1,x.getLong(0)) prepare.setObject(2,x.getString(1)) prepare.addBatch() } ) //组装完参数后提交SQL,执行SQL prepare.executeBatch() } } ata.groupBy("行政区名称") .count() .orderBy(desc("count")) .show() } }
package com.jbit.sql import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType} /** * 自定义聚合函数 * 这里定义的是一个求平均值的操作 */ object MyFun extends UserDefinedAggregateFunction{ // 输入的一个入口,接收的是集合 override def inputSchema: StructType = { //第一个参数为name //第二个参数为Spark的数据类型 //通过Nil声明这是一个集合 StructType(StructField("input",LongType) :: Nil) } // 缓存操作 override def bufferSchema: StructType = { StructType( StructField("sum",LongType) :: StructField("count",LongType) :: Nil ) } // 执行完聚合函数后返回值的类型 override def dataType: DataType = DoubleType // 相同的输入是否有相同的输出 override def deterministic: Boolean = true // 初始化操作 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // 数据的计算: input DF中的一行 // buffer: 之前运算的结果缓存 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if(!input.isNullAt(0)){ // 和 buffer(0) = buffer.getLong(0) + input.getLong(0) // 数量 buffer(1) = buffer.getLong(1) + 1 } } // 分区计算的合并 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } //最后的结果 override def evaluate(buffer: Row): Any = { buffer.getLong(0).toDouble / buffer.getLong(1) } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。