对 Flink 1.12 部分源码进行学习和记录。
提交流程
- 1) 通过命令行调用
bin/flink
脚本文件以 yarn_per_job 模式提交作业,该文件内又调用了同目录下config.sh
,后者会加载部分环境相关参数,最终bin/flink
将org/apache/flink/client/cli/CliFrontend.java
作为主类提交- 1.1
CliFrontend.java
的 main 方法中,依次进行以下 3 步,其中第 3 步中会依次 add 三种不同 CLI(Generic、FlinkYarnSession、Default)// 1. find the configuration directory final String configurationDirectory = getConfigurationDirectoryFromEnv(); // 2. load the global configuration final Configuration configuration = GlobalConfiguration loadConfiguration(configurationDirectory); // 3. load the custom command lines final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration, configurationDirectory);
- 1.2 解析参数和运行
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
- 1.2.1 根据传入的参数 args[0],使用 switch-case 选择对应模式(run、cancel、savepoint 等),此处选择 run
case ACTION_RUN: run(params); return 0;
- 1.2.1.1 分别获取默认配置、命令行选项配置并匹配
final Options commandOptions = CliFrontendParser.getRunCommandOptions(); final CommandLine commandLine = getCommandLine(commandOptions, args, true);
- 1.2.1.2 根据命令行选项找出对应的 CustomCommandLine
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
- 1.2.1.2.1 依次遍历,寻找 1.1 中三种 CLI 中首先活跃的模式(-t => Generic;-m 或 AppID 或指定执行器为 Yarn => FlinkYarnSession;兜底为 Default)(依次寻找三个 CLI 实现类的 isActive 方法)
for (CustomCommandLine cli : customCommandLines) { LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine)); if (cli.isActive(commandLine)) { return cli; }
- 1.2.1.2.1 依次遍历,寻找 1.1 中三种 CLI 中首先活跃的模式(-t => Generic;-m 或 AppID 或指定执行器为 Yarn => FlinkYarnSession;兜底为 Default)(依次寻找三个 CLI 实现类的 isActive 方法)
- 1.2.1.3 获取 -c -p -j -C -d等参数,添加 Jar 包及依赖
final ProgramOptions programOptions = ProgramOptions.create(commandLine); final List<URL> jobJars = getJobJarAndDependencies(programOptions);
- 1.2.1.4 获取有效配置(HA_ID、AppID、Target、JM、TM 内存、slot 数目),最底层是通过 CLI 实现类的 toConfiguration 实现
final Configuration effectiveConfiguration = getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
- 1.2.1.5 执行程序,底层为 ClientUtils 的 executeProgram 方法(内部配置执行环境供用户代码获取,并调用用户的 main 方法)
executeProgram(effectiveConfiguration, program);
- 1.2.1.1 分别获取默认配置、命令行选项配置并匹配
- 1.2.1 根据传入的参数 args[0],使用 switch-case 选择对应模式(run、cancel、savepoint 等),此处选择 run
- 1.1
- 2) 用户代码 main 方法中,最后通过
env.execute()
执行,org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java 中有许多重载的此方法(此处 StreamGraph 通过getStreamGraph(jobName)
方法产生),最终执行以下代码CompletableFuture<JobClient> jobClientFuture = executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader);
- 2.1 进入方法,寻找到 PipelineExecutor 接口的实现类
YarnJobClusterExecutor
,在其直接父类 AbstractJobClusterExecutor 中找到真正执行的 execute 方法- 2.1.1 StreamGraph 转换为 JobGraph
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
- 2.1.2 创建 Yarn 客户端,封装进 Descriptor
final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)
- 2.1.3 获取集群配置,JM、TM 内存、每个 TM 的 slot 数目
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
- 2.1.4 部署集群
final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
- 2.1.4.1 YarnAppName 设置、jar 包路径核验、conf 路径核验、Yarn 核数是否足够核验、队列、Yarn 资源核验,通过则最后启动 AM
ApplicationReport report = startAppMaster( flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, yarnClient, yarnApplication, validClusterSpecification);
- 2.1.4.1.1 内部与 Yarn 常规启动 AM 思想类似(与 HDFS 交互),代码此处不再给出
- 2.1.4.1 YarnAppName 设置、jar 包路径核验、conf 路径核验、Yarn 核数是否足够核验、队列、Yarn 资源核验,通过则最后启动 AM
- 2.1.1 StreamGraph 转换为 JobGraph
- 2.1 进入方法,寻找到 PipelineExecutor 接口的实现类
- 3) 在 2.1.4.1 中的 yarnClusterEntrypoint 即为 AM 执行的入口类(org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java),找到 main 方法,配置环境后执行
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
- 3.1 启动集群
clusterEntrypoint.startCluster();
- 3.1.1
runCluster(configuration, pluginManager);
- 3.1.1.1 初始化服务,设置 Rpc 相关参数
initializeServices(configuration, pluginManager);
- 3.1.1.2 创建和启动 JobManager 里的组件:ResourceManager(含 SlotManager,管理资源及向 Yarn 申请资源)、Dispatcher、JobMaster(过程中会生成 ExecutionGraph)(含 SlotPool,真正发送资源请求) 注:内部源码中,
dispatcher.start();
会最终调用自身的onStart
方法clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this);
- 3.1.1.1 初始化服务,设置 Rpc 相关参数
- 3.1.1
- 3.1 启动集群
- 4) 找到 org.apache.flink.yarn.YarnTaskExecutorRunner 类,查看 TM 相关,找到 main 方法,会启动 TM(向 RM 注册 slot,RM 中的 SlotManager 分配 slot)(与前述类似点击即可,无明显难点,故不在此展开叙述)