赞
踩
前面我们了解了《写给大忙人看的 Flink 消费 Kafka》,今天我们一起来看一下 Flink SQL 中的 Kafka 表是如何与 Flink Streaming 的 Kafka 消费结合起来的。
创建 Kafka source 表:
-- Kafka-backed source table for the `orders` stream.
-- The value payload is decoded by the custom 'ss-canal-json' format.
CREATE TABLE orders (
    status      INT,
    courier_id  BIGINT,
    id          BIGINT,
    finish_time BIGINT,
    place_time  BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- connector and topic
    'connector' = 'kafka',
    'topic' = 'test',
    -- Kafka client settings ('xxx' is a placeholder for the broker list)
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'testGroup',
    -- value format and its options
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    -- where to start reading when the job starts
    'scan.startup.mode' = 'latest-offset'
);
SQL 经过 Apache Calcite 的一系列转化(具体的转化过程后续会单独写),最终到达 CatalogSourceTable 类。该类继承自 FlinkPreparingTableBase,负责将 Calcite 的 RelOptTable 转化为 Flink 的 TableSourceTable,入口是它的 toRel 方法(由 SqlToRelConverter 调用):
@Override //入口方法 SqlToRelConverter toRel 方法 public RelNode toRel(ToRelContext toRelContext) { final RelOptCluster cluster = toRelContext.getCluster(); final List<RelHint> hints = toRelContext.getTableHints();// sql Hint final FlinkContext context = ShortcutUtils.unwrapContext(cluster); final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster); final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema); // 0. finalize catalog table final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints); final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions); // 1. create and prepare table source final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable); prepareDynamicSource( schemaTable.getTableIdentifier(), catalogTable, tableSource, schemaTable.isStreamingMode(), context.getTableConfig()); // 2. push table scan pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints); // 3. push project for non-physical columns final TableSchema schema = catalogTable.getSchema(); if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) { pushMetadataProjection(relBuilder, typeFactory, schema); pushGeneratedProjection(context, relBuilder, schema); } // 4. push watermark assigner if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) { pushWatermarkAssigner(context, relBuilder, schema); } return relBuilder.build(); }
经过上面的第 0-4 步,转化就完成了。这篇 blog 主要关心的是第 1 步(创建并准备 table source),我们继续追踪到 FactoryUtil.createTableSource 方法:
public static DynamicTableSource createTableSource( @Nullable Catalog catalog, //GenericlnMemoryCatalog ObjectIdentifier objectIdentifier,//`default_catalog`.`default_database`.`orders` CatalogTable catalogTable,//CatalogTableImpl ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) { final DefaultDynamicTableContext context = new DefaultDynamicTableContext( objectIdentifier, catalogTable, configuration, classLoader, isTemporary); try { final DynamicTableSourceFactory factory = // 找到 KafkaDynamicTableFactory getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context); return factory.createDynamicTableSource(context); } catch (Throwable t) { throw new ValidationException( String.format( "Unable to create a source for reading table '%s'.\n\n" + "Table options are:\n\n" + "%s", objectIdentifier.asSummaryString(), catalogTable.getOptions().entrySet().stream() .map(e -> stringifyOption(e.getKey(), e.getValue())) .sorted() .collect(Collectors.joining("\n"))), t); } }
接着来到 KafkaDynamicTableFactory 的 createDynamicTableSource 方法:
@Override public DynamicTableSource createDynamicTableSource(Context context) { final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions();//with 里的配置信息 // 通过 format (SPI) final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = getKeyDecodingFormat(helper); final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =//SSCanalJsonFormatFactory getValueDecodingFormat(helper); // 一些类的校验 validate helper.validateExcept(PROPERTIES_PREFIX); validateTableSourceOptions(tableOptions); validatePKConstraints( context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); final StartupOptions startupOptions = getStartupOptions(tableOptions); //获取 kafka 本身的一些配置 servers、group.id 等 final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); // add topic-partition discovery properties.setProperty( FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, String.valueOf( tableOptions .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY) .map(Duration::toMillis) .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED))); final DataType physicalDataType =//ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL context.getCatalogTable().getSchema().toPhysicalRowDataType(); final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); return createKafkaTableSource( physicalDataType, keyDecodingFormat.orElse(null), valueDecodingFormat, keyProjection, valueProjection, keyPrefix, KafkaOptions.getSourceTopics(tableOptions), KafkaOptions.getSourceTopicPattern(tableOptions), properties, startupOptions.startupMode, startupOptions.specificOffsets, startupOptions.startupTimestampMillis); }
该方法先解析出 key/value 的 format 并做了一些校验,然后把 Kafka 相关的配置传入 createKafkaTableSource 来创建 table source,如下:
protected KafkaDynamicSource createKafkaTableSource( DataType physicalDataType,//要查询的字段 ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,//SSCanalJsonFormatFactory int[] keyProjection, int[] valueProjection, @Nullable String keyPrefix, @Nullable List<String> topics,// topics @Nullable Pattern topicPattern,//topicPattern Properties properties,// kafka 的一些配置信息,servers、group.id 等 StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, valueDecodingFormat, keyProjection, valueProjection, keyPrefix, topics, topicPattern, properties, startupMode, specificStartupOffsets, startupTimestampMillis, false); }
回到 CatalogSourceTable.toRel,创建好 table source 之后继续执行:
// Step 1 of CatalogSourceTable.toRel, continued: prepare the table source that was just created.
prepareDynamicSource(
schemaTable.getTableIdentifier(),
catalogTable,
tableSource,
schemaTable.isStreamingMode(),
context.getTableConfig());
随后会调用 KafkaDynamicSource.getScanRuntimeProvider 方法,并在其中成功创建出 FlinkKafkaConsumer。至此,Flink SQL 的 Kafka 表就与 Flink Streaming 的 Kafka 消费衔接起来了。
关于 'format' = 'ss-canal-json' 的更多细节,可以参考《FlinkSQL 平台》一文。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。