Once the data has been converted to a DataFrame, you can run Spark SQL queries directly against any data source that can be constructed as an RDD, such as files on HDFS.
package pz.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.util.List;

/**
 * Convert an RDD to a DataFrame using reflection.
 */
public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        // Create an ordinary RDD
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("./student.txt");
        JavaRDD<Student> students = lines.map(new Function<String, Student>() {
            public Student call(String line) throws Exception {
                // Each line is expected to be "id,name,age"
                String[] lineSplited = line.split(",");
                Student stu = new Student();
                stu.setId(Integer.valueOf(lineSplited[0]));
                stu.setName(lineSplited[1]);
                stu.setAge(Integer.valueOf(lineSplited[2]));
                return stu;
            }
        });

        // Convert the RDD to a DataFrame using reflection.
        // Passing in Student.class is itself an application of reflection:
        // under the hood, Spark reflects over the Student class to discover its fields.
        Dataset<Row> studentDF = sqlContext.createDataFrame(students, Student.class);

        // Once we have a DataFrame, we can register it as a temporary table
        // and run SQL statements against its data.
        studentDF.registerTempTable("students");

        // Query the students temp table for students aged 18 or younger
        Dataset<Row> teenagerDF = sqlContext.sql("select * from students where age<=18");

        // Convert the resulting DataFrame back into an RDD
        JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();

        // Map each Row back to a Student.
        // Note: with reflection-based createDataFrame on a Java bean, the columns
        // are ordered alphabetically (age, id, name), so the Row is read in that order.
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() {
            public Student call(Row row) throws Exception {
                Student stu = new Student();
                stu.setAge(row.getInt(0));
                stu.setId(row.getInt(1));
                stu.setName(row.getString(2));
                return stu;
            }
        });

        // Collect the results back to the driver and print them
        List<Student> studentList = teenagerStudentRDD.collect();
        for (Student stu : studentList) {
            System.out.println(stu);
        }
    }
}

class Student implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
}
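For reference, a hypothetical student.txt matching the parsing above (one comma-separated id,name,age record per line) could look like the rows below; the actual file used in the course is not shown, so these values are made up:

1,leo,17
2,jack,19
3,marry,16

With this sample input, the age <= 18 query would return the records for leo and marry.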
package pz.spark.study.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Convert an RDD to a DataFrame using reflection (Scala version).
 */
object RDD2DataFrameReflection_scala {

  // The case class must be defined outside of main. Defining it inside the
  // method is what caused the "always errors here, cannot continue running"
  // problem noted in the original code: toDF() cannot obtain a TypeTag for a
  // method-local case class.
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameReflection_scala")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // In Scala, converting an RDD to a DataFrame via reflection requires
    // manually importing this implicit conversion.
    import sqlContext.implicits._

    // This is just an ordinary RDD whose elements are case class instances;
    // calling toDF on it converts it directly to a DataFrame.
    val studentDF = sc.textFile("./student.txt", 1)
      .map(line => line.split(","))
      .map(arr => Student(arr(0).trim().toInt, arr(1), arr(2).trim().toInt))
      .toDF()

    studentDF.registerTempTable("students")

    val teenagerDF = sqlContext.sql("select * from students where age <= 18")
    val teenagerRDD = teenagerDF.rdd

    teenagerRDD.map(row => Student(row(0).toString.toInt, row(1).toString, row(2).toString.toInt))
      .collect()
      .foreach(stu => println(stu.id + ":" + stu.name + ": " + stu.age))
  }
}
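Since these notes follow a Spark 2.0 course, it may also be worth sketching the SparkSession-based form of the same reflection conversion. The version below is a minimal sketch and not from the course: the object name RDD2DataFrameReflectionSession is made up, and createOrReplaceTempView is the Spark 2.x replacement for the deprecated registerTempTable used above.

import org.apache.spark.sql.SparkSession

object RDD2DataFrameReflectionSession {

  // Defined at object level so toDF() can resolve a TypeTag for it
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("RDD2DataFrameReflectionSession")
      .getOrCreate()
    import spark.implicits._

    // Same reflection-based conversion: RDD of case class instances -> DataFrame
    val studentDF = spark.sparkContext.textFile("./student.txt")
      .map(_.split(","))
      .map(arr => Student(arr(0).trim.toInt, arr(1), arr(2).trim.toInt))
      .toDF()

    // createOrReplaceTempView replaces registerTempTable in Spark 2.x
    studentDF.createOrReplaceTempView("students")
    spark.sql("select * from students where age <= 18").show()

    spark.stop()
  }
}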
package pz.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

/**
 * Convert an RDD to a DataFrame by programmatically specifying the metadata (schema).
 */
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {
        // Create the SparkConf, JavaSparkContext and SQLContext
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameProgrammatically");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Step 1: create an ordinary RDD, but it must be converted to an RDD<Row>
        JavaRDD<String> lines = sc.textFile("./student.txt");

        // An earlier run threw a ClassCastException: String cannot be cast to Integer.
        // That meant some value was put in as a String but later used as an Integer,
        // and the error surfaced in the SQL-related code: the query uses "age <= 18",
        // which forces age to be treated as an Integer, so a previous step must have
        // stored age in the Row as a String. The lesson: when putting data into a Row,
        // convert each value to the type declared for that field in the schema.
        JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
            public Row call(String line) throws Exception {
                String[] lineSplited = line.split(",");
                // Wrap each record in a Row
                return RowFactory.create(
                        Integer.valueOf(lineSplited[0]),
                        lineSplited[1],
                        Integer.valueOf(lineSplited[2]));
            }
        });

        // Step 2: dynamically construct the metadata.
        // Field names and types (id, name, etc.) might be loaded at runtime from
        // MySQL, another database, or a configuration file rather than being fixed,
        // which is exactly the case this programmatic approach is suited for.
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // Step 3: use the dynamically constructed metadata to convert the RDD to a DataFrame
        Dataset<Row> studentDF = sqlContext.createDataFrame(rows, structType);

        // From here on the DataFrame can be used as usual
        studentDF.registerTempTable("students");
        Dataset<Row> teenagerDF = sqlContext.sql("select * from students where age <= 18");

        List<Row> teenagerRDD = teenagerDF.javaRDD().collect();
        for (Row row : teenagerRDD) {
            System.out.println(row);
        }
    }
}
package pz.spark.study.sql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Convert an RDD to a DataFrame by programmatically specifying the schema (Scala version).
 */
object RDD2DataFrameProgrammatically_scala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameProgrammatically")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Step 1: build an ordinary RDD whose elements are Rows
    val studentRDD = sc.textFile("./student.txt")
      .map(line => line.split(","))
      .map(arr => Row(arr(0).toInt, arr(1), arr(2).toInt))

    // Step 2: programmatically construct the metadata
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    // Step 3: convert the RDD to a DataFrame
    val studentDF = sqlContext.createDataFrame(studentRDD, structType)

    // Use the DataFrame as usual
    studentDF.registerTempTable("students")
    val teenagerDF = sqlContext.sql("select * from students where age <= 18")
    teenagerDF.rdd.collect().foreach(row => println(row))
  }
}
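The comments above point out that field names and types may come from a database or a configuration file at runtime. As a rough illustration of that idea (not from the course), the sketch below builds the StructType from hypothetical "name:type" strings; DynamicSchemaSketch, fieldSpecs and its values are made up, and only int and string types are handled.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DynamicSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical "name:type" specs, e.g. loaded from a config file or a metadata table
    val fieldSpecs = Seq("id:int", "name:string", "age:int")

    // Build the StructType from the specs; only int and string are handled here
    val dynamicSchema = StructType(fieldSpecs.map { spec =>
      val Array(fieldName, fieldType) = spec.split(":")
      val dataType = if (fieldType == "int") IntegerType else StringType
      StructField(fieldName, dataType, nullable = true)
    })

    println(dynamicSchema)
    // The resulting schema can then be passed to sqlContext.createDataFrame(rowRDD, dynamicSchema)
  }
}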
These are my study notes for the 北风网 Spark 2.0 training videos.
Video link: https://www.bilibili.com/video/av19995678/?p=105