Submitting Flink Jobs from IDEA via Flink's Client Internals
This article explains, as plainly as I can, how to submit a Flink job programmatically by calling Flink's client internals directly. Submission is currently implemented for the standalone, yarn-per-job, yarn-session, and yarn-application deployment modes.
1. What is internal submission?
Think back to how we used to deploy Flink jobs: on the command line, we ran flink run with the appropriate arguments and the job was submitted to the target cluster.
So what is internal submission? Imagine a scenario: we are building a computing platform where users write SQL and manage jobs, and the platform submits them to a cluster. A common solution found online is to have the platform build a flink run command string and execute it with Runtime.exec(cmd). That is really a workaround wrapped inside a workaround. Why?
Because under the hood, the flink run command itself just launches a Java program that parses the command-line arguments and deploys the job to the cluster, so we can skip the command-building step entirely.
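To make this concrete, here is a minimal sketch of calling that same Java entry point directly instead of shelling out with Runtime.exec. It assumes flink-clients is on the classpath and that FLINK_CONF_DIR points at a valid conf directory; the jar path and main class are hypothetical. Note that CliFrontend.main calls System.exit when it finishes, so this is shown only to illustrate the equivalence.
import org.apache.flink.client.cli.CliFrontend;

public class CliEquivalent {
    public static void main(String[] args) throws Exception {
        // Directly invoke the entry point that the bin/flink script launches.
        // Equivalent to: flink run -c com.example.MyJob /path/to/job.jar
        CliFrontend.main(new String[] {"run", "-c", "com.example.MyJob", "/path/to/job.jar"});
    }
}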
2. Understanding the Flink deployment flow
For the deployment flow, I recommend the 尚硅谷 video walkthrough (based on Flink 1.12). Watch it, think it through carefully, and you will have a solid understanding of how Flink submits jobs. The video link is in the references at the end of this article.
2.1 A few concepts
- StreamGraph: the graph that the DataStream API builds directly from your transformations.
- JobGraph: the optimized graph (operators chained where possible) that the client actually submits to the cluster.
If you are not yet familiar with these runtime concepts, this article is a good reference: Flink 作业执行深度解析 (an in-depth analysis of Flink job execution).
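To make the two concepts concrete, here is a minimal sketch (assuming flink-streaming-java 1.13 on the classpath) showing where each graph comes from:
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        // The DataStream transformations are first collected into a StreamGraph...
        StreamGraph streamGraph = env.getStreamGraph();
        // ...which the client then translates into the JobGraph it actually submits.
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println(jobGraph.getJobID());
    }
}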
2.2 Deployment-related classes
We know that every Flink program ultimately runs StreamExecutionEnvironment#execute(). Tracing through the internals shows that the core of the submission is executeAsync(StreamGraph):
/**
* Triggers the program execution asynchronously. The environment will execute all parts of the
* program that have resulted in a "sink" operation. Sink operations are for example printing
* results or forwarding them to a message queue.
*
* @param streamGraph the stream graph representing the transformations
* @return A {@link JobClient} that can be used to communicate with the submitted job, completed
* on submission succeeded.
* @throws Exception which occurs during job execution.
*/
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(
configuration.get(DeploymentOptions.TARGET),
"No execution.target specified in your configuration file.");
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
CompletableFuture<JobClient> jobClientFuture =
executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException =
ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(
jobListener -> jobListener.onJobSubmitted(null, strippedException));
throw new FlinkException(
String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
strippedException);
}
}
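The takeaway from this method: the environment hands the StreamGraph to a PipelineExecutor chosen by the execution.target configuration, and that executor performs the actual deployment. The deployers we build below replicate exactly this hand-off, just without going through the executor factory.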
3. Implementing the deployment program
First, the overall approach:
Internally, Flink converts our operators into a StreamGraph, converts the StreamGraph into a JobGraph, and then submits the JobGraph to the cluster.
So our task is to turn the program jar into a JobGraph ourselves, and then use an implementation of ClusterDescriptor to submit that JobGraph (a condensed sketch follows the list below):
ClusterDescriptor
- StandaloneClusterDescriptor
- YarnClusterDescriptor
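Here is a condensed, hedged sketch of the whole idea for the standalone case, assuming flink-clients 1.13 and a running cluster; the conf directory, jar path, and entry class are all hypothetical:
import java.io.File;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class SubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = GlobalConfiguration.loadConfiguration("/opt/flink/conf"); // hypothetical
        // jar -> PackagedProgram -> JobGraph
        PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File("/path/to/job.jar")) // hypothetical
                        .setEntryPointClassName("com.example.MyJob") // hypothetical
                        .build();
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, conf, 1, true);
        // JobGraph -> a running standalone cluster
        try (StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(conf);
                ClusterClient<StandaloneClusterId> client =
                        descriptor.retrieve(StandaloneClusterId.getInstance()).getClusterClient()) {
            client.submitJob(jobGraph).get();
        }
    }
}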
3.1 Project setup
For now the project only needs two modules: flink-deployer-common and flink-deployer-1.13.
3.1.1 Module layout
The common module holds the shared classes.
The deployer module contains the actual submission code for each deployment mode.
3.1.2 Dependencies of the common module
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <!-- version assumed to be managed in a parent POM -->
</dependency>
3.1.3 Dependencies of the deployer module
<properties>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.13.0</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink-yarn, required for the YARN deployment modes -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
3.2 Writing the code
The classes below are referred to as module#ClassName.
3.2.1 common#FlinkDeployer
public interface FlinkDeployer {
    /**
     * Submit a Flink job.
     *
     * @param deployJobParam the deployment parameters
     * @return the deployment result
     */
    DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception;
}
3.2.2 common#DeployJobParam
The deployment parameters.
@Data
public class DeployJobParam {
private String sql;
private String jobType;
/** address of the target cluster */
private String clusterAddress;
private String mainAppJar;
/** only used by yarn-session mode */
private String yarnApplicationId;
private String mainClassName;
private String mainClassArgs;
private String parameters;
/** third-party dependencies of the job (connectors, UDFs, ...), e.g. /home/xcchen/bdcp/udf/flink-sql-udf-test.jar */
private List<String> dependencies;
private String deployMode;
private String flinkHome;
/** used by application mode */
private List<String> providedLibDirs;
}
3.2.3 common#DeployJobResponse
The deployment result.
/** The response returned after a job deployment. */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DeployJobResponse {
private String clusterId;
private String jobId;
private String clusterWebInterfaceUrl;
private String jobStatus;
}
3.2.4 common#AbstractFlinkDeployer
An abstract parent class whose purpose is to isolate class loaders so that multiple Flink versions can be supported.
public abstract class AbstractFlinkDeployer implements FlinkDeployer {
    protected abstract class FlinkDeployeHandler {
        public Object call() throws Exception {
            ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
            // temporarily switch the context ClassLoader
            Thread.currentThread().setContextClassLoader(AbstractFlinkDeployer.class.getClassLoader());
            try {
                return handle();
            } finally {
                // always restore the previous ClassLoader, even when handle() throws
                Thread.currentThread().setContextClassLoader(oldClassLoader);
            }
        }
        public abstract Object handle() throws Exception;
    }
    protected Object handle(FlinkDeployeHandler flinkDeployeHandler) throws Exception {
        return flinkDeployeHandler.call();
    }
}
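And a hedged sketch of where this isolation pays off for multi-version support: each version-specific deployer jar can live in its own URLClassLoader, so the Flink classes of different versions never collide. The jar path and class name here are hypothetical.
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class DeployerLoader {
    public static Object loadDeployer(String deployerJar, String className) throws Exception {
        // One class loader per Flink version keeps their dependencies apart.
        URLClassLoader versionClassLoader =
                new URLClassLoader(
                        new URL[] {new File(deployerJar).toURI().toURL()},
                        DeployerLoader.class.getClassLoader());
        Class<?> deployerClass = Class.forName(className, true, versionClassLoader);
        return deployerClass.getDeclaredConstructor().newInstance();
    }
}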
3.2.5 deployer#FlinkDeployerBase13
This is the base class for the Flink 1.13 deployers; every class that follows extends it. It encapsulates the shared fields and methods.
public abstract class FlinkDeployerBase13 extends AbstractFlinkDeployer {
private static final Logger logger = LoggerFactory.getLogger(FlinkDeployerBase13.class);
protected Configuration configuration;
/** the Flink root directory */
protected String FLINK_HOME;
/** the Flink conf directory */
protected String FLINK_CONF_DIR;
/** the lib directory under the Flink root */
protected String FLINK_LIB_DIR;
/** the plugins directory under the Flink root */
protected String FLINK_PLUGINS_DIR;
protected PackagedProgram program;
protected JobGraph jobGraph;
public FlinkDeployerBase13() {}
public FlinkDeployerBase13(DeployJobParam deployJobParam) {
this.FLINK_HOME = FlinkUtils.getFlinkHome(deployJobParam.getFlinkHome());
this.FLINK_CONF_DIR = FLINK_HOME + "/conf";
this.configuration = GlobalConfiguration.loadConfiguration(FLINK_CONF_DIR);
this.FLINK_LIB_DIR = FLINK_HOME + "/lib";
this.FLINK_PLUGINS_DIR = FLINK_HOME + "/plugins";
this.initConfiguration(deployJobParam);
if (!DeployModeEnum.FLINK_YARN_APPLICATION
.getName()
.equals(deployJobParam.getDeployMode())) { // application mode runs main() on the cluster, so no local program/JobGraph is needed
try {
this.program = this.buildProgram(deployJobParam);
this.jobGraph =
PackagedProgramUtils.createJobGraph(
program,
configuration,
configuration.get(CoreOptions.DEFAULT_PARALLELISM),
true);
} catch (Exception e) {
throw new RuntimeException("build program and job graph fail", e);
}
}
}
/**
* init configuration for the standalone, per-job, session, and application modes
*
* @param deployJobParam
*/
private void initConfiguration(DeployJobParam deployJobParam) {
DeployModeEnum deployMode = DeployModeEnum.getByName(deployJobParam.getDeployMode());
switch (deployMode) {
case FLINK_STANDALONE:
DeployUtils.getStandaloneConfiguration(configuration, deployJobParam);
break;
case FLINK_YARN_SESSION:
DeployUtils.getYarnSessionConfiguration(configuration, deployJobParam);
break;
case FLINK_YARN_PERJOB:
DeployUtils.getYarnPerjobConfiguration(
configuration,
deployJobParam,
FLINK_CONF_DIR,
FLINK_HOME,
FLINK_LIB_DIR,
FLINK_PLUGINS_DIR);
break;
case FLINK_YARN_APPLICATION:
DeployUtils.getApplicationConfiguration(
configuration, deployJobParam, FLINK_CONF_DIR);
break;
default:
throw new IllegalArgumentException("unsupported deploy mode: " + deployJobParam.getDeployMode());
}
}
/**
* @param deployJobParam
* @return
* @throws ProgramInvocationException
*/
private PackagedProgram buildProgram(DeployJobParam deployJobParam) throws Exception {
String jobType = deployJobParam.getJobType();
String[] programArgs = {};
if (deployJobParam.getMainClassArgs() != null) {
programArgs = deployJobParam.getMainClassArgs().split("\\s+");
}
String jarFilePath =
jobType.equals(JobTypeEnum.FLINK_SQL.name())
? ConstantUtils.EP_FLINK_SQL_CLIENT
: deployJobParam.getMainAppJar();
// TODO the entry-point class differs per job type
String entryPointClass =
jobType.equals(JobTypeEnum.FLINK_SQL.name())
? ConstantUtils.EP_FLINK_SQL_CLIENT_ENTRYPOINT_CLASS
: deployJobParam.getMainClassName();
PackagedProgram.Builder builder =
PackagedProgram.newBuilder()
.setJarFile(new File(jarFilePath))
.setEntryPointClassName(entryPointClass)
.setArguments(programArgs);
if (deployJobParam.getDependencies() != null
&& deployJobParam.getDependencies().size() > 0) {
builder.setUserClassPaths(
deployJobParam.getDependencies().stream()
.map(
ele -> {
    try {
        // File#toURI yields a well-formed file: URL from a plain path
        return new File(ele).toURI().toURL();
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "cannot convert dependency to user classpath", e);
    }
})
.collect(Collectors.toList()));
}
logger.info("build package program success...");
return builder.build();
}
/**
* Deploy a JobGraph directly (a SQL-client-like flow). In this case the deploy param
* only needs to carry the required cluster information.
*
* @param jobGraph
* @param deployJobParam
* @return
*/
public abstract JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
throws Exception;
protected abstract Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception;
}
3.2.6 deployer#StandaloneDeployer13
The deployer that submits to a standalone cluster.
public class StandaloneDeployer13 extends FlinkDeployerBase13 {
public StandaloneDeployer13() {}
public StandaloneDeployer13(DeployJobParam deployJobParam) {
super(deployJobParam);
}
@Override
public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {
Object jobClientObject =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
return deployJobGraphInternal(
jobGraph, program.getUserCodeClassLoader());
}
});
return DeployUtils.tuple2ObjectToResponse(jobClientObject);
}
@Override
public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
throws Exception {
Object jobClientObject =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
return deployJobGraphInternal(
jobGraph, getClass().getClassLoader());
}
});
return (JobClient) jobClientObject;
}
@Override
protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {
StandaloneClientFactory clusterClientFactory = new StandaloneClientFactory();
final StandaloneClusterDescriptor clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration);
final StandaloneClusterId clusterId = clusterClientFactory.getClusterId(configuration);
final ClusterClientProvider<StandaloneClusterId> clusterClientProvider =
clusterDescriptor.retrieve(clusterId);
final ClusterClient<StandaloneClusterId> clusterClient =
clusterClientProvider.getClusterClient();
final CompletableFuture<JobClient> jobClientFuture =
clusterClient
.submitJob(jobGraph)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
jobId -> {
ClientUtils.waitUntilJobInitializationFinished(
() -> clusterClient.getJobStatus(jobId).get(),
() ->
clusterClient
.requestJobResult(jobId)
.get(),
userCodeClassLoader);
return jobId;
}))
.thenApplyAsync(
jobID ->
(JobClient)
new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID,
userCodeClassLoader))
.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
JobClient jobClient = jobClientFuture.get();
return new Tuple2<>(jobClient, clusterClient);
}
}
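The future chain in deployJobGraphInternal is, as far as I can tell, essentially what Flink's own AbstractSessionClusterExecutor#execute does internally; we simply skip the PipelineExecutor indirection and drive the ClusterClient ourselves.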
3.2.7 deployer#YarnDeployer13
/** Holds the common state shared by the YARN deployers. */
public abstract class YarnDeployer13 extends FlinkDeployerBase13 {
protected YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
protected ClusterSpecification clusterSpecification;
protected YarnClusterDescriptor clusterDescriptor;
public YarnDeployer13() {}
public YarnDeployer13(DeployJobParam deployJobParam) {
super(deployJobParam);
try {
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
clusterSpecification =
clusterClientFactory.getClusterSpecification(configuration);
clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration);
return null;
}
});
} catch (Exception e) {
    // fail fast instead of swallowing the error with printStackTrace()
    throw new RuntimeException("failed to create the YARN cluster descriptor", e);
}
}
}
3.2.8 deployer#YarnApplicationDeployer13
The deployer for yarn-application mode.
public class YarnApplicationDeployer13 extends YarnDeployer13 {
public YarnApplicationDeployer13() {}
public YarnApplicationDeployer13(DeployJobParam deployJobParam) {
super(deployJobParam);
}
@Override
public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {
Object clusterClientObj =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
// set the application's main class and its arguments (guard against null args)
final String mainClassArgs = deployJobParam.getMainClassArgs();
final ApplicationConfiguration applicationConfiguration =
        new ApplicationConfiguration(
                mainClassArgs == null ? new String[0] : mainClassArgs.split("\\s+"),
                deployJobParam.getMainClassName());
final ClusterClientProvider<ApplicationId> clusterClientProvider =
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
return clusterClientProvider.getClusterClient();
}
});
ClusterClient<ApplicationId> clusterClient =
(ClusterClient<ApplicationId>) clusterClientObj;
// listJobs() right after deployment is racy: the job may not be visible yet,
// so poll briefly instead of reading the list once
List<JobStatusMessage> jobs = new ArrayList<>(clusterClient.listJobs().get());
for (int i = 0; i < 30 && jobs.isEmpty(); i++) {
    Thread.sleep(1000);
    jobs = new ArrayList<>(clusterClient.listJobs().get());
}
if (jobs.isEmpty()) {
    throw new IllegalStateException("no job found on the application cluster");
}
return new DeployJobResponse(
        clusterClient.getClusterId().toString(),
        jobs.get(0).getJobId().toHexString(),
        clusterClient.getWebInterfaceURL(),
        jobs.get(0).getJobState().toString());
}
@Override
public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
throws Exception {
throw new UnsupportedOperationException(
"yarn application mode can't deploy jobGraph,please use deployJob(DeployParam deployParam) to deploy job");
}
@Override
protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {
throw new UnsupportedOperationException(
"yarn application mode can't deploy jobGraph,please use deployJob(DeployParam deployParam) to deploy job");
}
}
3.2.9 deployer#YarnSessionDeployer13
The deployer for yarn-session mode.
public class YarnSessionDeployer13 extends YarnDeployer13 {
public YarnSessionDeployer13() {}
public YarnSessionDeployer13(DeployJobParam deployJobParam) {
super(deployJobParam);
}
@Override
public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {
Object tuple2Object =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
return deployJobGraphInternal(
jobGraph, program.getUserCodeClassLoader());
}
});
return DeployUtils.tuple2ObjectToResponse(tuple2Object);
}
@Override
public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
throws Exception {
Object jobClientObject =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
return deployJobGraphInternal(
jobGraph, getClass().getClassLoader());
}
});
return (JobClient) jobClientObject;
}
@Override
protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {
final ApplicationId clusterId = clusterClientFactory.getClusterId(configuration);
final ClusterClientProvider<ApplicationId> clusterClientProvider =
clusterDescriptor.retrieve(clusterId);
final ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
final CompletableFuture<JobClient> jobClientFuture =
clusterClient
.submitJob(jobGraph)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
jobId -> {
ClientUtils.waitUntilJobInitializationFinished(
() -> clusterClient.getJobStatus(jobId).get(),
() ->
clusterClient
.requestJobResult(jobId)
.get(),
userCodeClassLoader);
return jobId;
}))
.thenApplyAsync(
jobID ->
(JobClient)
new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID,
userCodeClassLoader))
.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
JobClient jobClient = jobClientFuture.get();
return new Tuple2<>(jobClient, clusterClient);
}
}
3.2.10 deployer#YarnPerjobDeployer13
The deployer for yarn per-job mode.
public class YarnPerjobDeployer13 extends YarnDeployer13 {
public YarnPerjobDeployer13() {}
public YarnPerjobDeployer13(DeployJobParam deployJobParam) {
super(deployJobParam);
}
@Override
public DeployJobResponse deployJob(DeployJobParam deployJobParam) throws Exception {
Object tuple2Object =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
return deployJobGraphInternal(
jobGraph, program.getUserCodeClassLoader());
}
});
return DeployUtils.tuple2ObjectToResponse(tuple2Object);
}
@Override
public JobClient deployJobGraph(JobGraph jobGraph, DeployJobParam deployJobParam)
throws Exception {
Object jobClientObject =
this.handle(
new FlinkDeployeHandler() {
@Override
public Object handle() throws Exception {
return deployJobGraphInternal(
jobGraph, getClass().getClassLoader());
}
});
return (JobClient) jobClientObject;
}
@Override
protected Tuple2<JobClient, ClusterClient> deployJobGraphInternal(
JobGraph jobGraph, ClassLoader userCodeClassLoader) throws Exception {
final ClusterClientProvider<ApplicationId> clusterClientProvider =
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
// wrapping in a completedFuture adds nothing; construct the adapter directly
final ClusterClientJobClientAdapter<ApplicationId> jobClientAdapter =
        new ClusterClientJobClientAdapter<>(
                clusterClientProvider, jobGraph.getJobID(), userCodeClassLoader);
return new Tuple2<>(jobClientAdapter, clusterClientProvider.getClusterClient());
}
}
3.2.11 deployer#DeployUtils
public class DeployUtils {
/**
* Set the defaults, then let the user-supplied parameters override them.
*
* @param configuration
* @param deployJobParam
* @return
*/
public static Configuration covertDefaultConfiguration(
Configuration configuration, DeployJobParam deployJobParam) {
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
// user-supplied -Dkey=value parameters override the defaults
if (!StringUtils.isNullOrWhitespaceOnly(deployJobParam.getParameters())) {
    final String[] parameters =
            deployJobParam.getParameters().replaceAll("\n", "").split("\\s+");
    Arrays.stream(parameters)
            // strip only the leading -D, not a -D appearing inside the value
            .map(ele -> ele.replaceFirst("^-D", ""))
            .forEach(
                    ele -> {
                        // limit = 2 keeps '=' characters inside values intact
                        final String[] split = ele.split("=", 2);
                        configuration.setString(split[0], split[1]);
                    });
}
return configuration;
}
public static Configuration getYarnPerjobConfiguration(
Configuration configuration,
DeployJobParam deployJobParam,
String configurationDirectory,
String flinkHome,
String flinkLibDir,
String flinkPluginsDir) {
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName());
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.set(DeploymentOptionsInternal.CONF_DIR, configurationDirectory);
configuration.set(
YarnConfigOptions.FLINK_DIST_JAR,
FlinkUtils.getFlinkDistPathByFlinkHome(flinkHome));
// ship the whole lib and plugins directories plus every job dependency (null-safe)
final List<String> shipFiles = new ArrayList<>(Arrays.asList(flinkLibDir, flinkPluginsDir));
if (deployJobParam.getDependencies() != null) {
    shipFiles.addAll(deployJobParam.getDependencies());
}
configuration.set(YarnConfigOptions.SHIP_FILES, shipFiles);
// defaults (may still be overridden by user parameters)
configuration.setString(TaskManagerOptions.NUM_TASK_SLOTS.key(), "1");
return covertDefaultConfiguration(configuration, deployJobParam);
}
public static Configuration getYarnSessionConfiguration(
Configuration configuration, DeployJobParam deployJobParam) {
configuration.set(YarnConfigOptions.APPLICATION_ID, deployJobParam.getYarnApplicationId());
return covertDefaultConfiguration(configuration, deployJobParam);
}
public static Configuration getStandaloneConfiguration(
Configuration configuration, DeployJobParam deployJobParam) {
final String clusterAddress = deployJobParam.getClusterAddress();
if (clusterAddress != null
        && !"".equals(clusterAddress)
        // note: only an IPv4 "ip:port" matches; hostnames fall back to flink-conf.yaml
        && clusterAddress.matches("(\\d+\\.){3}\\d+:\\d+")) {
configuration.set(JobManagerOptions.ADDRESS, clusterAddress.split(":")[0]);
configuration.set(
JobManagerOptions.PORT, Integer.parseInt(clusterAddress.split(":")[1]));
}
return covertDefaultConfiguration(configuration, deployJobParam);
}
/**
* get application configuration
*
* @param configuration
* @param deployJobParam
* @return
*/
public static Configuration getApplicationConfiguration(
Configuration configuration, DeployJobParam deployJobParam, String flinkConfDir) {
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.set(DeploymentOptionsInternal.CONF_DIR, flinkConfDir);
// the user jar is not uploaded to HDFS in advance; it is cleaned up together with the job's lifecycle
// TODO for SQL jobs, ep-flink-sql-client should be uploaded to remote HDFS ahead of time, so we need to check whether this is a SQL job
configuration.set(
PipelineOptions.JARS, Collections.singletonList(deployJobParam.getMainAppJar()));
// register the lib and plugins directories under provided.lib.dirs (must not be set in per-job mode)
// TODO these directories should not be hard-coded (see DeployJobParam#providedLibDirs)
configuration.set(
YarnConfigOptions.PROVIDED_LIB_DIRS,
Arrays.asList(
"hdfs://localhost:9820/flink/dist/1.13.0/lib",
"hdfs://localhost:9820/flink/dist/1.13.0/plugins",
"hdfs://localhost:9820/flink/dist/1.13.0/udf"));
// defaults (may still be overridden by user parameters)
configuration.setString(TaskManagerOptions.NUM_TASK_SLOTS.key(), "1");
return DeployUtils.covertDefaultConfiguration(configuration, deployJobParam);
}
public static DeployJobResponse tuple2ToResponse(Tuple2<JobClient, ClusterClient> tuple2) {
try {
return new DeployJobResponse(
tuple2.f1.getClusterId().toString(),
tuple2.f0.getJobID().toString(),
tuple2.f1.getWebInterfaceURL(),
tuple2.f0.getJobStatus().get().toString());
} catch (Exception e) {
throw new RuntimeException("tuple to deployResponse fail.", e);
}
}
public static DeployJobResponse tuple2ObjectToResponse(Object tupleObject) {
if (!(tupleObject instanceof Tuple2)) {
throw new RuntimeException(
        "the input must be the Tuple2 returned by deployJobGraphInternal");
}
Tuple2<JobClient, ClusterClient> tuple2 = (Tuple2<JobClient, ClusterClient>) tupleObject;
return tuple2ToResponse(tuple2);
}
}
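As a quick usage sketch, the parameter string below (the values are just examples) is parsed by covertDefaultConfiguration into two Configuration entries, on top of the unconditional DEFAULT_PARALLELISM=1:
import org.apache.flink.configuration.Configuration;

public class DeployUtilsDemo {
    public static void main(String[] args) {
        DeployJobParam param = new DeployJobParam();
        param.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=1");
        Configuration conf = DeployUtils.covertDefaultConfiguration(new Configuration(), param);
        // Expected entries: taskmanager.numberOfTaskSlots=2, parallelism.default=1
        System.out.println(conf);
    }
}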
3.3 Test examples
3.3.1 Environment preparation
- A Flink 1.13 client installation; for standalone mode, start the cluster first.
- The user's application jar.
- If the job has extra dependencies such as UDF jars, set them with setDependencies().
- In IDEA, set an environment variable for the test program (or call setFlinkHome()), e.g. FLINK_HOME=/home/xcchen/software/flink/flink-1.13.0.
- When submitting to YARN, don't forget HADOOP_HOME and HADOOP_CONF_DIR.
- Alternatively, pass -Dflink.hadoop.* advanced parameters to specify the HDFS and YARN addresses instead of configuring HADOOP_HOME.
3.3.2 Standalone-mode submission test
public class StandaloneDeployer13Test {
    @Test
    public void deploy() throws Exception {
        DeployJobParam deployParam = new DeployJobParam();
        deployParam.setJobType(JobTypeEnum.FLINK_JAR.name());
        deployParam.setMainClassName("top.yangc.flink.sql.connector.DataGenTest");
        deployParam.setMainClassArgs("");
        deployParam.setClusterAddress("192.168.58.11:8081");
        deployParam.setMainAppJar("/home/xcchen/software/flink/flink-1.13.0/examples/flink-sql-example-1.0.0.jar");
        deployParam.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=1");
        // deployParam.setDependencies(Collections.singletonList("/home/xcchen/software/flink/flink-1.13.0/examples/flink-udf-example-1.0.0.jar"));
        // the param must go to the constructor: that is where the PackagedProgram and JobGraph are built
        FlinkDeployer flinkDeployer = new StandaloneDeployer13(deployParam);
        flinkDeployer.deployJob(deployParam);
    }
}
3.3.3 Yarn per-job submission test
public class YarnPerjobDeployer13Test {
    @Test
    public void deploy() throws Exception {
        DeployJobParam deployParam = new DeployJobParam();
        deployParam.setJobType(JobTypeEnum.FLINK_JAR.name());
        deployParam.setMainClassName("top.yangc.flink.sql.connector.UdfDependencyTest");
        deployParam.setMainClassArgs("");
        deployParam.setMainAppJar("/home/xcchen/software/flink/flink-1.13.0/examples/flink-sql-example-1.0.0.jar");
        deployParam.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2");
        // to reach a remote HDFS/YARN cluster:
        // System.setProperty("HADOOP_USER_NAME", "root");
        // deployParam.setParameters("-Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dflink.hadoop.fs.defaultFS=hdfs://192.168.58.11:9820 -Dflink.yarn.resourcemanager.hostname=192.168.58.11");
        deployParam.setDependencies(Collections.singletonList("/home/xcchen/software/flink/flink-1.13.0/examples/flink-udf-example-1.0.0.jar"));
        // the param must go to the constructor: that is where the PackagedProgram and JobGraph are built
        FlinkDeployer flinkDeployer = new YarnPerjobDeployer13(deployParam);
        flinkDeployer.deployJob(deployParam);
    }
}
4. Further thoughts
How do we make the submission program support multiple Flink versions? A computing platform should not only support Flink 1.13; other versions need to be considered as well.
And how do we convert SQL into a JobGraph and feed it into our deployment program, to build something like a sql-client?
We'll talk about these next time. Feel free to message me with questions~
References
The submission module of the StreamX open source project: http://www.streamxhub.com/
Thanks to the above project for the reference.