Kernel-based Flink Job Submission from IDEA

This article implements Flink job submission through Flink's own client internals, and tries to explain in plain terms how to submit a Flink job from inside your own program. Submission for the standalone, yarn-perjob, yarn-session and yarn-application deployment modes is implemented so far.

1. What is internal submission

Think about how we used to deploy Flink jobs: on the command line, calling flink run with the right arguments to submit to the target cluster.

So what is internal submission? Picture this scenario: we are building a computing platform where users write SQL, manage jobs and so on, and the platform then submits them to a cluster. One solution you find online is to have the platform build a flink command string and run it with Runtime.exec(cmd). That is a bit of a nesting doll. Why?

Because flink run itself is just a Java program that parses the client command and then deploys to the cluster, we can skip the command-building step entirely.
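
For contrast, here is a minimal sketch of the command-wrapping approach described above; every path, class name and argument is a placeholder:

import java.io.IOException;

public class CliSubmitExample {
    public static void main(String[] args) throws IOException, InterruptedException {
        // The "nesting doll" approach: shell out to the flink CLI from Java.
        // All paths and class names below are placeholders.
        String[] cmd = {
                "/opt/flink/bin/flink", "run",
                "-t", "yarn-per-job",
                "-c", "com.example.MainClass",
                "/path/to/user-job.jar"
        };
        Process process = Runtime.getRuntime().exec(cmd);
        int exitCode = process.waitFor();
        // the job id has to be scraped from stdout, which is fragile
        System.out.println("flink run exited with " + exitCode);
    }
}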

2. Understanding the Flink deployment process

For the Flink deployment process, I recommend the Shangguigu (尚硅谷) video walkthrough (it targets version 1.12); watch it, think it through, and you will have a solid understanding of the job submission flow. The video link is in the references at the end of this article.

2.1 A few concepts

StreamGraph: the initial dataflow graph that the client builds from the DataStream API calls.

JobGraph: the optimized graph derived from the StreamGraph (operators are chained where possible); this is what actually gets submitted to the cluster.

If you are not very familiar with Flink's runtime concepts, this article is a good reference: Flink 作业执行深度解析 (an in-depth analysis of Flink job execution).
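
To make the two graphs concrete, here is a minimal sketch (against the Flink 1.13 DataStream API) that builds a StreamGraph and converts it to a JobGraph without submitting anything:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").print();

        // StreamGraph: the raw topology built from the operators above
        StreamGraph streamGraph = env.getStreamGraph();
        // JobGraph: the chained, optimized graph that is actually sent to the cluster
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println(jobGraph.getJobID());
    }
}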

2.2 Classes involved in deployment

We know that a Flink program ultimately ends up in StreamExecutionEnvironment#execute(). Tracing through it, the core sits in StreamExecutionEnvironment#executeAsync(StreamGraph), which looks up a PipelineExecutorFactory for the configured execution.target and hands it the StreamGraph:

    /**
     * Triggers the program execution asynchronously. The environment will execute all parts of the
     * program that have resulted in a "sink" operation. Sink operations are for example printing
     * results or forwarding them to a message queue.
     *
     * @param streamGraph the stream graph representing the transformations
     * @return A {@link JobClient} that can be used to communicate with the submitted job, completed
     *     on submission succeeded.
     * @throws Exception which occurs during job execution.
     */
    @Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        checkNotNull(
                configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");

        final PipelineExecutorFactory executorFactory =
                executorServiceLoader.getExecutorFactory(configuration);

        checkNotNull(
                executorFactory,
                "Cannot find compatible factory for specified execution.target (=%s)",
                configuration.get(DeploymentOptions.TARGET));

        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(streamGraph, configuration, userClassloader);

        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));

            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }

3. Implementing the submission program

First, the overall idea:

Internally, Flink turns our operators into a StreamGraph, converts the StreamGraph into a JobGraph, and submits the JobGraph to the cluster.

So our task is to turn the program jar into a JobGraph ourselves, and then use an implementation of ClusterDescriptor to submit that JobGraph, as sketched below:

ClusterDescriptor
    - StandaloneClusterDescriptor
    - YarnClusterDescriptor
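
Condensed into one picture, the whole path looks like the following sketch, assuming a prebuilt JobGraph and a reachable standalone cluster (host and port are placeholders); the full deployers later in this article add job-status polling and richer error handling:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClientFactory;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class SubmitSketch {

    /** Submit a prebuilt JobGraph to the standalone cluster at host:port. */
    public static JobID submit(JobGraph jobGraph, String host, int port) throws Exception {
        Configuration conf = new Configuration();
        conf.set(JobManagerOptions.ADDRESS, host);
        conf.set(JobManagerOptions.PORT, port);
        conf.set(RestOptions.ADDRESS, host);
        conf.set(RestOptions.PORT, port);

        StandaloneClientFactory factory = new StandaloneClientFactory();
        try (StandaloneClusterDescriptor descriptor = factory.createClusterDescriptor(conf);
                ClusterClient<StandaloneClusterId> client =
                        descriptor.retrieve(factory.getClusterId(conf)).getClusterClient()) {
            return client.submitJob(jobGraph).get();
        }
    }
}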

3.1 Project setup

The project only needs two modules for now: flink-deployer-common and flink-deployer-1.13.

3.1.1 Module layout

The common module holds the shared classes.

The deployer module contains the actual submission code for each deployment mode.

3.1.2 Dependencies of the common module

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

3.1.3 Dependencies of the deployer module

<properties>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.13.0</flink.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- flink-yarn -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
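
One caveat not in the list above: flink-yarn expects the Hadoop client classes to be provided by the environment. If they are not already on your classpath (e.g. via HADOOP_CLASSPATH), you may need something like the following; the version is an assumption and must match your cluster:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
    <scope>provided</scope>
</dependency>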

3.2 The code

Classes below are labeled as module#ClassName.

3.2.1 common#FlinkDeployer

public interface FlinkDeployer {

    /**
     * Submit a Flink job.
     *
     * @param deployJobParam the deployment parameters
     * @return the deployment result
     */
    DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception;
}
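
A caller can then select the concrete deployer by deployment mode. Here is a minimal sketch of such a factory (hypothetical, but the class and enum names mirror the ones used later in this article):

public class FlinkDeployerFactory {

    /** Pick a deployer implementation based on the requested deploy mode. */
    public static FlinkDeployer create(DeployJobParam param) {
        switch (DeployModeEnum.getByName(param.getDeployMode())) {
            case FLINK_STANDALONE:
                return new StandaloneDeployer13(param);
            case FLINK_YARN_SESSION:
                return new YarnSessionDeployer13(param);
            case FLINK_YARN_PERJOB:
                return new YarnPerjobDeployer13(param);
            case FLINK_YARN_APPLICATION:
                return new YarnApplicationDeployer13(param);
            default:
                throw new IllegalArgumentException(
                        "unsupported deploy mode: " + param.getDeployMode());
        }
    }
}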

3.2.2 common#DeployJobParam

The deployment parameters.

@Data
public class DeployJobParam {

    private String sql;

    private String jobType;

    /** address of the cluster */
    private String clusterAddress;

    private String mainAppJar;

    /** only used by yarn-session mode */
    private String yarnApplicationId;

    private String mainClassName;

    private String mainClassArgs;

    private String parameters;

    /** third-party dependencies of the job (connectors, UDFs, ...), e.g. /home/xcchen/bdcp/udf/flink-sql-udf-test.jar */
    private List<String> dependencies;

    private String deployMode;

    private String flinkHome;

    /** used by application mode */
    private List<String> providedLibDirs;
}

3.2.3 common#DeployJobResponse

The deployment response.

/** The response of a job deployment. */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DeployJobResponse {

    private String clusterId;

    private String jobId;

    private String clusterWebInterfaceUrl;

    private String jobStatus;
}

3.2.4 common#AbstractFlinkDeployer

The abstract parent class. Its purpose is to isolate classloaders so that multiple Flink versions can be supported side by side: each version-specific deployer is loaded by its own classloader, and the handler below temporarily switches the thread context classloader so that Flink's SPI lookups resolve against the matching version.

public abstract class AbstractFlinkDeployer implements FlinkDeployer {

    protected abstract class FlinkDeployeHandler {
        public Object call() throws Exception {
            ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
            // temporarily switch to the ClassLoader that loaded this deployer
            Thread.currentThread().setContextClassLoader(AbstractFlinkDeployer.class.getClassLoader());
            try {
                return handle();
            } finally {
                // always restore the previous ClassLoader, even if handle() throws
                Thread.currentThread().setContextClassLoader(oldClassLoader);
            }
        }

        public abstract Object handle() throws Exception;
    }

    protected Object handle(FlinkDeployeHandler flinkDeployeHandler) throws Exception {
        return flinkDeployeHandler.call();
    }
}

3.2.5 deployer#FlinkDeployerBase13

This is the base class of the Flink 1.13 deployers; every class that follows extends it. It holds the shared fields and methods.

public abstract class FlinkDeployerBase13 extends AbstractFlinkDeployer {

    private static final Logger logger = LoggerFactory.getLogger(FlinkDeployerBase13.class);

    protected Configuration configuration;

    /** Flink home directory */
    protected String FLINK_HOME;

    /** Flink conf directory */
    protected String FLINK_CONF_DIR;

    /** lib directory under the Flink home */
    protected String FLINK_LIB_DIR;

    /** plugins directory under the Flink home */
    protected String FLINK_PLUGINS_DIR;

    protected PackagedProgram program;

    protected JobGraph jobGraph;

    public FlinkDeployerBase13() {}

    public FlinkDeployerBase13(DeployJobParam deployJobParam) {
        this.FLINK_HOME = FlinkUtils.getFlinkHome(deployJobParam.getFlinkHome());
        this.FLINK_CONF_DIR = FLINK_HOME + "/conf";
        this.configuration = GlobalConfiguration.loadConfiguration(FLINK_CONF_DIR);
        this.FLINK_LIB_DIR = FLINK_HOME + "/lib";
        this.FLINK_PLUGINS_DIR = FLINK_HOME + "/plugins";
        this.initConfiguration(deployJobParam);
        if (!DeployModeEnum.FLINK_YARN_APPLICATION
                .getName()
                .equals(deployJobParam.getDeployMode())) { // application mode builds the JobGraph on the cluster, not on the client
            try {
                this.program = this.buildProgram(deployJobParam);
                this.jobGraph =
                        PackagedProgramUtils.createJobGraph(
                                program,
                                configuration,
                                configuration.get(CoreOptions.DEFAULT_PARALLELISM),
                                true);
            } catch (Exception e) {
                throw new RuntimeException("build program and job graph fail", e);
            }
        }
    }

    /**
     * Initialize the configuration for standalone, per-job, session and application modes.
     *
     * @param deployJobParam the deployment parameters
     */
    private void initConfiguration(DeployJobParam deployJobParam) {
        DeployModeEnum deployMode = DeployModeEnum.getByName(deployJobParam.getDeployMode());
        switch (deployMode) {
            case FLINK_STANDALONE:
                DeployUtils.getStandaloneConfiguration(configuration, deployJobParam);
                break;
            case FLINK_YARN_SESSION:
                DeployUtils.getYarnSessionConfiguration(configuration, deployJobParam);
                break;
            case FLINK_YARN_PERJOB:
                DeployUtils.getYarnPerjobConfiguration(
                        configuration,
                        deployJobParam,
                        FLINK_CONF_DIR,
                        FLINK_HOME,
                        FLINK_LIB_DIR,
                        FLINK_PLUGINS_DIR);
                break;
            case FLINK_YARN_APPLICATION:
                DeployUtils.getApplicationConfiguration(
                        configuration, deployJobParam, FLINK_CONF_DIR);
                break;
            default:
                throw new IllegalArgumentException();
        }
    }

    /**
     * Build the PackagedProgram for jar and SQL jobs.
     *
     * @param deployJobParam the deployment parameters
     * @return the packaged user program
     * @throws Exception if the program cannot be built
     */
    private PackagedProgram buildProgram(DeployJobParam deployJobParam) throws Exception {

        String jobType = deployJobParam.getJobType();

        String[] programArgs = {};
        if (deployJobParam.getMainClassArgs() != null) {
            programArgs = deployJobParam.getMainClassArgs().split("\\s+");
        }
        String jarFilePath =
                jobType.equals(JobTypeEnum.FLINK_SQL.name())
                        ? ConstantUtils.EP_FLINK_SQL_CLIENT
                        : deployJobParam.getMainAppJar();

        // TODO the entry-point class differs per job type
        String entryPointClass =
                jobType.equals(JobTypeEnum.FLINK_SQL.name())
                        ? ConstantUtils.EP_FLINK_SQL_CLIENT_ENTRYPOINT_CLASS
                        : deployJobParam.getMainClassName();

        PackagedProgram.Builder builder =
                PackagedProgram.newBuilder()
                        .setJarFile(new File(jarFilePath))
                        .setEntryPointClassName(entryPointClass)
                        .setArguments(programArgs);

        if (deployJobParam.getDependencies() != null
                && deployJobParam.getDependencies().size() > 0) {

            builder.setUserClassPaths(
                    deployJobParam.getDependencies().stream()
                            .map(
                                    ele -> {
                                        try {
                                            // accept both plain paths and file:// URLs
                                            return ele.startsWith("file:")
                                                    ? new URL(ele)
                                                    : new URL("file://" + ele);
                                        } catch (Exception e) {
                                            throw new IllegalArgumentException(
                                                    "can't cast dependencies to user classpath", e);
                                        }
                                    })
                            .collect(Collectors.toList()));
        }

        logger.info("build package program success...");
        return builder.build();
    }

    /**
     * Deploy a JobGraph directly (e.g. for a SQL-client style integration). In this case the
     * deployJobParam only needs to carry the config values required for the connection.
     *
     * @param jobGraph the job graph to submit
     * @param deployJobParam the deployment parameters
     * @return a client for the submitted job
     */
    public abstract JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
            throws Exception;

    protected abstract Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
            JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception;
}
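
The FlinkUtils helper used above is not shown in the original; here is a minimal sketch of what it is assumed to do (a hypothetical reconstruction, not the author's code):

import java.io.File;
import java.util.Objects;

public class FlinkUtils {

    /** Prefer the explicitly passed value, fall back to the FLINK_HOME environment variable. */
    public static String getFlinkHome(String flinkHome) {
        if (flinkHome != null && !flinkHome.isEmpty()) {
            return flinkHome;
        }
        return Objects.requireNonNull(
                System.getenv("FLINK_HOME"),
                "FLINK_HOME was neither passed in nor set in the environment");
    }

    /** Locate flink-dist*.jar under $FLINK_HOME/lib for yarn.flink-dist-jar. */
    public static String getFlinkDistPathByFlinkHome(String flinkHome) {
        File lib = new File(flinkHome, "lib");
        File[] dist = lib.listFiles(f -> f.getName().startsWith("flink-dist"));
        if (dist == null || dist.length == 0) {
            throw new IllegalStateException("no flink-dist jar found under " + lib);
        }
        return dist[0].getAbsolutePath();
    }
}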

3.2.6 deployer#StandaloneDeployer13

The class that submits to a standalone cluster.

public class StandaloneDeployer13 extends FlinkDeployerBase13 {

    public StandaloneDeployer13() {}

    public StandaloneDeployer13(DeployJobParam deployJobParam) {
        super(deployJobParam);
    }

    @Override
    public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {

        Object jobClientObject =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                return deployJobGraphInternal(
                                        jobGraph, program.getUserCodeClassLoader());
                            }
                        });

        return DeployUtils.tuple2ObjectToResponse(jobClientObject);
    }

    @Override
    public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
            throws Exception {

        Object jobClientObject =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                return deployJobGraphInternal(
                                        jobGraph, getClass().getClassLoader());
                            }
                        });

        return (JobClient) jobClientObject;
    }

    @Override
    protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
            JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {
        StandaloneClientFactory clusterClientFactory = new StandaloneClientFactory();

        final StandaloneClusterDescriptor clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration);

        final StandaloneClusterId clusterId = clusterClientFactory.getClusterId(configuration);

        final ClusterClientProvider<StandaloneClusterId> clusterClientProvider =
                clusterDescriptor.retrieve(clusterId);

        final ClusterClient<StandaloneClusterId> clusterClient =
                clusterClientProvider.getClusterClient();

        final CompletableFuture<JobClient> jobClientFuture =
                clusterClient
                        .submitJob(jobGraph)
                        .thenApplyAsync(
                                FunctionUtils.uncheckedFunction(
                                        jobId -> {
                                            ClientUtils.waitUntilJobInitializationFinished(
                                                    () -> clusterClient.getJobStatus(jobId).get(),
                                                    () ->
                                                            clusterClient
                                                                    .requestJobResult(jobId)
                                                                    .get(),
                                                    userCodeClassLoader);
                                            return jobId;
                                        }))
                        .thenApplyAsync(
                                jobID ->
                                        (JobClient)
                                                new ClusterClientJobClientAdapter<>(
                                                        clusterClientProvider,
                                                        jobID,
                                                        userCodeClassLoader))
                        .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());

        JobClient jobClient = jobClientFuture.get();

        return new Tuple2<>(jobClient, clusterClient);
    }
}

3.2.7 deployer#YarnDeployer13

/** Holds the common state shared by the YARN deployers */
public abstract class YarnDeployer13 extends FlinkDeployerBase13 {

    protected YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();

    protected ClusterSpecification clusterSpecification;

    protected YarnClusterDescriptor clusterDescriptor;

    public YarnDeployer13() {}

    public YarnDeployer13(DeployJobParam deployJobParam) {
        super(deployJobParam);

        try {
            this.handle(
                    new FlinkDeployeHandler() {
                        @Override
                        public Object handle() throws Exception {
                            clusterSpecification =
                                    clusterClientFactory.getClusterSpecification(configuration);
                            clusterDescriptor =
                                    clusterClientFactory.createClusterDescriptor(configuration);
                            return null;
                        }
                    });
        } catch (Exception e) {
            // don't swallow the failure: a null clusterDescriptor would only NPE later
            throw new RuntimeException("failed to create the yarn cluster descriptor", e);
        }
    }
}

3.2.8 deployer#YarnApplicationDeployer13

The class that submits in yarn-application mode.

public class YarnApplicationDeployer13 extends YarnDeployer13 {

    public YarnApplicationDeployer13() {}

    public YarnApplicationDeployer13(DeployJobParam deployJobParam) {
        super(deployJobParam);
    }

    @Override
    public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {

        Object clusterClientObj =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                // set the application's main class and its arguments
                                final ApplicationConfiguration applicationConfiguration =
                                        new ApplicationConfiguration(
                                                deployJobParam.getMainClassArgs().split("\\s+"),
                                                deployJobParam.getMainClassName());

                                final ClusterClientProvider<ApplicationId> clusterClientProvider =
                                        clusterDescriptor.deployApplicationCluster(
                                                clusterSpecification, applicationConfiguration);

                                return clusterClientProvider.getClusterClient();
                            }
                        });

        ClusterClient<ApplicationId> clusterClient =
                (ClusterClient<ApplicationId>) clusterClientObj;

        // NOTE: listJobs() can race with job startup here; see the polling sketch below
        List<JobStatusMessage> collect = new ArrayList<>(clusterClient.listJobs().get());

        return new DeployJobResponse(
                clusterClient.getClusterId().toString(),
                collect.get(0).getJobId().toHexString(),
                clusterClient.getWebInterfaceURL(),
                collect.get(0).getJobState().toString());
    }

    @Override
    public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
            throws Exception {
        throw new UnsupportedOperationException(
                "yarn-application mode can't deploy a JobGraph; use deployJob(DeployJobParam) instead");
    }

    @Override
    protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
            JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {
        throw new UnsupportedOperationException(
                "yarn-application mode can't deploy a JobGraph; use deployJob(DeployJobParam) instead");
    }
}
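
As the note in deployJob points out, listJobs() immediately after deployApplicationCluster races against job startup: the returned list can still be empty, and collect.get(0) would then throw. A hedged workaround is to poll briefly before giving up, e.g. with a helper like this inside YarnApplicationDeployer13 (a sketch, not the author's code):

    // would live inside YarnApplicationDeployer13, which already imports these types
    private static List<JobStatusMessage> waitForFirstJob(
            ClusterClient<ApplicationId> clusterClient, long timeoutMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            List<JobStatusMessage> jobs = new ArrayList<>(clusterClient.listJobs().get());
            if (!jobs.isEmpty()) {
                return jobs; // the dispatcher now knows about the job
            }
            Thread.sleep(1_000); // the application master may still be starting
        }
        throw new IllegalStateException(
                "no job visible on the application cluster within " + timeoutMillis + " ms");
    }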

3.2.9 deployer#YarnSessionDeployer13

The class that submits to an existing yarn-session cluster.

public class YarnSessionDeployer13 extends YarnDeployer13 {

    public YarnSessionDeployer13() {}

    public YarnSessionDeployer13(DeployJobParam deployJobParam) {
        super(deployJobParam);
    }

    @Override
    public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {

        Object tuple2Object =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                return deployJobGraphInternal(
                                        jobGraph, program.getUserCodeClassLoader());
                            }
                        });

        return DeployUtils.tuple2ObjectToResponse(tuple2Object);
    }

    @Override
    public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
            throws Exception {

        Object jobClientObject =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                return deployJobGraphInternal(
                                        jobGraph, getClass().getClassLoader());
                            }
                        });

        return (JobClient) jobClientObject;
    }

    @Override
    protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
            JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {

        final ApplicationId clusterId = clusterClientFactory.getClusterId(configuration);

        final ClusterClientProvider<ApplicationId> clusterClientProvider =
                clusterDescriptor.retrieve(clusterId);

        final ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

        final CompletableFuture<JobClient> jobClientFuture =
                clusterClient
                        .submitJob(jobGraph)
                        .thenApplyAsync(
                                FunctionUtils.uncheckedFunction(
                                        jobId -> {
                                            ClientUtils.waitUntilJobInitializationFinished(
                                                    () -> clusterClient.getJobStatus(jobId).get(),
                                                    () ->
                                                            clusterClient
                                                                    .requestJobResult(jobId)
                                                                    .get(),
                                                    userCodeClassLoader);
                                            return jobId;
                                        }))
                        .thenApplyAsync(
                                jobID ->
                                        (JobClient)
                                                new ClusterClientJobClientAdapter<>(
                                                        clusterClientProvider,
                                                        jobID,
                                                        userCodeClassLoader))
                        .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());

        JobClient jobClient = jobClientFuture.get();
        return new Tuple2<>(jobClient, clusterClient);
    }
}

3.2.10 deployer#YarnPerjobDeployer13

The class that submits in yarn per-job mode.

public class YarnPerjobDeployer13 extends YarnDeployer13 {

    public YarnPerjobDeployer13() {}

    public YarnPerjobDeployer13(DeployJobParam deployJobParam) {
        super(deployJobParam);
    }

    @Override
    public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {

        Object tuple2Object =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                return deployJobGraphInternal(
                                        jobGraph, program.getUserCodeClassLoader());
                            }
                        });

        return DeployUtils.tuple2ObjectToResponse(tuple2Object);
    }

    @Override
    public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
            throws Exception {
        Object jobClientObject =
                this.handle(
                        new FlinkDeployeHandler() {
                            @Override
                            public Object handle() throws Exception {
                                return deployJobGraphInternal(
                                        jobGraph, getClass().getClassLoader());
                            }
                        });
        return (JobClient) jobClientObject;
    }

    @Override
    protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
            JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {

        final ClusterClientProvider<ApplicationId> clusterClientProvider =
                clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);

        final ClusterClientJobClientAdapter<ApplicationId> jobClient =
                new ClusterClientJobClientAdapter<>(
                        clusterClientProvider, jobGraph.getJobID(), userCodeClassLoader);

        return new Tuple2<>(jobClient, clusterClientProvider.getClusterClient());
    }
}

3.2.11 deployer#DeployUtils

public class DeployUtils {

    /**
     * Let user-supplied parameters override the default configuration.
     *
     * @param configuration the configuration loaded from flink-conf.yaml
     * @param deployJobParam the deployment parameters
     * @return the merged configuration
     */
    public static Configuration covertDefaultConfiguration(
            Configuration configuration, DeployJobParam deployJobParam) {
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);

        // user-supplied parameters override the defaults
        if (!StringUtils.isNullOrWhitespaceOnly(deployJobParam.getParameters())) {
            final String[] parameters =
                    deployJobParam.getParameters().replaceAll("\n", "").split("\\s+");
            Arrays.stream(parameters)
                    // strip only the leading -D prefix (replaceAll would mangle values containing "-D")
                    .map(ele -> ele.startsWith("-D") ? ele.substring(2) : ele)
                    .forEach(
                            ele -> {
                                // split on the first '=' only, so values may themselves contain '='
                                final String[] split = ele.split("=", 2);
                                configuration.setString(split[0], split[1]);
                            });
        }
        return configuration;
    }

    public static Configuration getYarnPerjobConfiguration(
            Configuration configuration,
            DeployJobParam deployJobParam,
            String configurationDirectory,
            String flinkHome,
            String flinkLibDir,
            String flinkPluginsDir) {
        configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName());
        configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
        configuration.set(DeploymentOptionsInternal.CONF_DIR, configurationDirectory);
        configuration.set(
                YarnConfigOptions.FLINK_DIST_JAR,
                FlinkUtils.getFlinkDistPathByFlinkHome(flinkHome));
        configuration.set(
                YarnConfigOptions.SHIP_FILES,
                Arrays.asList(
                        flinkLibDir, flinkPluginsDir, deployJobParam.getDependencies().get(0)));

        // defaults
        configuration.setString(TaskManagerOptions.NUM_TASK_SLOTS.key(), "1");

        return covertDefaultConfiguration(configuration, deployJobParam);
    }

    public static Configuration getYarnSessionConfiguration(
            Configuration configuration, DeployJobParam deployJobParam) {
        configuration.set(YarnConfigOptions.APPLICATION_ID, deployJobParam.getYarnApplicationId());
        return covertDefaultConfiguration(configuration, deployJobParam);
    }

    public static Configuration getStandaloneConfiguration(
            Configuration configuration, DeployJobParam deployJobParam) {
        final String clusterAddress = deployJobParam.getClusterAddress();
        if (clusterAddress != null
                && !"".equals(clusterAddress)
                && clusterAddress.matches("(\\d+\\.){3}\\d+:\\d+")) {
            configuration.set(JobManagerOptions.ADDRESS, clusterAddress.split(":")[0]);
            configuration.set(
                    JobManagerOptions.PORT, Integer.parseInt(clusterAddress.split(":")[1]));
        }
        return covertDefaultConfiguration(configuration, deployJobParam);
    }

    /**
     * Build the configuration for yarn-application mode.
     *
     * @param configuration the base configuration
     * @param deployJobParam the deployment parameters
     * @param flinkConfDir the Flink conf directory
     * @return the application-mode configuration
     */
    public static Configuration getApplicationConfiguration(
            Configuration configuration, DeployJobParam deployJobParam, String flinkConfDir) {
        configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
        configuration.set(DeploymentOptionsInternal.CONF_DIR, flinkConfDir);

        // the user jar is not uploaded to HDFS up front; it is shipped and cleaned up with the job's lifecycle
        // TODO for SQL jobs, ep-flink-sql-client should be uploaded to HDFS beforehand, so the job type needs checking here
        configuration.set(
                PipelineOptions.JARS, Collections.singletonList(deployJobParam.getMainAppJar()));

        // point provided.lib.dirs at the lib and plugins directories (must not be set in per-job mode)
        // TODO these paths should not be hard-coded
        configuration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Arrays.asList(
                        "hdfs://localhost:9820/flink/dist/1.13.0/lib",
                        "hdfs://localhost:9820/flink/dist/1.13.0/plugins",
                        "hdfs://localhost:9820/flink/dist/1.13.0/udf"));
        // defaults
        configuration.setString(TaskManagerOptions.NUM_TASK_SLOTS.key(), "1");
        return DeployUtils.covertDefaultConfiguration(configuration, deployJobParam);
    }

    public static DeployJobResponse tuple2ToResponse(Tuple2<JobClient, ClusterClient> tuple2) {
        try {
            return new DeployJobResponse(
                    tuple2.f1.getClusterId().toString(),
                    tuple2.f0.getJobID().toString(),
                    tuple2.f1.getWebInterfaceURL(),
                    tuple2.f0.getJobStatus().get().toString());
        } catch (Exception e) {
            throw new RuntimeException("tuple to deployResponse fail.", e);
        }
    }

    public static DeployJobResponse tuple2ObjectToResponse(Object tupleObject) {
        if (!(tupleObject instanceof Tuple2)) {
            throw new RuntimeException(
                    "the input must be the return value of deployJobGraphInternal");
        }

        Tuple2<JobClient, ClusterClient> tuple2 = (Tuple2<JobClient, ClusterClient>) tupleObject;

        return tuple2ToResponse(tuple2);
    }
}
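
DeployModeEnum, JobTypeEnum and ConstantUtils are referenced throughout but never shown in the original. Below is a minimal sketch consistent with how they are used; the string values are assumptions, not the author's, and each type would go in its own file:

public enum DeployModeEnum {
    FLINK_STANDALONE("flink-standalone"),
    FLINK_YARN_SESSION("flink-yarn-session"),
    FLINK_YARN_PERJOB("flink-yarn-perjob"),
    FLINK_YARN_APPLICATION("flink-yarn-application");

    private final String name;

    DeployModeEnum(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static DeployModeEnum getByName(String name) {
        for (DeployModeEnum mode : values()) {
            if (mode.name.equals(name)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("unknown deploy mode: " + name);
    }
}

public enum JobTypeEnum {
    FLINK_JAR,
    FLINK_SQL
}

public final class ConstantUtils {
    /** hypothetical local path of the bundled SQL entry jar */
    public static final String EP_FLINK_SQL_CLIENT = "/opt/platform/ep-flink-sql-client.jar";
    /** hypothetical entry-point class inside that jar */
    public static final String EP_FLINK_SQL_CLIENT_ENTRYPOINT_CLASS =
            "com.example.sql.SqlClientEntrypoint";

    private ConstantUtils() {}
}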

3.3 Test examples

3.3.1 Environment setup

  • You need a Flink 1.13 client installation; for standalone mode, start the cluster first
  • The user application jar
  • If the job has extra dependencies such as UDFs, set them via setDependencies()
  • In IDEA, set an environment variable for the test program (or call setFlinkHome()), e.g. FLINK_HOME=/home/xcchen/software/flink/flink-1.13.0
  • When submitting to YARN, don't forget to set HADOOP_HOME and HADOOP_CONF_DIR
  • Alternatively, pass the -Dflink.hadoop.* advanced parameters to point at the HDFS and YARN addresses, so HADOOP_HOME is not required

3.3.2 Standalone submission test

public class StandaloneDeployer13Test {

    @Test
    public void deploy() throws Exception {
        DeployJobParam deployParam = new DeployJobParam();
        deployParam.setJobType(JobTypeEnum.FLINK_JAR.name());
        deployParam.setDeployMode(DeployModeEnum.FLINK_STANDALONE.getName());
        deployParam.setMainClassName("top.yangc.flink.sql.connector.DataGenTest");
        deployParam.setMainClassArgs("");
        deployParam.setClusterAddress("192.168.58.11:8081");
        deployParam.setMainAppJar("/home/xcchen/software/flink/flink-1.13.0/examples/flink-sql-example-1.0.0.jar");
        deployParam.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=1");
//        deployParam.setDependencies(Collections.singletonList("/home/xcchen/software/flink/flink-1.13.0/examples/flink-udf-example-1.0.0.jar"));

        // the parameterized constructor builds the PackagedProgram and JobGraph,
        // so the no-arg constructor must not be used here
        FlinkDeployer flinkDeployer = new StandaloneDeployer13(deployParam);
        flinkDeployer.deployJob(deployParam);
    }
}

3.3.3 Yarn per-job submission test

public class YarnPerjobDeployer13Test {

    @Test
    public void deploy() throws Exception {
        DeployJobParam deployParam = new DeployJobParam();
        deployParam.setJobType(JobTypeEnum.FLINK_JAR.name());
        deployParam.setDeployMode(DeployModeEnum.FLINK_YARN_PERJOB.getName());
        deployParam.setMainClassName("top.yangc.flink.sql.connector.UdfDependencyTest");
        deployParam.setMainClassArgs("");
        deployParam.setMainAppJar("/home/xcchen/software/flink/flink-1.13.0/examples/flink-sql-example-1.0.0.jar");

        deployParam.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2");

        // connecting to a remote HDFS/YARN cluster:
//        System.setProperty("HADOOP_USER_NAME","root");
//        deployParam.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dflink.hadoop.fs.defaultFS=hdfs://192.168.58.11:9820 -Dflink.yarn.resourcemanager.hostname=192.168.58.11");

        // plain local path, as in the DeployJobParam javadoc
        deployParam.setDependencies(Collections.singletonList("/home/xcchen/software/flink/flink-1.13.0/examples/flink-udf-example-1.0.0.jar"));

        // the parameterized constructor builds the PackagedProgram and JobGraph
        FlinkDeployer flinkDeployer = new YarnPerjobDeployer13(deployParam);
        flinkDeployer.deployJob(deployParam);
    }
}

4. Further thoughts

How can the submission program support multiple Flink versions? A computing platform shouldn't only support Flink 1.13 programs; it needs to handle other versions as well.

And how do we turn SQL into a JobGraph and wire it into this deployment code, to build a sql-client-like feature?

Those are topics for next time; feel free to message me with any questions.

References

Shangguigu (尚硅谷) Flink internals video tutorial

The submission module of the StreamX open-source project: http://www.streamxhub.com/

Thanks to the above resources for the reference.