赞
踩
先简单的贴贴代码,后面会完善一下。
这里通过catalog查询表的字段信息,然后 查询出来的字段colStr 要给下面的doris使用。
注意:我这里是直接拿取的hive表所有的字段。
这里就是简单封装了一下
实现的效果:
循环遍历DataFrame之后写入到doris里面:
- val dorisStreamLoader = new DorisStreamLoad("192.168.5.xx:8040", "example_db", "assuer_order_test", "root", "root")
- val cumsArrays = colStr.split(",")
- val fieldDelimiter: String = "\t"
- val lineDelimiter: String = "\n"
- val NULL_VALUE: String = "\\N"
- val maxRowCount = 5000
- val maxRetryTimes = 3
- data.rdd.foreachPartition(partition => {
- val buffer = ListBuffer[String]()
-
- var jsonArrays = new JSONArray()
- partition.foreach(f = row => {
- // val value: StringJoiner = new StringJoiner(fieldDelimiter)
- // create one row string
- val json = new JSONObject()
- for (i <- 0 until row.size) {
-
- val field = row.get(i)
- val fieldName = cumsArrays(i)
- if (field == null) {
- // value.add(NULL_VALUE)
- json.put(fieldName, NULL_VALUE)
- } else {
- // value.add(field.toString)
- json.put(fieldName, field.toString)
- }
-
-
- }
- jsonArrays.add(json)
- // add one row string to

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。