Starting with Flink 1.9, Flink provides two Table Planner implementations for executing Table API and SQL programs: the Blink planner and the old planner (the latter already existed before 1.9). The planner's job is to translate relational operations into an executable, optimized job. The two planners differ in their optimization rules and runtime classes, and they also differ somewhat in the features they support.
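To see what the planner produces, you can ask a Table for its plan. A minimal sketch, assuming a TableEnvironment tEnv and a registered table named Orders (both names are illustrative):

// build a query and print the abstract syntax tree, the optimized logical plan,
// and the physical execution plan that the planner derived from it
Table result = tEnv.sqlQuery("SELECT userId, SUM(amount) FROM Orders GROUP BY userId");
System.out.println(result.explain());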
All Table API and SQL code lives in the flink-table or flink-table-blink Maven artifacts:
- flink-table-common: the common module, required e.g. by user-defined functions and formats.
- flink-table-api-java: the Table & SQL API for pure table programs, written in Java (still in early development; not recommended for use).
- flink-table-api-scala: the Table & SQL API for pure table programs, written in Scala (still in early development; not recommended for use).
- flink-table-api-java-bridge: the Table & SQL API combined with the DataStream/DataSet API, for Java.
- flink-table-api-scala-bridge: the Table & SQL API combined with the DataStream/DataSet API, for Scala.
- flink-table-planner: the table planner and runtime. This was Flink's only planner before 1.9; as of 1.11 its use is no longer recommended.
- flink-table-planner-blink: the new Blink planner, the default planner since 1.11.
- flink-table-runtime-blink: the new Blink runtime.
- flink-table-uber: bundles the modules above together with the old planner, covering most Table & SQL API use cases. The bundled jar flink-table-*.jar is placed in the /lib directory of a Flink distribution by default.
- flink-table-uber-blink: bundles the modules above together with the Blink planner, covering most Table & SQL API use cases. The bundled jar flink-table-blink-*.jar is placed in the /lib directory of a Flink distribution by default.
Dependencies for Table programs
<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
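As a rough illustration of what a bridge module provides, here is a minimal sketch that moves between the DataStream API and the Table API. It assumes flink-table-api-java-bridge plus a planner on the classpath; the data and class name are made up.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BridgeExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // a stream built with the DataStream API ...
        DataStream<String> words = env.fromElements("hello", "world");

        // ... used as a Table; this interoperability is exactly what the bridge modules provide
        Table table = tEnv.fromDataStream(words);
        table.printSchema();
    }
}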
If you want to run programs locally in your IDE, you also need to add one of the following modules, depending on which planner you use:
<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
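With one of these planner dependencies on the classpath, the planner itself is selected at runtime through EnvironmentSettings. A minimal sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlannerSelection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Blink planner in streaming mode (the default in Flink 1.12)
        EnvironmentSettings blinkSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, blinkSettings);

        // old planner, only for programs that still rely on its behavior
        EnvironmentSettings oldSettings =
                EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, oldSettings);
    }
}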
Parts of the table functionality are implemented internally in Scala, so the following dependency must also be added to the program:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
If you want to implement a custom format for parsing Kafka data, or a custom function, the following dependency is sufficient; the compiled jar can be used directly by the SQL Client:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>
This module currently includes the following extension interfaces:
SerializationSchemaFactory
DeserializationSchemaFactory
ScalarFunction
TableFunction
AggregateFunction
/**
 * Factory for creating configured instances of {@link SerializationSchema}.
 *
 * @param <T> record type that the format produces or consumes
 */
@PublicEvolving
public interface SerializationSchemaFactory<T> extends TableFormatFactory<T> {

    /**
     * Creates and configures a {@link SerializationSchema} using the given properties.
     *
     * @param properties normalized properties describing the format
     * @return the configured serialization schema, or null if the factory cannot provide an
     *     instance of this class
     */
    SerializationSchema<T> createSerializationSchema(Map<String, String> properties);
}
/**
 * Factory for creating configured instances of {@link DeserializationSchema}.
 *
 * @param <T> record type that the format produces or consumes
 */
@PublicEvolving
public interface DeserializationSchemaFactory<T> extends TableFormatFactory<T> {

    /**
     * Creates and configures a {@link DeserializationSchema} using the given properties.
     *
     * @param properties normalized properties describing the format
     * @return the configured deserialization schema, or null if the factory cannot provide an
     *     instance of this class
     */
    DeserializationSchema<T> createDeserializationSchema(Map<String, String> properties);
}
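To make the factory interfaces concrete, here is a rough sketch of a custom format factory. It is a simplified illustration, not a complete implementation: the format name "my-csv", the property keys, and the hard-coded schema are all made up, and a real factory must additionally be registered via a META-INF/services/org.apache.flink.table.factories.TableFactory file to be discoverable.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.types.Row;

public class MyCsvFormatFactory implements DeserializationSchemaFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        // properties that must be present for this factory to be selected
        Map<String, String> context = new HashMap<>();
        context.put("format.type", "my-csv"); // hypothetical format identifier
        context.put("format.property-version", "1");
        return context;
    }

    @Override
    public boolean supportsSchemaDerivation() {
        return false;
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList("format.field-delimiter");
    }

    @Override
    public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
        final String delimiter = properties.getOrDefault("format.field-delimiter", ",");
        return new DeserializationSchema<Row>() {
            @Override
            public Row deserialize(byte[] message) throws IOException {
                // split each record into string fields; a real format would validate and convert
                String[] fields = new String(message, StandardCharsets.UTF_8).split(delimiter);
                return Row.of((Object[]) fields);
            }

            @Override
            public boolean isEndOfStream(Row nextElement) {
                return false;
            }

            @Override
            public TypeInformation<Row> getProducedType() {
                // hard-coded two STRING columns, purely for illustration
                return Types.ROW(Types.STRING, Types.STRING);
            }
        };
    }
}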
/**
 * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
 * or multiple scalar values to a new scalar value.
 *
 * <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
 * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
 * Evaluation methods can also be overloaded by implementing multiple methods named
 * <code>eval</code>.
 *
 * <p>By default, input and output data types are automatically extracted using reflection. If the
 * reflective information is not sufficient, it can be supported and enriched with
 * {@link DataTypeHint} and {@link FunctionHint} annotations.
 *
 * <p>The following examples show how to specify a scalar function:
 *
 * <pre>{@code
 * // a function that accepts two INT arguments and computes a sum
 * class SumFunction extends ScalarFunction {
 *     public Integer eval(Integer a, Integer b) {
 *         return a + b;
 *     }
 * }
 *
 * // a function that accepts either INT NOT NULL or BOOLEAN NOT NULL and computes a STRING
 * class StringifyFunction extends ScalarFunction {
 *     public String eval(int i) {
 *         return String.valueOf(i);
 *     }
 *     public String eval(boolean b) {
 *         return String.valueOf(b);
 *     }
 * }
 *
 * // a function that accepts either INT or BOOLEAN and computes a STRING using function hints
 * @FunctionHint(input = [@DataTypeHint("INT")])
 * @FunctionHint(input = [@DataTypeHint("BOOLEAN")])
 * class StringifyFunction extends ScalarFunction {
 *     public String eval(Object o) {
 *         return o.toString();
 *     }
 * }
 *
 * // a function that accepts any data type as argument and computes a STRING
 * class StringifyFunction extends ScalarFunction {
 *     public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
 *         return o.toString();
 *     }
 * }
 *
 * // a function that accepts an arbitrary number of BIGINT values and computes a DECIMAL(10, 4)
 * class SumFunction extends ScalarFunction {
 *     public @DataTypeHint("DECIMAL(10, 4)") BigDecimal eval(Long... values) {
 *         // ...
 *     }
 * }
 * }</pre>
 *
 * <p>For storing a user-defined function in a catalog, the class must have a default constructor
 * and must be instantiable during runtime.
 */
@PublicEvolving
public abstract class ScalarFunction extends UserDefinedFunction {

    /**
     * Returns the result type of the evaluation method with a given signature.
     *
     * @deprecated This method uses the old type system and is based on the old reflective
     *     extraction logic. The method will be removed in future versions and is only called when
     *     using the deprecated {@code TableEnvironment.registerFunction(...)} method. The new
     *     reflective extraction logic (possibly enriched with {@link DataTypeHint} and {@link
     *     FunctionHint}) should be powerful enough to cover most use cases. For advanced users, it
     *     is possible to override {@code UserDefinedFunction#getTypeInference(DataTypeFactory)}.
     */
    @Deprecated
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return null;
    }

    /**
     * Returns {@link TypeInformation} about the operands of the evaluation method with a given
     * signature.
     *
     * @deprecated Same as {@link #getResultType(Class[])}: uses the old type system and will be
     *     removed in future versions.
     */
    @Deprecated
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
        for (int i = 0; i < signature.length; i++) {
            try {
                types[i] = TypeExtractor.getForClass(signature[i]);
            } catch (InvalidTypesException e) {
                throw new ValidationException(
                        "Parameter types of scalar function "
                                + this.getClass().getCanonicalName()
                                + " cannot be automatically determined. Please provide type information manually.");
            }
        }
        return types;
    }

    @Override
    public final FunctionKind getKind() {
        return FunctionKind.SCALAR;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInferenceExtractor.forScalarFunction(typeFactory, getClass());
    }
}
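A short usage sketch for a scalar function, assuming the SumFunction example from the Javadoc above and an existing table MyTable with INT columns a and b (all names are illustrative):

// register the function under a name that SQL can reference
tEnv.createTemporarySystemFunction("sumFunc", SumFunction.class);

// call it from SQL ...
tEnv.sqlQuery("SELECT sumFunc(a, b) FROM MyTable");

// ... or inline in the Table API without prior registration
// (call and $ are static imports from org.apache.flink.table.api.Expressions)
tEnv.from("MyTable").select(call(SumFunction.class, $("a"), $("b")));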
/**
 * Base class for a user-defined table function. A user-defined table function maps zero, one, or
 * multiple scalar values to zero, one, or multiple rows (or structured types). If an output record
 * consists of only one field, the structured record can be omitted, and a scalar value can be
 * emitted that will be implicitly wrapped into a row by the runtime.
 *
 * <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
 * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
 * Evaluation methods can also be overloaded by implementing multiple methods named
 * <code>eval</code>.
 *
 * <p>By default, input and output data types are automatically extracted using reflection. This
 * includes the generic argument {@code T} of the class for determining an output data type. Input
 * arguments are derived from one or more {@code eval()} methods. If the reflective information is
 * not sufficient, it can be supported and enriched with {@link DataTypeHint} and {@link
 * FunctionHint} annotations.
 *
 * <p>The following examples show how to specify a table function:
 *
 * <pre>{@code
 * // a function that accepts an arbitrary number of INT arguments and emits them as implicit ROW<INT>
 * class FlattenFunction extends TableFunction<Integer> {
 *     public void eval(Integer... args) {
 *         for (Integer i : args) {
 *             collect(i);
 *         }
 *     }
 * }
 *
 * // a function that accepts either INT or STRING and emits them as implicit ROW<STRING>
 * class DuplicatorFunction extends TableFunction<String> {
 *     public void eval(Integer i) {
 *         eval(String.valueOf(i));
 *     }
 *     public void eval(String s) {
 *         collect(s);
 *         collect(s);
 *     }
 * }
 *
 * // a function that produces a ROW<i INT, s STRING> from arguments; the function hint helps in
 * // declaring the row's fields
 * @FunctionHint(output = @DataTypeHint("ROW<i INT, s STRING>"))
 * class DuplicatorFunction extends TableFunction<Row> {
 *     public void eval(Integer i, String s) {
 *         collect(Row.of(i, s));
 *         collect(Row.of(i, s));
 *     }
 * }
 *
 * // a function that accepts either INT or DECIMAL(10, 4) and emits them as implicit
 * // ROW<INT> or ROW<DECIMAL(10, 4)> using function hints to declare the output types
 * class DuplicatorFunction extends TableFunction<Object> {
 *     @FunctionHint(output = @DataTypeHint("INT"))
 *     public void eval(Integer i) {
 *         collect(i);
 *         collect(i);
 *     }
 *     @FunctionHint(output = @DataTypeHint("DECIMAL(10, 4)"))
 *     public void eval(@DataTypeHint("DECIMAL(10, 4)") BigDecimal d) {
 *         collect(d);
 *         collect(d);
 *     }
 * }
 * }</pre>
 *
 * <p>For storing a user-defined function in a catalog, the class must have a default constructor
 * and must be instantiable during runtime.
 *
 * <p>In the API, a table function can be used as follows:
 *
 * <pre>{@code
 * public class Split extends TableFunction<String> {
 *
 *     // implement an "eval" method with as many parameters as you want
 *     public void eval(String str) {
 *         for (String s : str.split(" ")) {
 *             collect(s); // use collect(...) to emit an output row
 *         }
 *     }
 *
 *     // you can overload the eval method here ...
 * }
 *
 * TableEnvironment tEnv = ...
 * Table table = ...    // schema: ROW<a VARCHAR>
 *
 * // for Scala users
 * table.joinLateral(call(classOf[Split], $"a") as ("s")).select($"a", $"s")
 *
 * // for Java users
 * table.joinLateral(call(Split.class, $("a")).as("s")).select($("a"), $("s"));
 *
 * // for SQL users
 * tEnv.createTemporarySystemFunction("split", Split.class); // register table function first
 * tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)");
 * }</pre>
 *
 * @param <T> The type of the output row. Either an explicit composite type or an atomic type that
 *     is implicitly wrapped into a row consisting of one field.
 */
@PublicEvolving
public abstract class TableFunction<T> extends UserDefinedFunction {

    /** The code generated collector used to emit rows. */
    private Collector<T> collector;

    /** Internal use. Sets the current collector. */
    public final void setCollector(Collector<T> collector) {
        this.collector = collector;
    }

    /**
     * Returns the result type of the evaluation method.
     *
     * @deprecated This method uses the old type system and is based on the old reflective
     *     extraction logic. The method will be removed in future versions and is only called when
     *     using the deprecated {@code TableEnvironment.registerFunction(...)} method. The new
     *     reflective extraction logic (possibly enriched with {@link DataTypeHint} and {@link
     *     FunctionHint}) should be powerful enough to cover most use cases. For advanced users, it
     *     is possible to override {@code UserDefinedFunction#getTypeInference(DataTypeFactory)}.
     */
    @Deprecated
    public TypeInformation<T> getResultType() {
        return null;
    }

    /**
     * Returns {@link TypeInformation} about the operands of the evaluation method with a given
     * signature.
     *
     * @deprecated Same as {@link #getResultType()}: uses the old type system and will be removed
     *     in future versions.
     */
    @Deprecated
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
        for (int i = 0; i < signature.length; i++) {
            try {
                types[i] = TypeExtractor.getForClass(signature[i]);
            } catch (InvalidTypesException e) {
                throw new ValidationException(
                        "Parameter types of table function "
                                + this.getClass().getCanonicalName()
                                + " cannot be automatically determined. Please provide type information manually.");
            }
        }
        return types;
    }

    /**
     * Emits an (implicit or explicit) output row.
     *
     * <p>If null is emitted as an explicit row, it will be skipped by the runtime. For implicit
     * rows, the row's field will be null.
     *
     * @param row the output row
     */
    protected final void collect(T row) {
        collector.collect(row);
    }

    @Override
    public final FunctionKind getKind() {
        return FunctionKind.TABLE;
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInferenceExtractor.forTableFunction(typeFactory, (Class) getClass());
    }
}
/**
 * Base class for a user-defined aggregate function. A user-defined aggregate function maps scalar
 * values of multiple rows to a new scalar value.
 *
 * <p>The behavior of an {@link AggregateFunction} is centered around the concept of an accumulator.
 * The accumulator is an intermediate data structure that stores the aggregated values until a final
 * aggregation result is computed.
 *
 * <p>For each set of rows that needs to be aggregated, the runtime will create an empty accumulator
 * by calling {@link #createAccumulator()}. Subsequently, the {@code accumulate()} method of the
 * function is called for each input row to update the accumulator. Once all rows have been
 * processed, the {@link #getValue(Object)} method of the function is called to compute and return
 * the final result.
 *
 * <p>The main behavior of an {@link AggregateFunction} can be defined by implementing a custom
 * accumulate method. An accumulate method must be declared publicly, not static, and named <code>
 * accumulate</code>. Accumulate methods can also be overloaded by implementing multiple methods
 * named <code>accumulate</code>.
 *
 * <p>By default, input, accumulator, and output data types are automatically extracted using
 * reflection. This includes the generic argument {@code ACC} of the class for determining an
 * accumulator data type and the generic argument {@code T} for determining an output data type.
 * Input arguments are derived from one or more {@code accumulate()} methods. If the reflective
 * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
 * {@link FunctionHint} annotations.
 *
 * <p>An {@link AggregateFunction} needs at least three methods:
 *
 * <ul>
 *   <li>{@code createAccumulator}
 *   <li>{@code accumulate}
 *   <li>{@code getValue}
 * </ul>
 *
 * <p>There are a few other methods that are optional:
 *
 * <ul>
 *   <li>{@code retract}
 *   <li>{@code merge}
 * </ul>
 *
 * <p>All these methods must be declared publicly, not static, and named exactly as the names
 * mentioned above to be called by generated code.
 *
 * <p>For storing a user-defined function in a catalog, the class must have a default constructor
 * and must be instantiable during runtime.
 *
 * <pre>{@code
 * Processes the input values and updates the provided accumulator instance. The method
 * accumulate can be overloaded with different custom types and arguments. An aggregate function
 * requires at least one accumulate() method.
 *
 * param: accumulator           the accumulator which contains the current aggregated results
 * param: [user defined inputs] the input value (usually obtained from new arrived data).
 *
 * public void accumulate(ACC accumulator, [user defined inputs])
 * }</pre>
 *
 * <pre>{@code
 * Retracts the input values from the accumulator instance. The current design assumes the
 * inputs are the values that have been previously accumulated. The method retract can be
 * overloaded with different custom types and arguments. This method must be implemented for
 * bounded OVER aggregates over unbounded tables.
 *
 * param: accumulator           the accumulator which contains the current aggregated results
 * param: [user defined inputs] the input value (usually obtained from new arrived data).
 *
 * public void retract(ACC accumulator, [user defined inputs])
 * }</pre>
 *
 * <pre>{@code
 * Merges a group of accumulator instances into one accumulator instance. This method must be
 * implemented for unbounded session window and hop window grouping aggregates and bounded
 * grouping aggregates. Besides, implementing this method will be helpful for optimizations.
 * For example, the two-phase aggregation optimization requires all the AggregateFunctions to
 * support the "merge" method.
 *
 * param: accumulator the accumulator which will keep the merged aggregate results. It should
 *                    be noted that the accumulator may contain the previous aggregated
 *                    results. Therefore user should not replace or clean this instance in the
 *                    custom merge method.
 * param: iterable    an java.lang.Iterable pointed to a group of accumulators that will be
 *                    merged.
 *
 * public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
 * }</pre>
 *
 * <p>If this aggregate function can only be applied in an OVER window, this can be declared by
 * returning the requirement {@link FunctionRequirement#OVER_WINDOW_ONLY} in {@link
 * #getRequirements()}.
 *
 * <p>If an accumulator needs to store large amounts of data, {@link ListView} and {@link MapView}
 * provide advanced features for leveraging Flink's state backends in unbounded data scenarios.
 *
 * <p>The following examples show how to specify an aggregate function:
 *
 * <pre>{@code
 * // a function that counts STRING arguments that are not null and emits them as STRING
 * // the accumulator is BIGINT
 * public static class CountFunction extends AggregateFunction<String, CountFunction.MyAccumulator> {
 *     public static class MyAccumulator {
 *         public long count = 0L;
 *     }
 *
 *     {@literal @}Override
 *     public MyAccumulator createAccumulator() {
 *         return new MyAccumulator();
 *     }
 *
 *     public void accumulate(MyAccumulator accumulator, Integer i) {
 *         if (i != null) {
 *             accumulator.count += i;
 *         }
 *     }
 *
 *     {@literal @}Override
 *     public String getValue(MyAccumulator accumulator) {
 *         return "Result: " + accumulator.count;
 *     }
 * }
 *
 * // a function that determines the maximum of either BIGINT or STRING arguments
 * // the accumulator and the output is either BIGINT or STRING
 * public static class MaxFunction extends AggregateFunction<Object, Row> {
 *     {@literal @}Override
 *     public Row createAccumulator() {
 *         return new Row(1);
 *     }
 *
 *     {@literal @}FunctionHint(
 *         accumulator = {@literal @}DataTypeHint("ROW<max BIGINT>"),
 *         output = {@literal @}DataTypeHint("BIGINT")
 *     )
 *     public void accumulate(Row accumulator, Long l) {
 *         final Long max = (Long) accumulator.getField(0);
 *         if (max == null || l > max) {
 *             accumulator.setField(0, l);
 *         }
 *     }
 *
 *     {@literal @}FunctionHint(
 *         accumulator = {@literal @}DataTypeHint("ROW<max STRING>"),
 *         output = {@literal @}DataTypeHint("STRING")
 *     )
 *     public void accumulate(Row accumulator, String s) {
 *         final String max = (String) accumulator.getField(0);
 *         if (max == null || s.compareTo(max) > 0) {
 *             accumulator.setField(0, s);
 *         }
 *     }
 *
 *     {@literal @}Override
 *     public Object getValue(Row accumulator) {
 *         return accumulator.getField(0);
 *     }
 * }
 * }</pre>
 *
 * @param <T> final result type of the aggregation
 * @param <ACC> intermediate result type during the aggregation
 */
@PublicEvolving
public abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {

    /**
     * Called every time when an aggregation result should be materialized. The returned value
     * could be either an early and incomplete result (periodically emitted as data arrives) or
     * the final result of the aggregation.
     *
     * @param accumulator the accumulator which contains the current intermediate results
     * @return the aggregation result
     */
    public abstract T getValue(ACC accumulator);

    @Override
    public final FunctionKind getKind() {
        return FunctionKind.AGGREGATE;
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInferenceExtractor.forAggregateFunction(typeFactory, (Class) getClass());
    }
}
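A short usage sketch for an aggregate function, assuming the CountFunction example from the Javadoc above and an existing table MyTable with an INT column i and a grouping column k (all names are illustrative):

// register the aggregate function and use it in a grouped aggregation
tEnv.createTemporarySystemFunction("myCount", CountFunction.class);
tEnv.sqlQuery("SELECT k, myCount(i) FROM MyTable GROUP BY k");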