Flink 内核源码分析

2022/03/06 Flink 共 4158 字,约 12 分钟
欢迎

对 Flink 1.12 部分源码进行学习和记录。

提交流程

  • 1) 通过命令行调用 bin/flink 脚本文件以 yarn_per_job 模式提交作业,该文件内又调用了同目录下 config.sh,后者会加载部分环境相关参数,最终 bin/flinkorg/apache/flink/client/cli/CliFrontend.java 作为主类提交
    • 1.1 CliFrontend.java 的 main 方法中,依次进行以下 3 步,其中第 3 步中会依次 add 三种不同 CLI(Generic、FlinkYarnSession、Default)
        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();
        // 2. load the global configuration
        final Configuration configuration = GlobalConfiguration loadConfiguration(configurationDirectory);
        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =  loadCustomCommandLines(configuration, configurationDirectory);
      
    • 1.2 解析参数和运行
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
      
      • 1.2.1 根据传入的参数 args[0],使用 switch-case 选择对应模式(run、cancel、savepoint 等),此处选择 run
          case ACTION_RUN:
                  run(params);
                  return 0;
        
        • 1.2.1.1 分别获取默认配置、命令行选项配置并匹配
            final Options commandOptions = CliFrontendParser.getRunCommandOptions();
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
          
        • 1.2.1.2 根据命令行选项找出对应的 CustomCommandLine
            final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
          
          • 1.2.1.2.1 依次遍历,寻找 1.1 中三种 CLI 中首先活跃的模式(-t => Generic;-m 或 AppID 或指定执行器为 Yarn => FlinkYarnSession;兜底为 Default)(依次寻找三个 CLI 实现类的 isActive 方法)
              for (CustomCommandLine cli : customCommandLines) {
              LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
              if (cli.isActive(commandLine)) {
                  return cli;
              }
            
        • 1.2.1.3 获取 -c -p -j -C -d等参数,添加 Jar 包及依赖
            final ProgramOptions programOptions = ProgramOptions.create(commandLine);
            final List<URL> jobJars = getJobJarAndDependencies(programOptions);
          
        • 1.2.1.4 获取有效配置(HA_ID、AppID、Target、JM、TM 内存、slot 数目),最底层是通过 CLI 实现类的 toConfiguration 实现
            final Configuration effectiveConfiguration = getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
          
        • 1.2.1.5 执行程序,底层为 ClientUtils 的 executeProgram 方法(内部配置执行环境供用户代码获取,并调用用户的 main 方法)
            executeProgram(effectiveConfiguration, program);
          
  • 2) 用户代码 main 方法中,最后通过 env.execute() 执行,org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java 中有许多重载的此方法(此处 StreamGraph 通过 getStreamGraph(jobName) 方法产生),最终执行以下代码
      CompletableFuture<JobClient> jobClientFuture = executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader);
    
    • 2.1 进入方法,寻找到 PipelineExecutor 接口的实现类 YarnJobClusterExecutor,在其直接父类 AbstractJobClusterExecutor 中找到真正执行的 execute 方法
      • 2.1.1 StreamGraph 转换为 JobGraph
          final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        
      • 2.1.2 创建 Yarn 客户端,封装进 Descriptor
          final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)
        
      • 2.1.3 获取集群配置,JM、TM 内存、每个 TM 的 slot 数目
          final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
        
      • 2.1.4 部署集群
          final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                  .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
          LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
        
        • 2.1.4.1 YarnAppName 设置、jar 包路径核验、conf 路径核验、Yarn 核数是否足够核验、队列、Yarn 资源核验,通过则最后启动 AM
            ApplicationReport report = startAppMaster(
                flinkConfiguration,
                applicationName,
                yarnClusterEntrypoint,
                jobGraph,
                yarnClient,
                yarnApplication,
                validClusterSpecification);
          
          • 2.1.4.1.1 内部与 Yarn 常规启动 AM 思想类似(与 HDFS 交互),代码此处不再给出
  • 3) 在 2.1.4.1 中的 yarnClusterEntrypoint 即为 AM 执行的入口类(org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java),找到 main 方法,配置环境后执行
      ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    
    • 3.1 启动集群
        clusterEntrypoint.startCluster();
      
      • 3.1.1
          runCluster(configuration, pluginManager);
        
        • 3.1.1.1 初始化服务,设置 Rpc 相关参数
            initializeServices(configuration, pluginManager);
          
        • 3.1.1.2 创建和启动 JobManager 里的组件:ResourceManager(含 SlotManager,管理资源及向 Yarn 申请资源)、Dispatcher、JobMaster(过程中会生成 ExecutionGraph)(含 SlotPool,真正发送资源请求) 注:内部源码中,dispatcher.start(); 会最终调用自身的 onStart 方法
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
            configuration,
            ioExecutor,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
            this);
          
  • 4) 找到 org.apache.flink.yarn.YarnTaskExecutorRunner 类,查看 TM 相关,找到 main 方法,会启动 TM(向 RM 注册 slot,RM 中的 SlotManager 分配 slot)(与前述类似点击即可,无明显难点,故不在此展开叙述)

文档信息

Search

    Table of Contents