当前位置:   article > 正文

Flink kafka source定义并行度_the kafka table with 'json' format doesn't support

the kafka table with 'json' format doesn't support defining
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.flink.streaming.connectors.kafka.table;
  19. import org.apache.flink.annotation.Internal;
  20. import org.apache.flink.api.common.serialization.DeserializationSchema;
  21. import org.apache.flink.api.common.serialization.SerializationSchema;
  22. import org.apache.flink.configuration.ConfigOption;
  23. import org.apache.flink.configuration.ConfigOptions;
  24. import org.apache.flink.configuration.Configuration;
  25. import org.apache.flink.configuration.ReadableConfig;
  26. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
  27. import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
  28. import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
  29. import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
  30. import org.apache.flink.table.api.ValidationException;
  31. import org.apache.flink.table.catalog.CatalogTable;
  32. import org.apache.flink.table.catalog.ObjectIdentifier;
  33. import org.apache.flink.table.connector.format.DecodingFormat;
  34. import org.apache.flink.table.connector.format.EncodingFormat;
  35. import org.apache.flink.table.connector.format.Format;
  36. import org.apache.flink.table.connector.sink.DynamicTableSink;
  37. import org.apache.flink.table.connector.source.DynamicTableSource;
  38. import org.apache.flink.table.data.RowData;
  39. import org.apache.flink.table.factories.DeserializationFormatFactory;
  40. import org.apache.flink.table.factories.DynamicTableSinkFactory;
  41. import org.apache.flink.table.factories.DynamicTableSourceFactory;
  42. import org.apache.flink.table.factories.FactoryUtil;
  43. import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
  44. import org.apache.flink.table.factories.SerializationFormatFactory;
  45. import org.apache.flink.table.types.DataType;
  46. import org.apache.flink.types.RowKind;
  47. import javax.annotation.Nullable;
  48. import java.time.Duration;
  49. import java.util.HashSet;
  50. import java.util.List;
  51. import java.util.Map;
  52. import java.util.Optional;
  53. import java.util.Properties;
  54. import java.util.Set;
  55. import java.util.regex.Pattern;
  56. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
  57. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
  58. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
  59. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
  60. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
  61. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
  62. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
  63. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
  64. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
  65. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
  66. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
  67. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
  68. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
  69. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
  70. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
  71. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FIELDS_INCLUDE;
  72. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FORMAT;
  73. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
  74. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
  75. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
  76. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
  77. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
  78. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
  79. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
  80. import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
  81. import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
  82. @Internal
  83. public class KafkaDynamicTableFactory
  84. implements DynamicTableSourceFactory, DynamicTableSinkFactory {
  /**
   * Table option {@code source.parallelism}: an integer option without a default value.
   *
   * <p>It is registered in {@link #optionalOptions()} so that option validation accepts the key.
   * This factory itself does not read the value when creating the source.
   */
  public static final ConfigOption<Integer> SOURCE_PARALLELISM =
      ConfigOptions.key("source.parallelism")
          .intType()
          .noDefaultValue()
          .withDescription(
              "Defines a custom parallelism for the source. "
                  + "By default, if this option is not defined, the planner will derive the parallelism "
                  + "for each statement individually by also considering the global configuration.");

  /** Connector identifier returned by {@link #factoryIdentifier()}. */
  public static final String IDENTIFIER = "kafka";
  94. @Override
  95. public String factoryIdentifier() {
  96. return IDENTIFIER;
  97. }
  98. @Override
  99. public Set<ConfigOption<?>> requiredOptions() {
  100. final Set<ConfigOption<?>> options = new HashSet<>();
  101. options.add(PROPS_BOOTSTRAP_SERVERS);
  102. return options;
  103. }
  104. @Override
  105. public Set<ConfigOption<?>> optionalOptions() {
  106. final Set<ConfigOption<?>> options = new HashSet<>();
  107. options.add(FactoryUtil.FORMAT);
  108. options.add(KEY_FORMAT);
  109. options.add(KEY_FIELDS);
  110. options.add(KEY_FIELDS_PREFIX);
  111. options.add(VALUE_FORMAT);
  112. options.add(VALUE_FIELDS_INCLUDE);
  113. options.add(TOPIC);
  114. options.add(TOPIC_PATTERN);
  115. options.add(PROPS_GROUP_ID);
  116. options.add(SCAN_STARTUP_MODE);
  117. options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
  118. options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
  119. options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
  120. options.add(SINK_PARTITIONER);
  121. options.add(SINK_SEMANTIC);
  122. options.add(SINK_PARALLELISM);
  123. options.add(SOURCE_PARALLELISM);
  124. return options;
  125. }
  126. @Override
  127. public DynamicTableSource createDynamicTableSource(Context context) {
  128. final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
  129. final ReadableConfig tableOptions = helper.getOptions();
  130. final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
  131. getKeyDecodingFormat(helper);
  132. final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
  133. getValueDecodingFormat(helper);
  134. helper.validateExcept(PROPERTIES_PREFIX);
  135. validateTableSourceOptions(tableOptions);
  136. validatePKConstraints(
  137. context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
  138. final StartupOptions startupOptions = getStartupOptions(tableOptions);
  139. final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
  140. // add topic-partition discovery
  141. properties.setProperty(
  142. FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
  143. String.valueOf(
  144. tableOptions
  145. .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
  146. .map(Duration::toMillis)
  147. .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
  148. final DataType physicalDataType =
  149. context.getCatalogTable().getSchema().toPhysicalRowDataType();
  150. final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
  151. final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
  152. final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
  153. return createKafkaTableSource(
  154. physicalDataType,
  155. keyDecodingFormat.orElse(null),
  156. valueDecodingFormat,
  157. keyProjection,
  158. valueProjection,
  159. keyPrefix,
  160. KafkaOptions.getSourceTopics(tableOptions),
  161. KafkaOptions.getSourceTopicPattern(tableOptions),
  162. properties,
  163. startupOptions.startupMode,
  164. startupOptions.specificOffsets,
  165. startupOptions.startupTimestampMillis);
  166. }
  167. @Override
  168. public DynamicTableSink createDynamicTableSink(Context context) {
  169. final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
  170. final ReadableConfig tableOptions = helper.getOptions();
  171. final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
  172. getKeyEncodingFormat(helper);
  173. final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
  174. getValueEncodingFormat(helper);
  175. helper.validateExcept(PROPERTIES_PREFIX);
  176. validateTableSinkOptions(tableOptions);
  177. validatePKConstraints(
  178. context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
  179. final DataType physicalDataType =
  180. context.getCatalogTable().getSchema().toPhysicalRowDataType();
  181. final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
  182. final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
  183. final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
  184. final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
  185. return createKafkaTableSink(
  186. physicalDataType,
  187. keyEncodingFormat.orElse(null),
  188. valueEncodingFormat,
  189. keyProjection,
  190. valueProjection,
  191. keyPrefix,
  192. tableOptions.get(TOPIC).get(0),
  193. getKafkaProperties(context.getCatalogTable().getOptions()),
  194. getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
  195. getSinkSemantic(tableOptions),
  196. parallelism);
  197. }
  198. // --------------------------------------------------------------------------------------------
  199. private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
  200. TableFactoryHelper helper) {
  201. final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
  202. helper.discoverOptionalDecodingFormat(
  203. DeserializationFormatFactory.class, KEY_FORMAT);
  204. keyDecodingFormat.ifPresent(
  205. format -> {
  206. if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
  207. throw new ValidationException(
  208. String.format(
  209. "A key format should only deal with INSERT-only records. "
  210. + "But %s has a changelog mode of %s.",
  211. helper.getOptions().get(KEY_FORMAT),
  212. format.getChangelogMode()));
  213. }
  214. });
  215. return keyDecodingFormat;
  216. }
  217. private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
  218. TableFactoryHelper helper) {
  219. final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
  220. helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
  221. keyEncodingFormat.ifPresent(
  222. format -> {
  223. if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
  224. throw new ValidationException(
  225. String.format(
  226. "A key format should only deal with INSERT-only records. "
  227. + "But %s has a changelog mode of %s.",
  228. helper.getOptions().get(KEY_FORMAT),
  229. format.getChangelogMode()));
  230. }
  231. });
  232. return keyEncodingFormat;
  233. }
  234. private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
  235. TableFactoryHelper helper) {
  236. return helper.discoverOptionalDecodingFormat(
  237. DeserializationFormatFactory.class, FactoryUtil.FORMAT)
  238. .orElseGet(
  239. () ->
  240. helper.discoverDecodingFormat(
  241. DeserializationFormatFactory.class, VALUE_FORMAT));
  242. }
  243. private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
  244. TableFactoryHelper helper) {
  245. return helper.discoverOptionalEncodingFormat(
  246. SerializationFormatFactory.class, FactoryUtil.FORMAT)
  247. .orElseGet(
  248. () ->
  249. helper.discoverEncodingFormat(
  250. SerializationFormatFactory.class, VALUE_FORMAT));
  251. }
  252. private static void validatePKConstraints(
  253. ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
  254. if (catalogTable.getSchema().getPrimaryKey().isPresent()
  255. && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
  256. Configuration options = Configuration.fromMap(catalogTable.getOptions());
  257. String formatName =
  258. options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
  259. throw new ValidationException(
  260. String.format(
  261. "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
  262. + " on the table, because it can't guarantee the semantic of primary key.",
  263. tableName.asSummaryString(), formatName));
  264. }
  265. }
  266. // --------------------------------------------------------------------------------------------
  267. protected KafkaDynamicSource createKafkaTableSource(
  268. DataType physicalDataType,
  269. @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
  270. DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
  271. int[] keyProjection,
  272. int[] valueProjection,
  273. @Nullable String keyPrefix,
  274. @Nullable List<String> topics,
  275. @Nullable Pattern topicPattern,
  276. Properties properties,
  277. StartupMode startupMode,
  278. Map<KafkaTopicPartition, Long> specificStartupOffsets,
  279. long startupTimestampMillis) {
  280. return new KafkaDynamicSource(
  281. physicalDataType,
  282. keyDecodingFormat,
  283. valueDecodingFormat,
  284. keyProjection,
  285. valueProjection,
  286. keyPrefix,
  287. topics,
  288. topicPattern,
  289. properties,
  290. startupMode,
  291. specificStartupOffsets,
  292. startupTimestampMillis,
  293. false);
  294. }
  295. protected KafkaDynamicSink createKafkaTableSink(
  296. DataType physicalDataType,
  297. @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
  298. EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
  299. int[] keyProjection,
  300. int[] valueProjection,
  301. @Nullable String keyPrefix,
  302. String topic,
  303. Properties properties,
  304. FlinkKafkaPartitioner<RowData> partitioner,
  305. KafkaSinkSemantic semantic,
  306. Integer parallelism) {
  307. return new KafkaDynamicSink(
  308. physicalDataType,
  309. keyEncodingFormat,
  310. valueEncodingFormat,
  311. keyProjection,
  312. valueProjection,
  313. keyPrefix,
  314. topic,
  315. properties,
  316. partitioner,
  317. semantic,
  318. false,
  319. parallelism);
  320. }
  321. }
  322. //
  323. /*
  324. * Licensed to the Apache Software Foundation (ASF) under one
  325. * or more contributor license agreements. See the NOTICE file
  326. * distributed with this work for additional information
  327. * regarding copyright ownership. The ASF licenses this file
  328. * to you under the Apache License, Version 2.0 (the
  329. * "License"); you may not use this file except in compliance
  330. * with the License. You may obtain a copy of the License at
  331. *
  332. * http://www.apache.org/licenses/LICENSE-2.0
  333. *
  334. * Unless required by applicable law or agreed to in writing, software
  335. * distributed under the License is distributed on an "AS IS" BASIS,
  336. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  337. * See the License for the specific language governing permissions and
  338. * limitations under the License.
  339. */
  340. package org.apache.flink.table.planner.plan.nodes.common
  341. import org.apache.flink.api.common.eventtime.WatermarkStrategy
  342. import org.apache.flink.api.common.io.InputFormat
  343. import org.apache.flink.api.dag.Transformation
  344. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  345. import org.apache.flink.table.connector.source.{DataStreamScanProvider, InputFormatProvider, ScanTableSource, SourceFunctionProvider, SourceProvider}
  346. import org.apache.flink.table.data.RowData
  347. import org.apache.flink.table.planner.calcite.FlinkTypeFactory
  348. import org.apache.flink.table.planner.plan.schema.TableSourceTable
  349. import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext
  350. import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
  351. import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
  352. import org.apache.calcite.rel.RelWriter
  353. import org.apache.calcite.rel.`type`.RelDataType
  354. import org.apache.calcite.rel.core.TableScan
  355. import scala.collection.JavaConverters._
  356. /**
  357. * Base physical RelNode to read data from an external source defined by a [[ScanTableSource]].
  358. */
  359. abstract class CommonPhysicalTableSourceScan(
  360. cluster: RelOptCluster,
  361. traitSet: RelTraitSet,
  362. relOptTable: TableSourceTable)
  363. extends TableScan(cluster, traitSet, relOptTable) {
  // Cache slot for the table source transformation. It starts out as null (`_`) and is not
  // assigned anywhere in the visible part of this class.
  protected var sourceTransform: Transformation[_] = _

  // The scanned table, obtained by unwrapping `relOptTable` as a TableSourceTable.
  protected val tableSourceTable: TableSourceTable = relOptTable.unwrap(classOf[TableSourceTable])

  // The table's source viewed as a ScanTableSource. The unchecked cast fails with a
  // ClassCastException if the underlying source is of another kind.
  protected[flink] val tableSource: ScanTableSource =
    tableSourceTable.tableSource.asInstanceOf[ScanTableSource]
  /**
   * Returns the row type of this scan. It is always taken from the underlying `relOptTable`,
   * so the scan and its internal RelOptTable never disagree on the row type.
   */
  override def deriveRowType(): RelDataType = relOptTable.getRowType
  /**
   * Extends the inherited explain output with a "fields" item that lists the field names of
   * the row type, separated by ", ".
   */
  override def explainTerms(pw: RelWriter): RelWriter = {
    val writer = super.explainTerms(pw)
    val fieldList = getRowType.getFieldNames.asScala.mkString(", ")
    writer.item("fields", fieldList)
  }
  /**
   * Creates the [[Transformation]] that reads from [[tableSource]] and, when the table defines
   * the `source.parallelism` property, applies that value as the transformation's parallelism.
   *
   * The runtime provider returned by the table source decides how the transformation is built.
   * A provider that matches none of the cases below fails with a `MatchError`, because the
   * match has no default case.
   *
   * @param env  execution environment used to create the source
   * @param name name given to the source operator (not used for [[DataStreamScanProvider]])
   * @return the source transformation, with its parallelism set if `source.parallelism` is
   *         present in the table properties
   * @throws NumberFormatException if `source.parallelism` is present but is not an integer
   */
  protected def createSourceTransformation(
      env: StreamExecutionEnvironment,
      name: String): Transformation[RowData] = {
    val runtimeProvider = tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
    val outRowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
    val outTypeInfo = InternalTypeInfo.of(outRowType)

    val transformation = runtimeProvider match {
      case provider: SourceFunctionProvider =>
        val sourceFunction = provider.createSourceFunction()
        env
          .addSource(sourceFunction, name, outTypeInfo)
          .getTransformation
      case provider: InputFormatProvider =>
        val inputFormat = provider.createInputFormat()
        createInputFormatTransformation(env, inputFormat, name, outTypeInfo)
      case provider: SourceProvider =>
        // TODO: Push down watermark strategy to source scan
        val strategy: WatermarkStrategy[RowData] = WatermarkStrategy.noWatermarks()
        env.fromSource(provider.createSource(), strategy, name).getTransformation
      case provider: DataStreamScanProvider =>
        provider.produceDataStream(env).getTransformation
    }

    // The property lookup yields null when the key is absent, so wrap it in Option instead of
    // comparing against null. Surrounding whitespace is tolerated.
    Option(tableSourceTable.catalogTable.toProperties.get("source.parallelism"))
      .map(_.trim)
      .foreach { configured =>
        val parallelism =
          try {
            configured.toInt
          } catch {
            // Same exception type as before, but the message now names the offending option.
            case _: NumberFormatException =>
              throw new NumberFormatException(
                s"Invalid value '$configured' for table option 'source.parallelism': " +
                  "an integer is required.")
          }
        transformation.setParallelism(parallelism)
      }

    transformation
  }
  /**
   * Creates a [[Transformation]] based on the given [[InputFormat]].
   * The implementation is different for streaming mode and batch mode.
   *
   * Abstract: subclasses provide the implementation. In this class it is called from
   * `createSourceTransformation` when the runtime provider is an [[InputFormatProvider]].
   *
   * @param env         execution environment the transformation is created in
   * @param inputFormat input format created by the provider
   * @param name        name for the source operator
   * @param outTypeInfo type information of the produced rows
   * @return the transformation reading from `inputFormat`
   */
  protected def createInputFormatTransformation(
      env: StreamExecutionEnvironment,
      inputFormat: InputFormat[RowData, _],
      name: String,
      outTypeInfo: InternalTypeInfo[RowData]): Transformation[RowData]
  415. }

需要覆盖的类有 2 个:KafkaDynamicTableFactory(新增并注册 `source.parallelism` 选项,使其能通过参数校验)和 CommonPhysicalTableSourceScan(从表属性中读取 `source.parallelism`,并将其设置为 source 算子的并行度)。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44088
推荐阅读
相关标签
  

闽ICP备14008679号