赞
踩
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package org.apache.flink.streaming.connectors.kafka.table;
-
- import org.apache.flink.annotation.Internal;
- import org.apache.flink.api.common.serialization.DeserializationSchema;
- import org.apache.flink.api.common.serialization.SerializationSchema;
- import org.apache.flink.configuration.ConfigOption;
- import org.apache.flink.configuration.ConfigOptions;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.configuration.ReadableConfig;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
- import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
- import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
- import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
- import org.apache.flink.table.api.ValidationException;
- import org.apache.flink.table.catalog.CatalogTable;
- import org.apache.flink.table.catalog.ObjectIdentifier;
- import org.apache.flink.table.connector.format.DecodingFormat;
- import org.apache.flink.table.connector.format.EncodingFormat;
- import org.apache.flink.table.connector.format.Format;
- import org.apache.flink.table.connector.sink.DynamicTableSink;
- import org.apache.flink.table.connector.source.DynamicTableSource;
- import org.apache.flink.table.data.RowData;
- import org.apache.flink.table.factories.DeserializationFormatFactory;
- import org.apache.flink.table.factories.DynamicTableSinkFactory;
- import org.apache.flink.table.factories.DynamicTableSourceFactory;
- import org.apache.flink.table.factories.FactoryUtil;
- import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
- import org.apache.flink.table.factories.SerializationFormatFactory;
- import org.apache.flink.table.types.DataType;
- import org.apache.flink.types.RowKind;
-
- import javax.annotation.Nullable;
-
- import java.time.Duration;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.Properties;
- import java.util.Set;
- import java.util.regex.Pattern;
-
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FIELDS_INCLUDE;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FORMAT;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
- import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
- import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
-
-
- @Internal
- public class KafkaDynamicTableFactory
- implements DynamicTableSourceFactory, DynamicTableSinkFactory {
- public static final ConfigOption<Integer> SOURCE_PARALLELISM =
- ConfigOptions.key("source.parallelism")
- .intType()
- .noDefaultValue()
- .withDescription(
- "Defines a custom parallelism for the source. "
- + "By default, if this option is not defined, the planner will derive the parallelism "
- + "for each statement individually by also considering the global configuration.");
-
- public static final String IDENTIFIER = "kafka";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(PROPS_BOOTSTRAP_SERVERS);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FactoryUtil.FORMAT);
- options.add(KEY_FORMAT);
- options.add(KEY_FIELDS);
- options.add(KEY_FIELDS_PREFIX);
- options.add(VALUE_FORMAT);
- options.add(VALUE_FIELDS_INCLUDE);
- options.add(TOPIC);
- options.add(TOPIC_PATTERN);
- options.add(PROPS_GROUP_ID);
- options.add(SCAN_STARTUP_MODE);
- options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
- options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
- options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
- options.add(SINK_PARTITIONER);
- options.add(SINK_SEMANTIC);
- options.add(SINK_PARALLELISM);
- options.add(SOURCE_PARALLELISM);
- return options;
- }
-
- @Override
- public DynamicTableSource createDynamicTableSource(Context context) {
- final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-
- final ReadableConfig tableOptions = helper.getOptions();
-
- final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
- getKeyDecodingFormat(helper);
-
- final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
- getValueDecodingFormat(helper);
-
- helper.validateExcept(PROPERTIES_PREFIX);
-
- validateTableSourceOptions(tableOptions);
-
- validatePKConstraints(
- context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
-
- final StartupOptions startupOptions = getStartupOptions(tableOptions);
-
- final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
-
- // add topic-partition discovery
- properties.setProperty(
- FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
- String.valueOf(
- tableOptions
- .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
- .map(Duration::toMillis)
- .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
-
- final DataType physicalDataType =
- context.getCatalogTable().getSchema().toPhysicalRowDataType();
-
- final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
-
- final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
-
- final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
-
- return createKafkaTableSource(
- physicalDataType,
- keyDecodingFormat.orElse(null),
- valueDecodingFormat,
- keyProjection,
- valueProjection,
- keyPrefix,
- KafkaOptions.getSourceTopics(tableOptions),
- KafkaOptions.getSourceTopicPattern(tableOptions),
- properties,
- startupOptions.startupMode,
- startupOptions.specificOffsets,
- startupOptions.startupTimestampMillis);
- }
-
- @Override
- public DynamicTableSink createDynamicTableSink(Context context) {
- final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-
- final ReadableConfig tableOptions = helper.getOptions();
-
- final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
- getKeyEncodingFormat(helper);
-
- final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
- getValueEncodingFormat(helper);
-
- helper.validateExcept(PROPERTIES_PREFIX);
-
- validateTableSinkOptions(tableOptions);
-
- validatePKConstraints(
- context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
-
- final DataType physicalDataType =
- context.getCatalogTable().getSchema().toPhysicalRowDataType();
-
- final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
-
- final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
-
- final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
-
- final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
-
- return createKafkaTableSink(
- physicalDataType,
- keyEncodingFormat.orElse(null),
- valueEncodingFormat,
- keyProjection,
- valueProjection,
- keyPrefix,
- tableOptions.get(TOPIC).get(0),
- getKafkaProperties(context.getCatalogTable().getOptions()),
- getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
- getSinkSemantic(tableOptions),
- parallelism);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
- TableFactoryHelper helper) {
- final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
- helper.discoverOptionalDecodingFormat(
- DeserializationFormatFactory.class, KEY_FORMAT);
- keyDecodingFormat.ifPresent(
- format -> {
- if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
- throw new ValidationException(
- String.format(
- "A key format should only deal with INSERT-only records. "
- + "But %s has a changelog mode of %s.",
- helper.getOptions().get(KEY_FORMAT),
- format.getChangelogMode()));
- }
- });
- return keyDecodingFormat;
- }
-
- private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
- TableFactoryHelper helper) {
- final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
- helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
- keyEncodingFormat.ifPresent(
- format -> {
- if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
- throw new ValidationException(
- String.format(
- "A key format should only deal with INSERT-only records. "
- + "But %s has a changelog mode of %s.",
- helper.getOptions().get(KEY_FORMAT),
- format.getChangelogMode()));
- }
- });
- return keyEncodingFormat;
- }
-
- private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
- TableFactoryHelper helper) {
- return helper.discoverOptionalDecodingFormat(
- DeserializationFormatFactory.class, FactoryUtil.FORMAT)
- .orElseGet(
- () ->
- helper.discoverDecodingFormat(
- DeserializationFormatFactory.class, VALUE_FORMAT));
- }
-
- private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
- TableFactoryHelper helper) {
- return helper.discoverOptionalEncodingFormat(
- SerializationFormatFactory.class, FactoryUtil.FORMAT)
- .orElseGet(
- () ->
- helper.discoverEncodingFormat(
- SerializationFormatFactory.class, VALUE_FORMAT));
- }
-
- private static void validatePKConstraints(
- ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
- if (catalogTable.getSchema().getPrimaryKey().isPresent()
- && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
- Configuration options = Configuration.fromMap(catalogTable.getOptions());
- String formatName =
- options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
- throw new ValidationException(
- String.format(
- "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
- + " on the table, because it can't guarantee the semantic of primary key.",
- tableName.asSummaryString(), formatName));
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- protected KafkaDynamicSource createKafkaTableSource(
- DataType physicalDataType,
- @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
- DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
- int[] keyProjection,
- int[] valueProjection,
- @Nullable String keyPrefix,
- @Nullable List<String> topics,
- @Nullable Pattern topicPattern,
- Properties properties,
- StartupMode startupMode,
- Map<KafkaTopicPartition, Long> specificStartupOffsets,
- long startupTimestampMillis) {
- return new KafkaDynamicSource(
- physicalDataType,
- keyDecodingFormat,
- valueDecodingFormat,
- keyProjection,
- valueProjection,
- keyPrefix,
- topics,
- topicPattern,
- properties,
- startupMode,
- specificStartupOffsets,
- startupTimestampMillis,
- false);
- }
-
- protected KafkaDynamicSink createKafkaTableSink(
- DataType physicalDataType,
- @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
- EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
- int[] keyProjection,
- int[] valueProjection,
- @Nullable String keyPrefix,
- String topic,
- Properties properties,
- FlinkKafkaPartitioner<RowData> partitioner,
- KafkaSinkSemantic semantic,
- Integer parallelism) {
- return new KafkaDynamicSink(
- physicalDataType,
- keyEncodingFormat,
- valueEncodingFormat,
- keyProjection,
- valueProjection,
- keyPrefix,
- topic,
- properties,
- partitioner,
- semantic,
- false,
- parallelism);
- }
- }
-
- //
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package org.apache.flink.table.planner.plan.nodes.common
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy
- import org.apache.flink.api.common.io.InputFormat
- import org.apache.flink.api.dag.Transformation
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- import org.apache.flink.table.connector.source.{DataStreamScanProvider, InputFormatProvider, ScanTableSource, SourceFunctionProvider, SourceProvider}
- import org.apache.flink.table.data.RowData
- import org.apache.flink.table.planner.calcite.FlinkTypeFactory
- import org.apache.flink.table.planner.plan.schema.TableSourceTable
- import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext
- import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-
- import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
- import org.apache.calcite.rel.RelWriter
- import org.apache.calcite.rel.`type`.RelDataType
- import org.apache.calcite.rel.core.TableScan
-
- import scala.collection.JavaConverters._
-
/**
  * Base physical RelNode to read data from an external source defined by a [[ScanTableSource]].
  *
  * Besides turning the source's runtime provider into a [[Transformation]], this node applies
  * the table option `source.parallelism` to that transformation when the option is set.
  */
abstract class CommonPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    relOptTable: TableSourceTable)
  extends TableScan(cluster, traitSet, relOptTable) {

  // cache table source transformation.
  protected var sourceTransform: Transformation[_] = _

  protected val tableSourceTable: TableSourceTable = relOptTable.unwrap(classOf[TableSourceTable])

  protected[flink] val tableSource: ScanTableSource =
    tableSourceTable.tableSource.asInstanceOf[ScanTableSource]

  override def deriveRowType(): RelDataType = {
    // TableScan row type should always keep same with its
    // internal RelOptTable's row type.
    relOptTable.getRowType
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw)
      .item("fields", getRowType.getFieldNames.asScala.mkString(", "))
  }

  /**
    * Creates the source [[Transformation]] from the runtime provider of the table source and,
    * if the table declares the `source.parallelism` option, sets that parallelism on it.
    *
    * @param env  environment the source is registered in
    * @param name name given to the source operator
    * @return the transformation producing the scanned rows
    * @throws IllegalArgumentException if `source.parallelism` is set but is not an integer
    */
  protected def createSourceTransformation(
      env: StreamExecutionEnvironment,
      name: String): Transformation[RowData] = {
    val runtimeProvider = tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
    val outRowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
    val outTypeInfo = InternalTypeInfo.of(outRowType)

    val transformation = runtimeProvider match {
      case provider: SourceFunctionProvider =>
        val sourceFunction = provider.createSourceFunction()
        env
          .addSource(sourceFunction, name, outTypeInfo)
          .getTransformation
      case provider: InputFormatProvider =>
        val inputFormat = provider.createInputFormat()
        createInputFormatTransformation(env, inputFormat, name, outTypeInfo)
      case provider: SourceProvider =>
        // TODO: Push down watermark strategy to source scan
        val strategy: WatermarkStrategy[RowData] = WatermarkStrategy.noWatermarks()
        env.fromSource(provider.createSource(), strategy, name).getTransformation
      case provider: DataStreamScanProvider =>
        provider.produceDataStream(env).getTransformation
    }

    val parallelismKey = "source.parallelism"
    // Read the option directly from the table's option map instead of serializing the whole
    // table via toProperties. java.util.Map#get yields null for a missing key, which
    // Option(...) maps to None.
    Option(tableSourceTable.catalogTable.getOptions.get(parallelismKey)).foreach { configured =>
      val parallelism =
        try {
          configured.toInt
        } catch {
          case e: NumberFormatException =>
            // Surface which option was malformed instead of a bare parse error.
            throw new IllegalArgumentException(
              s"Option '$parallelismKey' must be an integer, but was '$configured'.", e)
        }
      transformation.setParallelism(parallelism)
    }
    transformation
  }

  /**
    * Creates a [[Transformation]] based on the given [[InputFormat]].
    * The implementation is different for streaming mode and batch mode.
    */
  protected def createInputFormatTransformation(
      env: StreamExecutionEnvironment,
      inputFormat: InputFormat[RowData, _],
      name: String,
      outTypeInfo: InternalTypeInfo[RowData]): Transformation[RowData]
}

需要覆盖的类有2个:KafkaDynamicTableFactory,CommonPhysicalTableSourceScan
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。