当前位置:   article > 正文

使Flink SQL Kafka Source支持独立设置并行度

kafaka 算子指定 source算子上指定parallelisms

前言

社区在Flink 1.12版本通过FLIP-146提出了增强Flink SQL DynamicTableSource/Sink接口的动议,其中的一个主要工作就是让它们支持独立设置并行度。很多Sink都已经可以配置sink.parallelism参数(见FLINK-19937),但Source还没动静。这是因为Source一直以来有两种并行的标准,一是传统的流式SourceFunction与批式InputFormat,二是原生支持流批一体的FLIP-27 Source,并且Connector之间的实现并不统一。

笔者最近在Flink钉群闲逛时,经常看到如下图所示的发言,可见大家对Source(主要是Kafka Source)支持独立设置并行度的需求比较急切。

本文就来基于1.13.0版本实现该需求,注意此版本的SQL Kafka Source尚未迁移到FLIP-27。这项改进已经过验证,可以在生产环境使用,但仍属于过渡方案,故不会向社区发起PR。

实现ParallelismProvider

ScanTableSource的运行时逻辑需要由ScanTableSource.ScanRuntimeProvider来提供,一共有5种,如下图所示。

显然我们要修改SourceFunctionProvider,让它实现FLIP-146定义的ParallelismProvider接口,表示它支持独立设置并行度。代码很简单:

  1. @PublicEvolving
  2. public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
  3. /** Helper method for creating a static provider. */
  4. static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded) {
  5. return new SourceFunctionProvider() {
  6. @Override
  7. public SourceFunction<RowData> createSourceFunction() {
  8. return sourceFunction;
  9. }
  10. @Override
  11. public boolean isBounded() {
  12. return isBounded;
  13. }
  14. };
  15. }
  16. /** Helper method for creating a static provider with a provided parallelism. */
  17. static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded, Integer sourceParallelism) {
  18. return new SourceFunctionProvider() {
  19. @Override
  20. public SourceFunction<RowData> createSourceFunction() {
  21. return sourceFunction;
  22. }
  23. @Override
  24. public boolean isBounded() {
  25. return isBounded;
  26. }
  27. @Override
  28. public Optional<Integer> getParallelism() {
  29. return Optional.ofNullable(sourceParallelism);
  30. }
  31. };
  32. }
  33. /** Creates a {@link SourceFunction} instance. */
  34. SourceFunction<RowData> createSourceFunction();
  35. }

添加scan.parallelism参数

o.a.f.table.factories.FactoryUtil中添加:

  1. public static final ConfigOption<Integer> SCAN_PARALLELISM =
  2. ConfigOptions.key("scan.parallelism")
  3. .intType()
  4. .noDefaultValue()
  5. .withDescription(
  6. "Defines a custom parallelism for the scan source. "
  7. + "By default, if this option is not defined, the planner will derive the parallelism "
  8. + "for each statement individually by also considering the global configuration.");

修改Kafka Connector

首先修改KafkaDynamicSource

  • 在构造方法中添加@Nullable Integer parallelism及相关的代码;
  • getScanRuntimeProvider()方法的最后:
return SourceFunctionProvider.of(kafkaConsumer, false, parallelism);
  • copy() / equals() / hashCode()方法内加上parallelism

然后修改KafkaDynamicTableFactory,加入SCAN_PARALLELISM参数,以及使用带并行度的KafkaDynamicSource构造方法,不再赘述。

修改Source物理执行节点

负责使ScanTableSource发挥作用的物理执行节点为CommonExecTableSourceScan,注意到它的translateToPlanInternal()方法中,对不同类型的ScanRuntimeProvider分别做了处理。我们找到SourceFunctionProvider对应的那个判断分支,加上与并行度相关的代码。

  1. if (provider instanceof SourceFunctionProvider) {
  2. SourceFunction<RowData> sourceFunction =
  3. ((SourceFunctionProvider) provider).createSourceFunction();
  4. DataStreamSource<RowData> streamSource = env.addSource(
  5. sourceFunction, operatorName, outputTypeInfo);
  6. final int confParallelism = streamSource.getParallelism();
  7. final int sourceParallelism = deriveSourceParallelism(
  8. (ParallelismProvider) provider, confParallelism);
  9. Transformation<RowData> transformation = streamSource.getTransformation();
  10. transformation.setParallelism(sourceParallelism);
  11. return transformation;
  12. }
  13. private int deriveSourceParallelism(
  14. ParallelismProvider parallelismProvider, int confParallelism) {
  15. final Optional<Integer> parallelismOptional = parallelismProvider.getParallelism();
  16. if (parallelismOptional.isPresent()) {
  17. int sourceParallelism = parallelismOptional.get();
  18. if (sourceParallelism <= 0) {
  19. throw new TableException(
  20. String.format(
  21. "Table: %s configured source parallelism: "
  22. + "%s should not be less than zero or equal to zero",
  23. tableSourceSpec.getObjectIdentifier().asSummaryString(),
  24. sourceParallelism));
  25. }
  26. return sourceParallelism;
  27. } else {
  28. return confParallelism;
  29. }
  30. }

大功告成?

将全局并行度设为10,用一条简单的SQL语句测试一下:

  1. SELECT siteId, COUNT(orderId)
  2. FROM rtdw_dwd.kafka_order_done_log /*+ OPTIONS('scan.parallelism'='5') */
  3. WHERE mainSiteId = 10029
  4. GROUP BY siteId;

emm,看起来似乎不太对,为什么Source后面的Calc节点并行度也变成了5?这是因为Calc的并行度默认以输入流的并行度决定,所以我们还要提供强制打断算子链的选项,让Calc能够恢复全局并行度。

ExecutionConfigOptions中加入一个参数table.exec.source.force-break-chain

  1. @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
  2. public static final ConfigOption<Boolean> TABLE_EXEC_SOURCE_FORCE_BREAK_CHAIN =
  3. key("table.exec.source.force-break-chain")
  4. .booleanType()
  5. .defaultValue(false)
  6. .withDescription(
  7. "Indicates whether to forcefully break the operator chain after the source.");

然后在上面改过的CommonExecTableSourceScan代码中,加入对此参数的判断,如果为true,则调用disableChaining()方法断链。

  1. final Configuration config = planner.getTableConfig().getConfiguration();
  2. if (config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_FORCE_BREAK_CHAIN)) {
  3. streamSource.disableChaining();
  4. }

最后不要忘了修改CommonExecCalc。如果它的输入是CommonExecTableSourceScan且上述参数生效,那么就将它的并行度直接置为PARALLELISM_DEFAULT,即全局并行度。

  1. @Override
  2. protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
  3. final ExecEdge inputEdge = getInputEdges().get(0);
  4. final Transformation<RowData> inputTransform =
  5. (Transformation<RowData>) inputEdge.translateToPlan(planner);
  6. final CodeGeneratorContext ctx = /* ... */;
  7. final CodeGenOperatorFactory<RowData> substituteStreamOperator = /* ... */;
  8. int parallelism = inputTransform.getParallelism();
  9. if (inputEdge.getSource() instanceof CommonExecTableSourceScan) {
  10. final Configuration config = planner.getTableConfig().getConfiguration();
  11. if (config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_FORCE_BREAK_CHAIN)) {
  12. parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
  13. }
  14. }
  15. return new OneInputTransformation<>(
  16. inputTransform,
  17. getDescription(),
  18. substituteStreamOperator,
  19. InternalTypeInfo.of(getOutputType()),
  20. parallelism);
  21. }

再试一试,结果符合预期:

提供强制断链的参数还有一重好处,即能够在SQL作业并行度变化时安全地恢复现场。举个例子,若Source并行度和全局并行度起初都是5,但是在作业运行过程中发现下游处理速度不够,而将全局并行度提升到10的话,那么原有的checkpoint将无法使用——因为并行度的变化导致了作业拓扑变化。如果我们在一开始就将table.exec.source.force-break-chain设为true,那么上面所述的情况将不会造成困扰。

The End

民那晚安晚安。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44070
推荐阅读
相关标签
  

闽ICP备14008679号