赞
踩
testCase
// Create a stream execution environment from the supplied configuration.
final StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration);
// Register the operators with the environment (per the original note, they are added to its
// list of transformations): a one-element collection source feeding a discarding sink.
env.fromCollection(Collections.singletonList(42))
.addSink(new DiscardingSink<>());
// Run the job and hand back the result of execute().
return env.execute();
StreamExecutionEnvironment
构建streamGraph(进行相关配置初始化,并未真正进行构建)并异步执行
public JobExecutionResult execute(String jobName) throws Exception { Preconditions.checkNotNull(jobName, "Streaming Job name should not be null."); final StreamGraph streamGraph = getStreamGraph(); streamGraph.setJobName(jobName); return execute(streamGraph); } public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { //异步执行streamGraph final JobClient jobClient = executeAsync(streamGraph); try { final JobExecutionResult jobExecutionResult; if (configuration.getBoolean(DeploymentOptions.ATTACHED)) { jobExecutionResult = jobClient.getJobExecutionResult().get(); } else { jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID()); } jobListeners.forEach( jobListener -> jobListener.onJobExecuted(jobExecutionResult, null)); return jobExecutionResult; } catch (Throwable t) { ... } public JobClient executeAsync(StreamGraph streamGraph) throws Exception { final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration); CompletableFuture<JobClient> jobClientFuture = executorFactory //获取执行器,具体实现类如下图 .getExecutor(configuration) //实例化执行器并执行 .execute(streamGraph, configuration, userClassloader); try { JobClient jobClient = jobClientFuture.get(); //提交成功 jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null)); return jobClient; } catch (ExecutionException executionException) { final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException); jobListeners.forEach( //提交失败 jobListener -> jobListener.onJobSubmitted(null, strippedException)); throw new FlinkException( String.format("Failed to execute job '%s'.", streamGraph.getJobName()), strippedException); } }
executor实现:

LocalExecutorFactory
/**
 * {@link PipelineExecutorFactory} whose {@code getExecutor} returns the executor built by
 * {@code LocalExecutor.create}.
 */
public class LocalExecutorFactory implements PipelineExecutorFactory {
/**
 * Creates the executor for the given configuration.
 *
 * @param configuration configuration forwarded to {@code LocalExecutor.create}
 * @return the executor returned by {@code LocalExecutor.create}
 */
@Override
public PipelineExecutor getExecutor(final Configuration configuration) {
// Create the executor.
return LocalExecutor.create(configuration);
}
...
}
LocalExecutor
/**
 * Builds a {@link LocalExecutor} for the given configuration.
 *
 * @param configuration configuration handed to the new executor
 * @return a new executor that uses the {@link MiniCluster} constructor as its cluster factory
 */
public static LocalExecutor create(Configuration configuration) {
    // Only a factory is supplied here; no MiniCluster is constructed inside this method.
    return new LocalExecutor(
            configuration, miniClusterConfig -> new MiniCluster(miniClusterConfig));
}
LocalExecutor.execute → PerJobMiniClusterFactory.submitJob
@Override public CompletableFuture<JobClient> execute( Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception { checkNotNull(pipeline); checkNotNull(configuration); Configuration effectiveConfig = new Configuration(); effectiveConfig.addAll(this.configuration); effectiveConfig.addAll(configuration); // we only support attached execution with the local executor. checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)); final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig); return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory) //提交作业 .submitJob(jobGraph, userCodeClassloader); } /** Starts a {@link MiniCluster} and submits a job. */ public CompletableFuture<JobClient> submitJob( JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception { MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism()); MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig); //启动cluster miniCluster.start(); return miniCluster .submitJob(jobGraph) .thenApplyAsync( FunctionUtils.uncheckedFunction( submissionResult -> { org.apache.flink.client.ClientUtils .waitUntilJobInitializationFinished( () -> miniCluster .getJobStatus( submissionResult .getJobID()) .get(), () -> miniCluster .requestJobResult( submissionResult .getJobID()) .get(), userCodeClassloader); return submissionResult; })) .thenApply( result -> new MiniClusterJobClient( result.getJobID(), miniCluster, userCodeClassloader, MiniClusterJobClient.JobFinalizationBehavior .SHUTDOWN_CLUSTER)) .whenComplete( (ignored, throwable) -> { if (throwable != null) { // We failed to create the JobClient and must shutdown to ensure // cleanup. shutDownCluster(miniCluster); } }) .thenApply(Function.identity()); }
MiniCluster
/**
 * Submits the given job graph through the dispatcher gateway.
 *
 * <p>The job's files are first handled by {@code uploadAndSetJobFiles}, using the address
 * obtained from {@code createBlobServerAddress}. Once that future and the dispatcher gateway
 * future have both completed, {@code submitJob} is called on the gateway with the configured
 * RPC timeout.
 *
 * @param jobGraph the job to submit
 * @return future completing with a {@link JobSubmissionResult} that carries the job's id once
 *     the dispatcher has acknowledged the submission
 */
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> gatewayFuture = getDispatcherGatewayFuture();

    // File handling depends on the blob server address, which is derived from the gateway.
    final CompletableFuture<Void> filesReady =
            uploadAndSetJobFiles(createBlobServerAddress(gatewayFuture), jobGraph);

    // thenCombine yields a future of a future; thenCompose(identity) flattens it.
    final CompletableFuture<Acknowledge> submissionAck =
            filesReady
                    .thenCombine(
                            gatewayFuture,
                            (unused, gateway) -> gateway.submitJob(jobGraph, rpcTimeout))
                    .thenCompose(Function.identity());

    return submissionAck.thenApply(ack -> new JobSubmissionResult(jobGraph.getJobID()));
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。