Flink SQL: Custom JSON Parsing (RowType / RowField)

I recently needed to rework the code that loads our nginx logs into storage with Flink so that it runs as Flink SQL, since expressing the ingestion in SQL is much more convenient.

Flink SQL, however, offers no way to parse a JSON string column into separate fields, and because each nginx log arrives as a JSON string, the ingestion cannot be written in SQL alone.

So Flink has to be extended with a custom JSON-parsing step.

First, define the source table:

CREATE TABLE kafka (
  json_data STRING,
  type STRING,
  ts TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'nginxlog',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'flink-kafka',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)
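
For illustration, a record on the nginxlog topic might look like this (a hypothetical example with made-up values; whether json_data arrives as an escaped string or a nested object, the point is that it lands in a single STRING column that the parsing step below re-parses with Gson):

{
  "json_data": "{\"request_url\": \"http://www.example.com\", \"resp_code\": 200}",
  "type": "access"
}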

Next, define the sink table:

CREATE TABLE hdfs (
  request_url STRING,
  resp_code INT,
  ...
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'orc',
  ...
)

Then define a JSON-parsing table (it relies on a helper, createStreamTable, implemented further below).

Two of the WITH options are consumed by the helper rather than by Flink itself:

'table' = 'kafka' — the source table to read from
'stream' = 'org.apache.flink.streaming.JsonStreamTableProcessFunction' — the class that performs the JSON parsing

A column name such as json_data$request_url uses '$' as a path separator: it selects request_url from the parsed JSON in the json_data column, i.e. from

{
  "json_data": {
    "request_url": "http://www.baidu.com"
  }
}

StreamTableUtil.createStreamTable(fsTableEnv, "CREATE TABLE to_json (\n" +
        " json_data$request_url STRING,\n" +
        " json_data$resp_code INT\n" +
        ") WITH (\n" +
        " 'table' = 'kafka',\n" +
        " 'stream' = 'org.apache.flink.streaming.JsonStreamTableProcessFunction'\n" +
        ")");

Run the ingestion SQL (INSERT ... SELECT * matches columns by position, so the view's field order must line up with the sink's):

insert into hdfs select * from to_json
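
Putting it all together, a minimal driver might look like the sketch below (the class name NginxLogJob and the wiring are illustrative assumptions; the "(...)" DDL strings stand in for the full statements shown above):

package org.apache.flink.streaming;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class NginxLogJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(env);

        // Plain DDL goes straight to Flink.
        fsTableEnv.executeSql("CREATE TABLE kafka (...)"); // the source DDL shown above
        fsTableEnv.executeSql("CREATE TABLE hdfs (...)");  // the sink DDL shown above

        // The JSON-parsing table goes through the custom utility instead,
        // because Flink itself cannot plan the 'table'/'stream' options.
        StreamTableUtil.createStreamTable(fsTableEnv, "CREATE TABLE to_json (...)"); // the DDL shown above

        // Submitting the INSERT starts the streaming job.
        fsTableEnv.executeSql("insert into hdfs select * from to_json");
    }
}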

Below is the implementation of createStreamTable:

package org.apache.flink.streaming;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.Expressions.$;

public final class StreamTableUtil {

    private StreamTableUtil() {
    }

    public static void createStreamTable(StreamTableEnvironment fsTableEnv, String sql) throws Exception {
        // Use Flink's own SQL parser to turn the DDL into a CreateTableOperation,
        // which exposes the table name, schema, and WITH options.
        TableEnvironmentInternal tableEnvironment = (TableEnvironmentInternal) fsTableEnv;
        Parser parser = tableEnvironment.getParser();
        List<Operation> parse = parser.parse(sql);
        CreateTableOperation operation = (CreateTableOperation) parse.get(0);
        String tableName = operation.getTableIdentifier().getObjectName();
        CatalogTable catalogTable = operation.getCatalogTable();

        // Register the DDL under a throwaway name so the declared name stays free
        // for the temporary view created below. (Note: String.replace swaps every
        // occurrence of the table name in the DDL, so pick a name that does not
        // collide with column names or option values.)
        String tmpTable = "tmp" + UUID.randomUUID().toString().replace("-", "");
        fsTableEnv.executeSql(sql.replace(tableName, tmpTable));

        Map<String, String> properties = catalogTable.getOptions();
        String fromTable = properties.get("table");
        @SuppressWarnings("unchecked")
        Class<? extends StreamTableProcessFunction> streamClass =
                (Class<? extends StreamTableProcessFunction>) Class.forName(properties.get("stream"));
        createStreamTable(fsTableEnv, fromTable, catalogTable, tableName, streamClass);
    }

    private static void createStreamTable(StreamTableEnvironment fsTableEnv, String fromTable,
                                          CatalogTable toCatalogTable, String toTable,
                                          Class<? extends StreamTableProcessFunction> streamClass) throws Exception {
        // Convert the source table into a DataStream<RowData> so a ProcessFunction
        // can be applied to it.
        Table fromCatalogTable = fsTableEnv.sqlQuery("select * from " + fromTable);
        DataStream<RowData> stream = fsTableEnv.toAppendStream(fromCatalogTable, RowData.class);

        // Build the logical row types of the input and output tables.
        RowType fromTableRowType = new RowType(fromCatalogTable.getSchema().getTableColumns().stream()
                .map(e -> new RowType.RowField(e.getName(), e.getType().getLogicalType(), null))
                .collect(Collectors.toList()));
        RowType toTableRowType = new RowType(toCatalogTable.getSchema().getTableColumns().stream()
                .map(e -> new RowType.RowField(e.getName(), e.getType().getLogicalType(), null))
                .collect(Collectors.toList()));

        // Instantiate the configured parsing function via reflection and apply it.
        Constructor<? extends StreamTableProcessFunction> constructor =
                streamClass.getDeclaredConstructor(RowType.class, RowType.class);
        InternalTypeInfo<RowData> internalTypeInfo = InternalTypeInfo.of(toTableRowType);
        DataStream<RowData> out = stream.process(constructor.newInstance(fromTableRowType, toTableRowType), internalTypeInfo);

        // Register the result under the name declared in the DDL.
        List<ApiExpression> apiExpressionList =
                toTableRowType.getFieldNames().stream().map(e -> $(e)).collect(Collectors.toList());
        fsTableEnv.createTemporaryView(toTable, out, apiExpressionList.toArray(new ApiExpression[0]));
    }
}

Next, define the parent class for stream-table processing functions:

package org.apache.flink.streaming;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

// Base class for pluggable row-to-row transformations: it simply carries the
// input and output row types so subclasses know both schemas.
public abstract class StreamTableProcessFunction extends ProcessFunction<RowData, RowData> {

    private final RowType fromTableRowType;
    private final RowType toTableRowType;

    public StreamTableProcessFunction(RowType fromTableRowType, RowType toTableRowType) {
        this.fromTableRowType = fromTableRowType;
        this.toTableRowType = toTableRowType;
    }

    public RowType getFromTableRowType() {
        return fromTableRowType;
    }

    public RowType getToTableRowType() {
        return toTableRowType;
    }
}
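
Any other transformation can be plugged in the same way; the only contract is extending this class with a (RowType, RowType) constructor. As a trivial, hypothetical example (not part of the job), a pass-through implementation assuming identical input and output schemas:

package org.apache.flink.streaming;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

// Hypothetical: forwards rows unchanged, just to show the minimal contract
// a 'stream' class must satisfy.
public class PassThroughStreamTableProcessFunction extends StreamTableProcessFunction {

    public PassThroughStreamTableProcessFunction(RowType fromTableRowType, RowType toTableRowType) {
        super(fromTableRowType, toTableRowType);
    }

    @Override
    public void processElement(RowData value, Context ctx, Collector<RowData> out) {
        out.collect(value);
    }
}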

Finally, the subclass that implements the JSON parsing:

package org.apache.flink.streaming;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JsonStreamTableProcessFunction extends StreamTableProcessFunction {

    private transient Gson gson;

    public JsonStreamTableProcessFunction(RowType fromTableRowType, RowType toTableRowType) {
        super(fromTableRowType, toTableRowType);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Create the Gson instance per task rather than serializing it with the function.
        gson = new Gson();
    }

    @Override
    public void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception {
        // Map input column names to their positions (this could be cached in open()).
        Map<String, Integer> fieldIndex = new LinkedHashMap<>();
        for (int i = 0; i < getFromTableRowType().getFields().size(); i++) {
            fieldIndex.put(getFromTableRowType().getFields().get(i).getName(), i);
        }

        GenericRowData rowData = new GenericRowData(getToTableRowType().getFieldCount());
        List<RowType.RowField> fields = getToTableRowType().getFields();
        for (int f = 0; f < fields.size(); f++) {
            RowType.RowField rowField = fields.get(f);
            // Output column names use '$' as a path separator: the first segment is
            // the input column, the remaining segments are the path inside its JSON value.
            String[] split = rowField.getName().split("\\$");
            Integer idx = fieldIndex.get(split[0]);
            // Escape nginx request_body binary sequences (\xNN) so Gson accepts them.
            String json = value.getString(idx).toString().replaceAll("\\\\x", "\\\\\\\\x");
            JsonObject jsonObject = gson.fromJson(json, JsonObject.class);
            for (int i = 1; i < split.length; i++) {
                String k = split[i];
                if (i < split.length - 1) {
                    // Intermediate segment: descend into the nested object.
                    jsonObject = jsonObject.get(k).getAsJsonObject();
                } else {
                    JsonElement elem = jsonObject.get(k);
                    if (elem == null || elem.isJsonNull()) {
                        continue; // missing key: leave the field null
                    }
                    Class<?> conversion = rowField.getType().getDefaultConversion();
                    try {
                        if (Integer.class.isAssignableFrom(conversion)) {
                            rowData.setField(f, elem.getAsInt());
                        } else if (Float.class.isAssignableFrom(conversion)) {
                            rowData.setField(f, elem.getAsFloat());
                        } else if (Double.class.isAssignableFrom(conversion)) {
                            rowData.setField(f, elem.getAsDouble());
                        } else {
                            // getAsString() avoids keeping the JSON quotes around plain
                            // strings; non-primitive values fall back to their JSON text.
                            String ret = elem.isJsonPrimitive() ? elem.getAsString() : elem.toString();
                            rowData.setField(f, new BinaryStringData(ret));
                        }
                    } catch (Exception e) {
                        // Unparseable value: leave the field null rather than fail the job.
                    }
                }
            }
        }
        out.collect(rowData);
    }
}
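
To see the '$'-path convention in isolation, here is a standalone sketch (plain Java plus Gson; the class name DollarPathDemo and the input payload are made up). Splitting "json_data$a$b" on '$' drops the first segment, which names the source column, and walks the remaining segments through the parsed JSON:

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class DollarPathDemo {
    public static void main(String[] args) {
        String json = "{\"a\": {\"b\": 42}}";   // made-up payload of the source column
        String column = "json_data$a$b";        // declared output column name

        String[] split = column.split("\\$");   // ["json_data", "a", "b"]
        JsonObject node = new Gson().fromJson(json, JsonObject.class);
        for (int i = 1; i < split.length - 1; i++) {
            node = node.get(split[i]).getAsJsonObject(); // descend intermediate keys
        }
        System.out.println(node.get(split[split.length - 1]).getAsInt()); // prints 42
    }
}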
