赞
踩
1、读取本地文件,转换成map
- // Path of the local JSON file to load.
- val path = "文件路径"
- // Read the whole file and always release the file handle afterwards.
- // Whitespace is left untouched: the JSON parser ignores insignificant
- // whitespace, while stripping spaces would alter string values containing them.
- val reader = Source.fromFile(path)
- val source = try reader.getLines().mkString("") finally reader.close()
-
- // Parse the document once and reuse the parsed object.
- val root = JSON.parseObject(source)
- val key = root.get("key").toString
- // The column definitions are stored under "columns" in the sample payload.
- val columns = root.get("columns").toString
-
- // Flat column-name -> value map; the row key is stored under "RK".
- val map = new util.HashMap[String, String]()
- map.put("RK", getValue(key))
-
- // Parse the columns object once, then extract the value of every column.
- val columnsJson = JSON.parseObject(columns)
- columnsJson.keySet().asScala.foreach { elem =>
-   map.put(elem, getValue(columnsJson.get(elem).toString))
- }
-
-
-
- /**
-  * Extracts the "value" field from a JSON fragment.
-  *
-  * The fragment is either a JSON object or a JSON array whose first element
-  * is the object of interest (e.g. `[{"name":"RK","value":"1"}]`).
-  *
-  * @param str JSON text of the object, or of an array wrapping it
-  * @return the string form of the object's "value" field
-  */
- def getValue(str: String): String = {
-   val text = str.trim
-   // Parse arrays properly instead of deleting every bracket character,
-   // which would also mangle values that themselves contain '[' or ']'.
-   val obj =
-     if (text.startsWith("[")) JSON.parseArray(text).getJSONObject(0)
-     else JSON.parseObject(text)
-   obj.get("value").toString
- }

2、将map转变成rdd,并创建DataFrame
- // Snapshot the entries once so that schema field names and row values
- // are taken from the same sequence and therefore share the same order.
- val entries = map.asScala.toSeq
-
- // Every column is exposed as a nullable string field named after its map key.
- val schema = StructType(entries.map { case (k, _) =>
-   StructField(k, StringType, nullable = true)
- })
-
- // Single row holding the values, in the same order as the schema fields.
- val row = Row.fromSeq(entries.map(_._2))
-
- val rowRDD = spark.sparkContext.parallelize(Seq(row))
-
- val df = spark.createDataFrame(rowRDD, schema)
备注:数据格式
- {
- "key":[
- {
- "name":"RK",
- "type":"String",
- "value":"1234567890"
- }
- ],
- "columns":{
- "column_name1":[
- {
- "name":"column_name1",
- "type":"String",
- "value":"111"
- }
- ],
- "column_name2":[
- {
- "name":"column_name2",
- "type":"String",
- "value":"222"
- }
- ],
- "column_name3":[
- {
- "name":"column_name3",
- "type":"String",
- "value":"333"
- }
- ]
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。