当前位置:   article > 正文

Flink源码-StreamGraph的生成_flink streamgraph构建源码

flink streamgraph构建源码

        上个帖子我们分析到了我们代码中算子最终会通过transformation的add方法将自身的血缘依赖添加到一个transformations这个list中,接下来我们看一下transforamtions是如何把血缘依赖放进streamgraph中.

1.StreamGraph的结构

        StreamGraph我们可能比较熟悉,如下图所示,主要包含两部分streamNode和StreamEdge,StreamNode也就是,每个算子直接转换过来的,一个算子会转换成一个streamNode,streamEdge是指连接两个streamNode的边,由于中间的streamNode前后都有streamNode,所以中间的streamNode有两条边,既InputEdge和OutputEdge。

        例如下图中StreamGraph共有四个StreamNode,对应flatMap的那个StreamNode就有两个StreamEdge,既InputEdge和OutputEdge。处于两头的source和sink StreamNode只有一个streamEdge.source有一个OutputEdge,而sink有一个inputEdge。

image.png

2.StreamGraph的生成

我们都知道flink代码的执行都是从env.execute方法开始的,那我们就顺着这个方法往下看

 getStreamGraph方法用来获取streamGraph,这下flink第一个关键执行图出来了,证明我们的方向是没有错的,我们继续往下看

通过getStreamGraphGenerator方法创建一个streamGraphGenerator实例,然后调用generate方法生成streamGraph,这里如果clearTransformations为true会调用transformations.clear方法清空transformation

 

 接下来到关键方法:generate方法,这个方法比较长,我们分步来看里面总共做了以下操作:

1.根据传入的执行参数,checkpoint配置参数,savepoint重启参数创建streamgraph

2.判断当前的作业是批式作业还是流式作业

3.配置streamGraph的job name,checkpoint设置和savepoint等设置

4.循环遍历transforamtions,将transformation转换成streamNode和streamEdge

5.循环遍历每个streamEdge的inputEdge,判断是否支持异步checkpoint

6.清空已经转换的transforamtion集合,返回创建的streamGraph

  1. public StreamGraph generate() {
  2. //传入执行参数和checkpoint配置参数,savepoint重启设置创建streamgraph
  3. streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
  4. //判断执行模式是批次执行还是流式执行
  5. shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
  6. //配置streamGraph job名,checkpoint设置,savepoint等设置
  7. configureStreamGraph(streamGraph);
  8. alreadyTransformed = new HashMap<>();
  9. //循环遍历transformations,将transformation转换成streamNode和streamEdge
  10. for (Transformation<?> transformation : transformations) {
  11. transform(transformation);
  12. }
  13. //循环遍历每个streamEdge的入边,判断是否支持异步checkpoint
  14. for (StreamNode node : streamGraph.getStreamNodes()) {
  15. if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
  16. for (StreamEdge edge : node.getInEdges()) {
  17. edge.setSupportsUnalignedCheckpoints(false);
  18. }
  19. }
  20. }
  21. final StreamGraph builtStreamGraph = streamGraph;
  22. alreadyTransformed.clear();
  23. alreadyTransformed = null;
  24. streamGraph = null;
  25. return builtStreamGraph;
  26. }

 3.StreamNode和StreamEdge的生成

上一步我们看到了在transform方法中transformation变成了StreamNode和StreamEdge,下面我们深入进去看看他们是如何生成的

1.transform方法

该方法中经过各种判断,最终在translate方法中执行转换

  1. private Collection<Integer> transform(Transformation<?> transform) {
  2. //判断已经转换的transformation是否包含当前transformation
  3. if (alreadyTransformed.containsKey(transform)) {
  4. return alreadyTransformed.get(transform);
  5. }
  6. LOG.debug("Transforming " + transform);
  7. //设置transform的并行度
  8. if (transform.getMaxParallelism() <= 0) {
  9. // if the max parallelism hasn't been set, then first use the job wide max parallelism
  10. // from the ExecutionConfig.
  11. int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
  12. if (globalMaxParallelismFromConfig > 0) {
  13. transform.setMaxParallelism(globalMaxParallelismFromConfig);
  14. }
  15. }
  16. // call at least once to trigger exceptions about MissingTypeInfo
  17. transform.getOutputType();
  18. //获取该transformation的translator类型
  19. final TransformationTranslator<?, Transformation<?>> translator =
  20. (TransformationTranslator<?, Transformation<?>>)
  21. translatorMap.get(transform.getClass());
  22. Collection<Integer> transformedIds;
  23. //根据不同类型调用translate方法获取transformId
  24. if (translator != null) {
  25. transformedIds = translate(translator, transform);
  26. } else {
  27. transformedIds = legacyTransform(transform);
  28. }
  29. // need this check because the iterate transformation adds itself before
  30. // transforming the feedback edges
  31. //判断该transformation是否转换,该map中没有对应的key就加入map
  32. if (!alreadyTransformed.containsKey(transform)) {
  33. alreadyTransformed.put(transform, transformedIds);
  34. }
  35. return transformedIds;
  36. }

2.translate方法

在方法中可以看到最后会根据执行模式选择执行translateForBatch或者translateForStreaming方法

我们这里是流式执行,所以我们后面继续看translateForStreaming方法

  1. private Collection<Integer> translate(
  2. final TransformationTranslator<?, Transformation<?>> translator,
  3. final Transformation<?> transform) {
  4. checkNotNull(translator);
  5. checkNotNull(transform);
  6. //获取该tranformation的parentId
  7. final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
  8. // the recursive call might have already transformed this
  9. //判断该transfomation是否在alreadyTransformed这个集合中
  10. if (alreadyTransformed.containsKey(transform)) {
  11. return alreadyTransformed.get(transform);
  12. }
  13. //获取该transformation的共享组
  14. final String slotSharingGroup =
  15. determineSlotSharingGroup(
  16. transform.getSlotSharingGroup(),
  17. allInputIds.stream()
  18. .flatMap(Collection::stream)
  19. .collect(Collectors.toList()));
  20. final TransformationTranslator.Context context =
  21. new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
  22. //根据执行模式选择不同的方法进行转换
  23. return shouldExecuteInBatchMode
  24. ? translator.translateForBatch(transform, context)
  25. : translator.translateForStreaming(transform, context);
  26. }

3.translateForStreaming

这个方法在两个类中有两个实现方法,sink算子选择sinkTransformationTranslator,其他算子就选择 simpleTransformationTranslator,这里我们选择simpleTransformationTranslator.translateForStreaming作为例子讲解

  1. @Override
  2. public final Collection<Integer> translateForStreaming(
  3. final T transformation, final Context context) {
  4. checkNotNull(transformation);
  5. checkNotNull(context);
  6. //底层调用translateForStreamingInternal进行转换
  7. final Collection<Integer> transformedIds =
  8. translateForStreamingInternal(transformation, context);
  9. configure(transformation, context);
  10. return transformedIds;
  11. }

translateForStreamingInternal是一个抽象方法,他有很多的实现类:

 这里我们就选择OneInputTransformationTranslator作为例子讲解,其实选择哪一个都可以,里面的主要代码都是一样的,大家有兴趣的可以自己下去看看这些到底有啥区别,顺便也可以自己学习一下现在transformation有哪些种类。话不多说我们继续往下看代码:他有调用自己内部的方法 translateInternal     那我们就看看这个方法吧,看到了这个方法我们基本上也来到了终点站了,这个方法中总主要做了两件事:

        1.给streamGraph设置operator

        2.给streamGraph设置edge

  1. protected Collection<Integer> translateInternal(
  2. final Transformation<OUT> transformation,
  3. final StreamOperatorFactory<OUT> operatorFactory,
  4. final TypeInformation<IN> inputType,
  5. @Nullable final KeySelector<IN, ?> stateKeySelector,
  6. @Nullable final TypeInformation<?> stateKeyType,
  7. final Context context) {
  8. checkNotNull(transformation);
  9. checkNotNull(operatorFactory);
  10. checkNotNull(inputType);
  11. checkNotNull(context);
  12. //获取streamGraph和transformaId
  13. final StreamGraph streamGraph = context.getStreamGraph();
  14. final String slotSharingGroup = context.getSlotSharingGroup();
  15. final int transformationId = transformation.getId();
  16. final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
  17. //给streamGraph添加算子
  18. streamGraph.addOperator(
  19. transformationId,
  20. slotSharingGroup,
  21. transformation.getCoLocationGroupKey(),
  22. operatorFactory,
  23. inputType,
  24. transformation.getOutputType(),
  25. transformation.getName());
  26. if (stateKeySelector != null) {
  27. TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
  28. streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
  29. }
  30. int parallelism =
  31. transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
  32. ? transformation.getParallelism()
  33. : executionConfig.getParallelism();
  34. streamGraph.setParallelism(transformationId, parallelism);
  35. streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
  36. //获取该transformation的父transformation
  37. final List<Transformation<?>> parentTransformations = transformation.getInputs();
  38. checkState(
  39. parentTransformations.size() == 1,
  40. "Expected exactly one input transformation but found "
  41. + parentTransformations.size());
  42. //给streamGraph设置streamEdge
  43. for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
  44. streamGraph.addEdge(inputId, transformationId, 0);
  45. }
  46. return Collections.singleton(transformationId);
  47. }

可能有的小伙伴就问了前面不是说streamGraph是streamNodeheStreamEdge组成的,这里是设置operator和streamEdge,没有看见streamNode啊,别急咱们再往下看:

 这里transformation也就转换成streamGraph了,离我们的目标物理执行图更进了一步,下次我们在讲streamGraph怎么转换成JobGraph。(都看到这了,还不点赞,白嫖可耻啊)

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号