[Flink source code] Dynamically loading third-party jars: a solution derived from the Flink client's job submission code


1. Source code analysis of the flink run submission flow

Looking at the flink launcher script, we can find the entry class that executes the run command:

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

The entry class is org.apache.flink.client.cli.CliFrontend. It ultimately calls parseParameters(String[] args) to parse the command line, and for the run action it calls run(params):

switch (action) {
	case ACTION_RUN:
		run(params);
		return 0;
	case ACTION_RUN_APPLICATION:
		runApplication(params);
		return 0;
	case ACTION_LIST:
		list(params);
		return 0;
	case ACTION_INFO:
		info(params);
		return 0;
	case ACTION_CANCEL:
		cancel(params);
		return 0;
	case ACTION_STOP:
		stop(params);
		return 0;
	case ACTION_SAVEPOINT:
		savepoint(params);
		return 0;
}
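The switch above is essentially a lookup from the first command-line token to a handler that receives the remaining parameters. A minimal stdlib-only sketch of that dispatch pattern follows; MiniCliFrontend is a hypothetical class, not Flink's code, and its handlers only record which action ran. Returning 1 for a missing or unknown action is a choice made for this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for CliFrontend's action dispatch. */
final class MiniCliFrontend {
    private final Map<String, Consumer<String[]>> actions = new LinkedHashMap<>();
    private String lastAction;

    MiniCliFrontend() {
        // Register one handler per action name; each just records that it ran.
        for (String name : new String[] {"run", "run-application", "list", "info", "cancel", "stop", "savepoint"}) {
            actions.put(name, params -> lastAction = name);
        }
    }

    /** Returns 0 on success and 1 for a missing or unknown action (sketch convention). */
    int parseParameters(String[] args) {
        if (args.length < 1) {
            return 1;
        }
        Consumer<String[]> handler = actions.get(args[0]);
        if (handler == null) {
            return 1;
        }
        // Strip the action name; the rest are the action's own parameters, like run(params).
        handler.accept(Arrays.copyOfRange(args, 1, args.length));
        return 0;
    }

    String lastAction() {
        return lastAction;
    }
}
```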

The run method looks like this:

protected void run(String[] args) throws Exception {
		LOG.info("Running 'run' command.");

		final Options commandOptions = CliFrontendParser.getRunCommandOptions();
		final CommandLine commandLine = getCommandLine(commandOptions, args, true);

		// evaluate help flag
		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
			CliFrontendParser.printHelpForRun(customCommandLines);
			return;
		}

		final CustomCommandLine activeCommandLine =
				validateAndGetActiveCommandLine(checkNotNull(commandLine));

		final ProgramOptions programOptions = ProgramOptions.create(commandLine);
		// Build the PackagedProgram object
		final PackagedProgram program =
				getPackagedProgram(programOptions);
		// Resolve the job jar and its dependency jars
		final List<URL> jobJars = program.getJobJarAndDependencies();
		// Build the final (effective) configuration used for submission
		final Configuration effectiveConfiguration = getEffectiveConfiguration(
				activeCommandLine, commandLine, programOptions, jobJars);

		LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

		try {
			executeProgram(effectiveConfiguration, program);
		} finally {
			program.deleteExtractedLibraries();
		}
	}

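Based on the arguments the user passes (main class, jar file, and so on), the run method creates a PackagedProgram object, which encapsulates everything the user submitted. This is visible in getPackagedProgram():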

return PackagedProgram.newBuilder()
			.setJarFile(jarFile)
			.setUserClassPaths(classpaths)
			.setEntryPointClassName(entryPointClass)
			.setConfiguration(configuration)
			.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
			.setArguments(programArgs)
			.build();

The PackagedProgram constructor initializes several key fields:

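  • classpaths: the URLs the user passes with the -C option
  • jarFile: the jar containing the user's main function
  • extractedTempLibraries: every jar found under the lib/ folder inside the main jar, extracted so the class loader can use them later
  • userCodeClassLoader: the class loader for user code; it puts classpaths, jarFile and extractedTempLibraries on its classpath. By default it resolves classes child-first (classloader.resolve-order: child-first)
  • mainClass: the class containing the user's main method

The constructor is as follows:
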
private PackagedProgram(
			@Nullable File jarFile,
			List<URL> classpaths,
			@Nullable String entryPointClassName,
			Configuration configuration,
			SavepointRestoreSettings savepointRestoreSettings,
			String... args) throws ProgramInvocationException {
		this.classpaths = checkNotNull(classpaths);
		this.savepointSettings = checkNotNull(savepointRestoreSettings);
		this.args = checkNotNull(args);

		checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null.");

		// whether the job is a Python job.
		this.isPython = isPython(entryPointClassName);

		// load the jar file if exists
		this.jarFile = loadJarFile(jarFile);

		assert this.jarFile != null || entryPointClassName != null;

		// now that we have an entry point, we can extract the nested jar files (if any)
		this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile);
		this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
			getJobJarAndDependencies(),
			classpaths,
			getClass().getClassLoader(),
			configuration);

		// load the entry point class
		this.mainClass = loadMainClass(
			// if no entryPointClassName name was given, we try and look one up through the manifest
			entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
			userCodeClassLoader);

		if (!hasMainMethod(mainClass)) {
			throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");
		}
	}
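The userCodeClassLoader built by ClientUtils.buildUserCodeClassLoader is child-first by default, meaning a class found in the user's jars wins over a class of the same name on Flink's own classpath. A minimal sketch of that lookup order, assuming a hypothetical ChildFirstLoaderSketch built on java.net.URLClassLoader; Flink's own child-first loader is more elaborate (it also honours the configured classloader.parent-first-patterns), so treat this as an illustration only.

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Hypothetical child-first loader sketch (not Flink's implementation).
 * JDK classes and the given "always parent-first" prefixes go to the parent first;
 * everything else is searched in this loader's own URLs before the parent.
 */
final class ChildFirstLoaderSketch extends URLClassLoader {
    private final String[] alwaysParentFirst;

    ChildFirstLoaderSketch(URL[] urls, ClassLoader parent, String... alwaysParentFirst) {
        super(urls, parent);
        this.alwaysParentFirst = alwaysParentFirst;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    c = super.loadClass(name, false); // normal parent-first delegation
                } else {
                    try {
                        c = findClass(name); // child first: search our own URLs
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, false); // not in user jars: ask the parent
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private boolean isParentFirst(String name) {
        if (name.startsWith("java.")) {
            return true;
        }
        for (String prefix : alwaysParentFirst) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

The design point is the try/catch order: the user's jars are consulted before the parent, which is what lets a job bundle, say, its own version of a library that Flink also ships.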

The getJobJarAndDependencies method of PackagedProgram collects every jar the job depends on; these jars are later shipped to the cluster and added to the classpath there.
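To make the lib/ mechanism concrete, here is a stdlib-only sketch of what extracting nested libraries and assembling the "job jar plus dependencies" list amounts to: every lib/*.jar entry of the main jar is copied to a temp file, and the result list is the main jar's URL followed by the extracted files' URLs. NestedJarSketch and its method names are hypothetical; Flink's real code additionally handles Python jobs, temp-file naming and cleanup (deleteExtractedLibraries()).

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical sketch of nested-jar extraction; not Flink's implementation. */
final class NestedJarSketch {
    /** Copies every lib/*.jar entry of the main jar into its own temp file. */
    static List<File> extractContainedLibraries(File mainJar) throws IOException {
        List<File> extracted = new ArrayList<>();
        try (JarFile jar = new JarFile(mainJar)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (!entry.isDirectory() && name.startsWith("lib/") && name.endsWith(".jar")) {
                    Path tmp = Files.createTempFile("job-lib-", ".jar");
                    try (InputStream in = jar.getInputStream(entry)) {
                        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
                    }
                    extracted.add(tmp.toFile());
                }
            }
        }
        return extracted;
    }

    /** Main jar first, then the extracted libraries. */
    static List<URL> jobJarAndDependencies(File mainJar, List<File> extracted) throws IOException {
        List<URL> result = new ArrayList<>();
        result.add(mainJar.toURI().toURL());
        for (File lib : extracted) {
            result.add(lib.getAbsoluteFile().toURI().toURL());
        }
        return result;
    }
}
```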

Once the PackagedProgram object is constructed, the final Configuration object is created:

final Configuration effectiveConfiguration = getEffectiveConfiguration(
				activeCommandLine, commandLine, programOptions, jobJars);

This method sets two options:

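  • pipeline.classpaths: the URLs passed with -C (the classpaths list)
  • pipeline.jars: the list returned by getJobJarAndDependencies(), i.e. the main jar plus the jars extracted from its lib/ folder; at submission time these jars are shipped to the cluster together with the job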
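Both options are list-valued; in their string form (for example in flink-conf.yaml) they are written as semicolon-separated values. The sketch below only illustrates that string form, assuming a plain Map stands in for Flink's Configuration (which stores typed lists internally); PipelineOptionsSketch is a hypothetical helper.

```java
import java.net.URL;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: writes the two pipeline options as semicolon-separated strings. */
final class PipelineOptionsSketch {
    static final String JARS = "pipeline.jars";
    static final String CLASSPATHS = "pipeline.classpaths";

    static Map<String, String> encode(List<URL> jars, List<URL> classpaths) {
        Map<String, String> conf = new LinkedHashMap<>(); // stand-in for Configuration
        conf.put(JARS, join(jars));
        conf.put(CLASSPATHS, join(classpaths));
        return conf;
    }

    private static String join(List<URL> urls) {
        return urls.stream().map(URL::toString).collect(Collectors.joining(";"));
    }
}
```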

With PackagedProgram and Configuration prepared, the user program is executed:

executeProgram(effectiveConfiguration, program);

This call delegates to ClientUtils.executeProgram, whose code is:

public static void executeProgram(
			PipelineExecutorServiceLoader executorServiceLoader,
			Configuration configuration,
			PackagedProgram program,
			boolean enforceSingleJobExecution,
			boolean suppressSysout) throws ProgramInvocationException {
		checkNotNull(executorServiceLoader);
		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
		try {
			// Install the user-code class loader as the thread's context class loader
			Thread.currentThread().setContextClassLoader(userCodeClassLoader);

			LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

			ContextEnvironment.setAsContext(
				executorServiceLoader,
				configuration,
				userCodeClassLoader,
				enforceSingleJobExecution,
				suppressSysout);

			StreamContextEnvironment.setAsContext(
				executorServiceLoader,
				configuration,
				userCodeClassLoader,
				enforceSingleJobExecution,
				suppressSysout);

			try {
				// Invoke the user's main method via reflection; this is where the job gets submitted
				program.invokeInteractiveModeForExecution();
			} finally {
				ContextEnvironment.unsetAsContext();
				StreamContextEnvironment.unsetAsContext();
			}
		} finally {
			Thread.currentThread().setContextClassLoader(contextClassLoader);
		}
	}
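The core of executeProgram is a save/swap/restore of the thread context class loader around a reflective call to main. A minimal stdlib-only sketch of that pattern, assuming hypothetical names InvokeMainSketch and DemoJob (the latter is just a stand-in user job for the demonstration):

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical sketch of "swap context class loader, invoke main, restore". */
final class InvokeMainSketch {
    static void invokeMain(ClassLoader userCodeClassLoader, String mainClassName, String[] args)
            throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userCodeClassLoader);
            // Load the entry class through the user-code loader.
            Class<?> mainClass = Class.forName(mainClassName, false, userCodeClassLoader);
            Method main = mainClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new IllegalArgumentException(mainClassName + ".main(String[]) is not static");
            }
            // Cast to Object so the array is passed as the single String[] parameter.
            main.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // Rethrow what the user's main() itself threw instead of the reflection wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        } finally {
            // Always restore the original loader, even if main() failed.
            current.setContextClassLoader(previous);
        }
    }
}

/** Hypothetical user job used only to demonstrate the call. */
final class DemoJob {
    static volatile ClassLoader seenContextLoader;
    static volatile String[] seenArgs;

    public static void main(String[] args) {
        seenContextLoader = Thread.currentThread().getContextClassLoader();
        seenArgs = args;
    }
}
```

Setting the context class loader matters because libraries that load classes or ServiceLoader entries through Thread.getContextClassLoader() then see the user's jars while main runs.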

To summarize the whole flow:

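  1. Run the flink run command with the relevant arguments
  2. Create the PackagedProgram object, preparing the jars, the user-code class loader and the Configuration object
  3. Invoke the user's main method via reflection
  4. The user program builds the pipeline (StreamGraph) and submits the job to the cluster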

2. Dynamically loading third-party jars (such as UDFs) when submitting a job

From the analysis above, there are two ways to add jars dynamically:

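  1. Put the third-party jars into the lib/ directory of the main jar at submission time (the jar uf command can do this). The PackagedProgram constructor collects every jar under lib/ through extractContainedLibraries(), and these jars are then shipped to the cluster along with the job.
  2. In the user's main function, use reflection to set the pipeline.classpaths and pipeline.jars properties of the Configuration object, and also load the third-party jars into Thread.contextClassLoader. (See also: https://zhuanlan.zhihu.com/p/278482766)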
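For the first approach, the usual tool is jar uf app.jar lib/udf.jar, run from a directory that contains lib/udf.jar. If the step has to happen inside a Java deployment tool instead, a stdlib-only sketch is below; JarLibAppender and addToLib are hypothetical names. Because java.util.zip cannot append to an existing archive in place, the sketch writes a new jar: it copies every existing entry unchanged, then adds each third-party jar as lib/<file name>, skipping names that already exist.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper: copies mainJar to target and adds extra jars under lib/. */
final class JarLibAppender {
    static void addToLib(Path mainJar, Path target, List<Path> extraJars) throws IOException {
        Set<String> written = new HashSet<>();
        try (ZipFile in = new ZipFile(mainJar.toFile());
             ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(target))) {
            // 1. Copy every existing entry unchanged (manifest, classes, existing lib/ jars).
            Enumeration<? extends ZipEntry> entries = in.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                out.putNextEntry(new ZipEntry(entry.getName()));
                try (InputStream data = in.getInputStream(entry)) {
                    data.transferTo(out);
                }
                out.closeEntry();
                written.add(entry.getName());
            }
            // 2. Append each third-party jar as lib/<file name>, unless that name already exists.
            for (Path extra : extraJars) {
                String name = "lib/" + extra.getFileName();
                if (written.add(name)) {
                    out.putNextEntry(new ZipEntry(name));
                    Files.copy(extra, out);
                    out.closeEntry();
                }
            }
        }
    }
}
```

Submitting the resulting jar with flink run then lets the PackagedProgram constructor pick up lib/udf.jar like any other nested dependency.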

In my own project I went with the first approach, since it requires no additional code.