当前位置:   article > 正文

spark sql读取hive数据直接写入doris,离线批量导入_java开发spark把hive数据导出到doris

java开发spark把hive数据导出到doris

先简单的贴贴代码,后面会完善一下。

一,spark sql 读取hive表

这里通过catalog查询表的字段信息,然后 查询出来的字段colStr 要给下面的doris使用。

 注意:我这里是直接拿取的hive表所有的字段。

二,spark自定义输出


这里就是简单封装了一下

实现的效果:

三,通过stream load方式数据写入doris


   循环遍历DataFrame之后写入到doris里面:

  1. val dorisStreamLoader = new DorisStreamLoad("192.168.5.xx:8040", "example_db", "assuer_order_test", "root", "root")
  2. val cumsArrays = colStr.split(",")
  3. val fieldDelimiter: String = "\t"
  4. val lineDelimiter: String = "\n"
  5. val NULL_VALUE: String = "\\N"
  6. val maxRowCount = 5000
  7. val maxRetryTimes = 3
  8. data.rdd.foreachPartition(partition => {
  9. val buffer = ListBuffer[String]()
  10. var jsonArrays = new JSONArray()
  11. partition.foreach(f = row => {
  12. // val value: StringJoiner = new StringJoiner(fieldDelimiter)
  13. // create one row string
  14. val json = new JSONObject()
  15. for (i <- 0 until row.size) {
  16. val field = row.get(i)
  17. val fieldName = cumsArrays(i)
  18. if (field == null) {
  19. // value.add(NULL_VALUE)
  20. json.put(fieldName, NULL_VALUE)
  21. } else {
  22. // value.add(field.toString)
  23. json.put(fieldName, field.toString)
  24. }
  25. }
  26. jsonArrays.add(json)
  27. // add one row string to
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/973218
推荐阅读
相关标签
  

闽ICP备14008679号