Recently I needed to rework our company's Flink job that ingests nginx logs into storage so that it runs as Flink SQL; expressing the ingestion in SQL makes it much easier to maintain. However, Flink SQL provides no way to parse arbitrary JSON, and since each nginx log line is a JSON string, the ingestion cannot be done with SQL alone. Therefore, Flink needs to be extended with a custom JSON-parsing step.
First, define the source table:
```sql
CREATE TABLE kafka (
  json_data STRING,
  type STRING,
  ts TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'nginxlog',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'flink-kafka',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)
```
Define the sink table:
```sql
CREATE TABLE hdfs (
  request_url STRING,
  resp_code INT,
  ...
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'orc',
  ...
)
```
Define a table that performs the JSON parsing (the helper method used here is implemented further below):

- 'table' = 'kafka' — the source table whose data is read
- 'stream' = 'org.apache.flink.streaming.JsonStreamTableProcessFunction' — the class that performs the JSON parsing
- a column named json_data$request_url means "take the json_data column and extract its request_url field", i.e. conceptually

```json
{
  "json_data": {
    "request_url": "http://www.baidu.com"
  }
}
```
```java
StreamTableUtil.createStreamTable(fsTableEnv, "CREATE TABLE to_json (\n" +
        "  json_data$request_url STRING,\n" +
        "  json_data$resp_code INT\n" +
        ") WITH (\n" +
        "  'table' = 'kafka',\n" +
        "  'stream' = 'org.apache.flink.streaming.JsonStreamTableProcessFunction'\n" +
        ")");
```
Run the ingestion SQL:

```sql
insert into hdfs select * from to_json
```
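
For orientation, here is a minimal end-to-end driver sketch that wires the pieces above together. The class name `NginxLogJob` and the checkpoint interval are illustrative assumptions, not part of the original job, and the hdfs table is reduced to the two columns shown earlier:

```java
package org.apache.flink.streaming;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class NginxLogJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the streaming filesystem sink only commits files on checkpoints
        env.enableCheckpointing(60_000);
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(env);

        // source table (DDL from above)
        fsTableEnv.executeSql("CREATE TABLE kafka (\n" +
                "  json_data STRING,\n" +
                "  type STRING,\n" +
                "  ts TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'nginxlog',\n" +
                "  'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
                "  'properties.group.id' = 'flink-kafka',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // sink table (DDL from above, reduced to the two columns shown)
        fsTableEnv.executeSql("CREATE TABLE hdfs (\n" +
                "  request_url STRING,\n" +
                "  resp_code INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///path/to/whatever',\n" +
                "  'format' = 'orc'\n" +
                ")");

        // register the to_json view through the custom DDL handled by StreamTableUtil
        StreamTableUtil.createStreamTable(fsTableEnv, "CREATE TABLE to_json (\n" +
                "  json_data$request_url STRING,\n" +
                "  json_data$resp_code INT\n" +
                ") WITH (\n" +
                "  'table' = 'kafka',\n" +
                "  'stream' = 'org.apache.flink.streaming.JsonStreamTableProcessFunction'\n" +
                ")");

        // submit the ingestion query
        fsTableEnv.executeSql("insert into hdfs select * from to_json");
    }
}
```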
Below is the implementation of createStreamTable:
```java
package org.apache.flink.streaming;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.Expressions.$;

public final class StreamTableUtil {

    private StreamTableUtil() {
    }

    public static void createStreamTable(StreamTableEnvironment fsTableEnv, String sql) throws Exception {
        // parse the DDL to obtain the declared table name, schema and options
        TableEnvironmentInternal tableEnvironment = (TableEnvironmentInternal) fsTableEnv;
        Parser parser = tableEnvironment.getParser();
        List<Operation> parse = parser.parse(sql);
        CreateTableOperation operation = (CreateTableOperation) parse.get(0);
        String tableName = operation.getTableIdentifier().getObjectName();
        CatalogTable catalogTable = operation.getCatalogTable();

        // register the DDL under a throwaway temp name so the statement itself is validated;
        // the real name is later bound to the processed DataStream view instead
        String tmpTable = "tmp" + UUID.randomUUID().toString().replaceAll("\\-", "");
        fsTableEnv.executeSql(sql.replace(tableName, tmpTable));

        // read the custom options: 'table' is the source table, 'stream' the parser class
        Map<String, String> properties = catalogTable.getOptions();
        String fromTable = properties.get("table");
        Class<? extends StreamTableProcessFunction> streamClass =
                (Class<? extends StreamTableProcessFunction>) Class.forName(properties.get("stream"));
        createStreamTable(fsTableEnv, fromTable, catalogTable, tableName, streamClass);
    }

    private static void createStreamTable(StreamTableEnvironment fsTableEnv, String fromTable,
                                          CatalogTable toCatalogTable, String toTable,
                                          Class<? extends StreamTableProcessFunction> streamClass) throws Exception {
        // turn the source table into a DataStream of RowData
        Table fromCatalogTable = fsTableEnv.sqlQuery("select * from " + fromTable);
        DataStream<RowData> stream = fsTableEnv.toAppendStream(fromCatalogTable, RowData.class);

        // build the logical row types of the source and target tables
        RowType fromTableRowType = new RowType(fromCatalogTable.getSchema().getTableColumns().stream()
                .map(e -> new RowType.RowField(e.getName(), e.getType().getLogicalType(), null))
                .collect(Collectors.toList()));
        RowType toTableRowType = new RowType(toCatalogTable.getSchema().getTableColumns().stream()
                .map(e -> new RowType.RowField(e.getName(), e.getType().getLogicalType(), null))
                .collect(Collectors.toList()));

        // instantiate the parser function reflectively and run it over the stream
        Constructor<? extends StreamTableProcessFunction> constructor =
                streamClass.getDeclaredConstructor(RowType.class, RowType.class);
        InternalTypeInfo<RowData> internalTypeInfo = InternalTypeInfo.of(toTableRowType);
        DataStream<RowData> out = stream.process(constructor.newInstance(fromTableRowType, toTableRowType), internalTypeInfo);

        // register the processed stream as a view under the name declared in the DDL
        List<ApiExpression> apiExpressionList = toTableRowType.getFieldNames().stream()
                .map(e -> $(e))
                .collect(Collectors.toList());
        fsTableEnv.createTemporaryView(toTable, out, apiExpressionList.toArray(new ApiExpression[0]));
    }
}
```

Define the parent class for stream-table processing:
```java
package org.apache.flink.streaming;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public abstract class StreamTableProcessFunction extends ProcessFunction<RowData, RowData> {

    private RowType fromTableRowType;
    private RowType toTableRowType;

    public RowType getFromTableRowType() {
        return fromTableRowType;
    }

    public RowType getToTableRowType() {
        return toTableRowType;
    }

    public StreamTableProcessFunction(RowType fromTableRowType, RowType toTableRowType) {
        this.fromTableRowType = fromTableRowType;
        this.toTableRowType = toTableRowType;
    }
}
```
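
Because the 'stream' option accepts any subclass of this class, parsers other than JSON can be plugged in. As a hypothetical illustration (not part of the original code), a parser that splits a delimited text column into string sink columns could look like this:

```java
package org.apache.flink.streaming;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

public class DelimitedStreamTableProcessFunction extends StreamTableProcessFunction {

    public DelimitedStreamTableProcessFunction(RowType fromTableRowType, RowType toTableRowType) {
        super(fromTableRowType, toTableRowType);
    }

    @Override
    public void processElement(RowData value, Context ctx, Collector<RowData> out) {
        // assumption: the first source column holds one '|'-delimited log line
        String[] parts = value.getString(0).toString().split("\\|");
        // assumption: every target column is STRING; columns without a matching part stay null
        GenericRowData row = new GenericRowData(getToTableRowType().getFieldCount());
        for (int i = 0; i < row.getArity() && i < parts.length; i++) {
            row.setField(i, new BinaryStringData(parts[i]));
        }
        out.collect(row);
    }
}
```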

Implement the JSON-parsing subclass:
```java
package org.apache.flink.streaming;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import java.util.*;

public class JsonStreamTableProcessFunction extends StreamTableProcessFunction {

    private Gson gson;

    public JsonStreamTableProcessFunction(RowType fromTableRowType, RowType toTableRowType) {
        super(fromTableRowType, toTableRowType);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        gson = new Gson();
    }

    @Override
    public void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception {
        // map each source column name to its position in the incoming row
        Map<String, Integer> fieldIndex = new LinkedHashMap<>();
        for (int i = 0; i < getFromTableRowType().getFields().size(); i++) {
            RowType.RowField rowField = getFromTableRowType().getFields().get(i);
            fieldIndex.put(rowField.getName(), i);
        }
        GenericRowData rowData = new GenericRowData(getToTableRowType().getFieldCount());
        List<RowType.RowField> fields = getToTableRowType().getFields();
        for (int f = 0; f < fields.size(); f++) {
            RowType.RowField rowField = fields.get(f);
            // target column names look like "<source column>$<json key>$<nested key>..."
            String[] split = rowField.getName().split("\\$");
            String key = split[0];
            Integer idx = fieldIndex.get(key);
            // escape binary data in the nginx request_body (\x sequences) before parsing
            String json = value.getString(idx).toString().replaceAll("\\\\x", "\\\\\\\\x");
            JsonObject jsonObject = gson.fromJson(json, JsonObject.class);
            for (int i = 1; i < split.length; i++) {
                String k = split[i];
                if (i < split.length - 1) {
                    // intermediate keys: descend into the nested JSON object
                    jsonObject = jsonObject.get(k).getAsJsonObject();
                } else {
                    // last key: convert the value according to the target column type;
                    // on conversion failure the field is simply left null
                    if (Integer.class.isAssignableFrom(rowField.getType().getDefaultConversion())) {
                        try {
                            rowData.setField(f, jsonObject.get(k).getAsInt());
                        } catch (Exception e) {
                        }
                    } else if (Float.class.isAssignableFrom(rowField.getType().getDefaultConversion())) {
                        try {
                            rowData.setField(f, jsonObject.get(k).getAsFloat());
                        } catch (Exception e) {
                        }
                    } else if (Double.class.isAssignableFrom(rowField.getType().getDefaultConversion())) {
                        try {
                            rowData.setField(f, jsonObject.get(k).getAsDouble());
                        } catch (Exception e) {
                        }
                    } else {
                        String ret = jsonObject.get(k).toString();
                        rowData.setField(f, new BinaryStringData(ret));
                    }
                }
            }
        }
        out.collect(rowData);
    }
}
```
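
To sanity-check the parsing logic outside a running job, the function can also be invoked directly. A minimal sketch, with an illustrative sample record and null passed for the unused Context (the demo class itself is not part of the original code):

```java
package org.apache.flink.streaming;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JsonStreamTableProcessFunctionDemo {

    public static void main(String[] args) throws Exception {
        // row types matching the kafka source and the to_json target tables above
        RowType fromType = new RowType(Arrays.asList(
                new RowType.RowField("json_data", new VarCharType(VarCharType.MAX_LENGTH)),
                new RowType.RowField("type", new VarCharType(VarCharType.MAX_LENGTH))));
        RowType toType = new RowType(Arrays.asList(
                new RowType.RowField("json_data$request_url", new VarCharType(VarCharType.MAX_LENGTH)),
                new RowType.RowField("json_data$resp_code", new IntType())));

        JsonStreamTableProcessFunction fn = new JsonStreamTableProcessFunction(fromType, toType);
        fn.open(new Configuration());

        // hand-built input row: column 0 is the raw JSON log line, column 1 the log type
        GenericRowData in = new GenericRowData(2);
        in.setField(0, new BinaryStringData("{\"request_url\":\"http://www.baidu.com\",\"resp_code\":200}"));
        in.setField(1, new BinaryStringData("nginx"));

        List<RowData> results = new ArrayList<>();
        // the Context argument is not used by the function, so null is passed here
        fn.processElement(in, null, new Collector<RowData>() {
            @Override
            public void collect(RowData record) {
                results.add(record);
            }

            @Override
            public void close() {
            }
        });

        GenericRowData row = (GenericRowData) results.get(0);
        System.out.println(row.getString(0) + ", " + row.getInt(1));
    }
}
```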
