
【Spark】Converting a HashMap to an RDD

This post shows how to read a local JSON file into a map in Spark, and then turn that map into an RDD and a DataFrame.

1. Read a local file and convert it into a map

    import java.util
    import scala.io.Source
    import scala.collection.JavaConverters._
    import com.alibaba.fastjson.JSON

    // Strip the "[" / "]" of a single-element JSON array and return its "value" field
    def getValue(str: String): String = {
      val value = str.replace("[", "").replace("]", "")
      JSON.parseObject(value).get("value").toString
    }

    val path = "path/to/file"
    // Read the whole file into one string and drop all whitespace
    val source = Source.fromFile(path).getLines().toList.mkString("").replaceAll(" ", "")
    val key = JSON.parseObject(source).get("key").toString
    val columns = JSON.parseObject(source).get("columns").toString

    val map = new util.HashMap[String, String]()
    map.put("RK", getValue(key))
    // One entry per column: column name -> its "value" field
    JSON.parseObject(columns).keySet().asScala.foreach(elem => {
      val valueJson = JSON.parseObject(columns).get(elem).toString
      map.put(elem, getValue(valueJson))
    })
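To see what getValue does, here is a small check using the "key" field copied from the sample data in the note at the end of this post (keyJson is just an illustrative local name, not from the original code):

    // [{"name":"RK","type":"String","value":"1234567890"}] -> "1234567890"
    val keyJson = """[{"name":"RK","type":"String","value":"1234567890"}]"""
    println(getValue(keyJson)) // prints 1234567890

With that sample input, the map ends up holding four entries: RK -> 1234567890, column_name1 -> 111, column_name2 -> 222, column_name3 -> 333.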

2. Convert the map into an RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Build one nullable string column per map key
    val schema = StructType(map.asScala.toSeq.map { case (k, _) =>
      StructField(k, StringType, nullable = true)
    })
    // Iterating the same HashMap again returns values in the same order as the keys above
    val row = Row.fromSeq(map.values().asScala.toSeq)
    val rowRDD = spark.sparkContext.parallelize(Seq(row))
    val df = spark.createDataFrame(rowRDD, schema)
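Deriving the schema from the entries and the row from values() relies on the java.util.HashMap iterating in the same order both times; that holds as long as the map is not modified in between, but it is easy to break. A minimal single-snapshot variant (schema2, row2, and df2 are illustrative names, not from the original post):

    // Take one snapshot of the entries so column names and values cannot drift apart
    val entries = map.asScala.toSeq
    val schema2 = StructType(entries.map { case (k, _) => StructField(k, StringType, nullable = true) })
    val row2 = Row.fromSeq(entries.map { case (_, v) => v })
    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row2)), schema2)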

Note: the input data format

    {
      "key": [
        {
          "name": "RK",
          "type": "String",
          "value": "1234567890"
        }
      ],
      "columns": {
        "column_name1": [
          {
            "name": "column_name1",
            "type": "String",
            "value": "111"
          }
        ],
        "column_name2": [
          {
            "name": "column_name2",
            "type": "String",
            "value": "222"
          }
        ],
        "column_name3": [
          {
            "name": "column_name3",
            "type": "String",
            "value": "333"
          }
        ]
      }
    }
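Putting the two steps together on this sample input, df holds a single row with one string column per map entry. The column order follows HashMap iteration order, so it may not match the order in the JSON; an illustrative df.show() output:

    +----------+------------+------------+------------+
    |        RK|column_name1|column_name2|column_name3|
    +----------+------------+------------+------------+
    |1234567890|         111|         222|         333|
    +----------+------------+------------+------------+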

