What are the ways to load external JARs when starting a Flink job?

news/2024/9/20 15:16:40

In Apache Flink, there are several ways to load external JARs when starting a job. They let you bring in custom UDFs (user-defined functions) or other dependencies. The common ones are:

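1. Start the job directly with the flink run command
You can use the command-line tool flink run to specify the job's entry class and the location of the external JAR. For example:

flink run -d -c com.example.YourMainClass -C file:///path/to/external-jar.jar /path/to/your-job.jar

Here -c specifies the job's entry class, -d runs the job in detached (background) mode, and -C (long form --classpath) adds the external JAR to the user-code classpath. All options must come before the job JAR, because everything after the JAR path is passed to the program as arguments. The value of -C must be a URL with a protocol (for example file://) that is accessible on all nodes, and the option can be repeated to add more than one JAR.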
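Because -C takes a URL rather than a plain path, it can help to generate the arguments instead of typing them. The following is a minimal sketch, not part of Flink: the class name `ClasspathArgs`, the method `toClasspathArgs`, and the JAR paths are all hypothetical.

```java
import java.net.MalformedURLException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): builds repeated "-C <url>" arguments. */
class ClasspathArgs {

    /** Turns each JAR path into the pair "-C", "<file URL of the JAR>". */
    static List<String> toClasspathArgs(List<Path> jars) throws MalformedURLException {
        List<String> args = new ArrayList<>();
        for (Path jar : jars) {
            args.add("-C");
            // -C expects a URL with a protocol, so convert the path to a file: URL.
            args.add(jar.toAbsolutePath().toUri().toURL().toString());
        }
        return args;
    }

    public static void main(String[] argv) throws MalformedURLException {
        List<Path> jars = new ArrayList<>();
        jars.add(Paths.get("/path/to/external-a.jar")); // placeholder paths
        jars.add(Paths.get("/path/to/external-b.jar"));
        // Options go before the job JAR; the printed line is only a command to copy.
        System.out.println(
                "flink run "
                        + String.join(" ", toClasspathArgs(jars))
                        + " -c com.example.YourMainClass /path/to/your-job.jar");
    }
}
```

The sketch only prints a command line; it does not check that the JARs exist or that they are reachable from the other nodes.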
2. Submit the job through the Flink Web UI
If you use Flink's Web UI, you can select the JAR to submit directly in the interface. On the Web UI page, open "Submit New Job", choose the JAR to upload and run, and then configure the job.
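3. Use Flink's YARN Session mode
If you run Flink on YARN in Session mode, you can make additional JARs available through the configuration file or through command-line parameters. For example, the yarn.ship-files option in flink-conf.yaml lists local files or directories that are shipped to the YARN cluster when the session is started:
```
yarn.ship-files: /path/to/external-jars
```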
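4. Use the YARN per-job or session commands
When you deploy to YARN from the command line, you can also pass the location of the external JARs as parameters. For a per-job cluster, use flink run with -C:

flink run -t yarn-per-job -C file:///path/to/external-jar.jar /path/to/your-job.jar

For a session cluster, ship a directory of JARs when starting the session:

yarn-session.sh --ship /path/to/external-jars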
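5. Package the JARs into the main job JAR
Another approach is to bundle all dependency JARs into a single JAR together with the job itself. That way no external JARs need to be specified explicitly at run time.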
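Notes
Make sure the external JARs are compatible with your Flink version.
If you use YARN mode, make sure the external JARs are correctly distributed to every TaskManager.
If you run into memory problems, you may need to adjust the YARN or Flink configuration, for example by increasing the available memory or changing the parallelism.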
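As you may have noticed, each of the first few methods specifies one JAR at a time. If a job depends on many JARs, say 20 or 30, listing them one by one with -C is impractical.
Method 5 can merge multiple JARs into one, but when I tried it, the merged JAR was built and yet the classes could not be found at run time. So here is a sixth method.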
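One possible cause of a "class not found" error with a merged JAR (an assumption, since the failing build is not shown here) is that the dependency JARs were embedded as nested .jar files instead of being unpacked into class entries; a standard URL-based class loader only sees top-level entries. The sketch below checks whether a class is present as a top-level entry. The class name `JarClassCheck` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

/** Hypothetical diagnostic: checks whether a JAR holds a class as a top-level entry. */
class JarClassCheck {

    /** Returns true if the JAR contains, e.g., com/test/A.class for "com.test.A". */
    static boolean containsClass(Path jar, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <jar> <fully.qualified.ClassName>");
            return;
        }
        boolean found = containsClass(Paths.get(args[0]), args[1]);
        System.out.println(args[1] + (found ? " found in " : " missing from ") + args[0]);
    }
}
```

If the class is reported as missing while a nested .jar that contains it is present, the build likely needs to unpack the dependencies into the merged JAR rather than copy them in.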
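6. Modify the Flink source code to accept a JAR directory parameter
With a JAR directory parameter, you put all the JARs, however many there are, into one directory and pass a single command-line option. The added code loops over that directory and adds every JAR to the classpath. This saves effort, and it also avoids JAR conflicts with the system or with other jobs, which is very convenient.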
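The idea rests on URL-based class loading: classes become visible to a job once the JAR URLs are handed to its class loader. The sketch below illustrates this with the JDK's `URLClassLoader` only. It is an illustration under assumptions, not Flink's actual user-code class loader (which, by default, resolves classes child-first), and the class name `UserCodeLoaderSketch` is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of URL-based class loading (plain JDK, not Flink code). */
class UserCodeLoaderSketch {

    /** Creates a class loader that can also see the classes in the given JAR URLs. */
    static URLClassLoader newLoader(List<URL> jarUrls) {
        return new URLClassLoader(
                jarUrls.toArray(new URL[0]), UserCodeLoaderSketch.class.getClassLoader());
    }

    /** Returns true if the loader can resolve the class, without initializing it. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // args: a class name followed by zero or more JAR URLs (file:///...).
        if (args.length < 1) {
            System.err.println("usage: UserCodeLoaderSketch <class-name> [jar-url ...]");
            return;
        }
        List<URL> jarUrls = new ArrayList<>();
        for (int i = 1; i < args.length; i++) {
            jarUrls.add(URI.create(args[i]).toURL());
        }
        try (URLClassLoader loader = newLoader(jarUrls)) {
            boolean ok = canLoad(loader, args[0]);
            System.out.println(args[0] + (ok ? " is loadable" : " is NOT loadable"));
        }
    }
}
```

Each loader created this way has its own set of URLs, which is why JARs added for one job do not have to be visible to another.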
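The following uses Flink 1.13 as an example.
First get the source from Git: https://github.com/apache/flink.git
Two files are modified in total: CliFrontendParser.java and ProgramOptions.java.
They belong to the flink-clients module, but at run time the classes are loaded from flink-dist.jar, so after making the changes you need to replace the corresponding two classes inside flink-dist.jar.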
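Replacing a class inside a JAR can be done with any zip tool. As one option, here is a minimal sketch that uses the JDK's zip file system provider. The class name `JarEntryReplacer` is hypothetical, the paths passed to it are up to you, and it is worth keeping a backup of the original flink-dist JAR before trying it.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;

/** Hypothetical helper: overwrites one entry of an existing JAR in place. */
class JarEntryReplacer {

    /**
     * Replaces (or adds) the entry, e.g. "org/apache/flink/client/cli/ProgramOptions.class",
     * with the content of the given file.
     */
    static void replaceEntry(Path jar, String entryName, Path newContent) throws IOException {
        URI jarUri = URI.create("jar:" + jar.toAbsolutePath().toUri());
        // Mount the JAR as a file system; changes are written back when it is closed.
        try (FileSystem zipFs =
                FileSystems.newFileSystem(jarUri, Collections.<String, Object>emptyMap())) {
            Path target = zipFs.getPath(entryName);
            if (target.getParent() != null) {
                Files.createDirectories(target.getParent());
            }
            Files.copy(newContent, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.err.println("usage: JarEntryReplacer <jar> <entry-name> <new-class-file>");
            return;
        }
        replaceEntry(Paths.get(args[0]), args[1], Paths.get(args[2]));
    }
}
```

It would be run once per modified class, including any inner or anonymous classes (files with a `$` in the name) that the compiler produced for them.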
CliFrontendParser.java
Note: the places marked "added" are the modifications.

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
 * A simple command line parser (based on Apache Commons CLI) that extracts command line options.
 */
public class CliFrontendParser {

    static final Option HELP_OPTION =
            new Option("h", "help", false,
                    "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION =
            new Option("c", "class", true,
                    "Class with the program entry point (\"main()\" method). Only needed if the "
                            + "JAR file does not specify the class in its manifest.");

    static final Option CLASSPATH_OPTION =
            new Option("C", "classpath", true,
                    "Adds a URL to each user code "
                            + "classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");

    public static final Option PARALLELISM_OPTION =
            new Option("p", "parallelism", true,
                    "The parallelism with which to run the program. Optional flag to override the default value "
                            + "specified in the configuration.");

    public static final Option DETACHED_OPTION =
            new Option("d", "detached", false, "If present, runs " + "the job in detached mode");

    public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
            new Option("sae", "shutdownOnAttachedExit", false,
                    "If the job is submitted in attached mode, perform a best-effort cluster shutdown "
                            + "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");

    // ********** added: start **********
    static final Option JARDIR_OPTION =
            new Option("jd", "jardir", true,
                    "Adds all JAR files in a directory to each user code "
                            + "classloader on all nodes in the cluster. The directory must exist and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one directory.");
    // ********** added: end **********

    /**
     * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN
     *     deployments
     */
    @Deprecated
    public static final Option YARN_DETACHED_OPTION =
            new Option("yd", "yarndetached", false,
                    "If present, runs "
                            + "the job in detached mode (deprecated; use non-YARN specific option instead)");

    public static final Option ARGS_OPTION =
            new Option("a", "arguments", true,
                    "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

    public static final Option ADDRESS_OPTION =
            new Option("m", "jobmanager", true,
                    "Address of the JobManager to which to connect. "
                            + "Use this flag to connect to a different JobManager than the one specified in the configuration.");

    public static final Option SAVEPOINT_PATH_OPTION =
            new Option("s", "fromSavepoint", true,
                    "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

    public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
            new Option("n", "allowNonRestoredState", false,
                    "Allow to skip savepoint state that cannot be restored. "
                            + "You need to allow this if you removed an operator from your "
                            + "program that was part of the program when the savepoint was triggered.");

    static final Option SAVEPOINT_DISPOSE_OPTION =
            new Option("d", "dispose", true, "Path of savepoint to dispose.");

    // list specific options
    static final Option RUNNING_OPTION =
            new Option("r", "running", false, "Show only running programs and their JobIDs");

    static final Option SCHEDULED_OPTION =
            new Option("s", "scheduled", false, "Show only scheduled programs and their JobIDs");

    static final Option ALL_OPTION =
            new Option("a", "all", false, "Show all programs and their JobIDs");

    static final Option ZOOKEEPER_NAMESPACE_OPTION =
            new Option("z", "zookeeperNamespace", true,
                    "Namespace to create the Zookeeper sub-paths for high availability mode");

    static final Option CANCEL_WITH_SAVEPOINT_OPTION =
            new Option("s", "withSavepoint", true,
                    "**DEPRECATION WARNING**: "
                            + "Cancelling a job with savepoint is deprecated. Use \"stop\" instead. \n Trigger"
                            + " savepoint and cancel job. The target directory is optional. If no directory is "
                            + "specified, the configured default directory ("
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + ") is used.");

    public static final Option STOP_WITH_SAVEPOINT_PATH =
            new Option("p", "savepointPath", true,
                    "Path to the savepoint (for example hdfs:///flink/savepoint-1537). "
                            + "If no directory is specified, the configured default will be used (\""
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + "\").");

    public static final Option STOP_AND_DRAIN =
            new Option("d", "drain", false,
                    "Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");

    public static final Option PY_OPTION =
            new Option("py", "python", true,
                    "Python script with the program entry point. "
                            + "The dependent resources can be configured with the `--pyFiles` option.");

    public static final Option PYFILES_OPTION =
            new Option("pyfs", "pyFiles", true,
                    "Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. "
                            + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. "
                            + "Files suffixed with .zip will be extracted and added to PYTHONPATH. "
                            + "Comma (',') could be used as the separator to specify multiple files "
                            + "(e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");

    public static final Option PYMODULE_OPTION =
            new Option("pym", "pyModule", true,
                    "Python module with the program entry point. "
                            + "This option must be used in conjunction with `--pyFiles`.");

    public static final Option PYREQUIREMENTS_OPTION =
            new Option("pyreq", "pyRequirements", true,
                    "Specify a requirements.txt file which defines the third-party dependencies. "
                            + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. "
                            + "A directory which contains the installation packages of these dependencies could be specified "
                            + "optionally. Use '#' as the separator if the optional parameter exists "
                            + "(e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).");

    public static final Option PYARCHIVE_OPTION =
            new Option("pyarch", "pyArchives", true,
                    "Add python archive files for job. The archive files will be extracted to the working directory "
                            + "of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory "
                            + "be specified. If the target directory name is specified, the archive file will be extracted to a "
                            + "directory with the specified name. Otherwise, the archive file will be extracted to a "
                            + "directory with the same name of the archive file. The files uploaded via this option are accessible "
                            + "via relative path. '#' could be used as the separator of the archive file path and the target directory "
                            + "name. Comma (',') could be used as the separator to specify multiple archive files. "
                            + "This option can be used to upload the virtual environment, the data files used in Python UDF "
                            + "(e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable "
                            + "py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: "
                            + "f = open('data/data.txt', 'r').");

    public static final Option PYEXEC_OPTION =
            new Option("pyexec", "pyExecutable", true,
                    "Specify the path of the python interpreter used to execute the python UDF worker "
                            + "(e.g.: --pyExecutable /usr/local/bin/python3). "
                            + "The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), "
                            + "Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). "
                            + "Please ensure that the specified environment meets the above requirements.");

    static {
        HELP_OPTION.setRequired(false);

        JAR_OPTION.setRequired(false);
        JAR_OPTION.setArgName("jarfile");

        CLASS_OPTION.setRequired(false);
        CLASS_OPTION.setArgName("classname");

        CLASSPATH_OPTION.setRequired(false);
        CLASSPATH_OPTION.setArgName("url");

        ADDRESS_OPTION.setRequired(false);
        ADDRESS_OPTION.setArgName("host:port");

        PARALLELISM_OPTION.setRequired(false);
        PARALLELISM_OPTION.setArgName("parallelism");

        DETACHED_OPTION.setRequired(false);
        SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
        YARN_DETACHED_OPTION.setRequired(false);
        JARDIR_OPTION.setRequired(false); // added

        ARGS_OPTION.setRequired(false);
        ARGS_OPTION.setArgName("programArgs");
        ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);

        RUNNING_OPTION.setRequired(false);
        SCHEDULED_OPTION.setRequired(false);

        SAVEPOINT_PATH_OPTION.setRequired(false);
        SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

        SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);

        ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
        ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");

        CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
        CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
        CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);

        STOP_WITH_SAVEPOINT_PATH.setRequired(false);
        STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
        STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);

        STOP_AND_DRAIN.setRequired(false);

        PY_OPTION.setRequired(false);
        PY_OPTION.setArgName("pythonFile");

        PYFILES_OPTION.setRequired(false);
        PYFILES_OPTION.setArgName("pythonFiles");

        PYMODULE_OPTION.setRequired(false);
        PYMODULE_OPTION.setArgName("pythonModule");

        PYREQUIREMENTS_OPTION.setRequired(false);

        PYARCHIVE_OPTION.setRequired(false);

        PYEXEC_OPTION.setRequired(false);
    }

    static final Options RUN_OPTIONS = getRunCommandOptions();

    private static Options buildGeneralOptions(Options options) {
        options.addOption(HELP_OPTION);
        // backwards compatibility: ignore verbose flag (-v)
        options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
        return options;
    }

    private static Options getProgramSpecificOptions(Options options) {
        options.addOption(JAR_OPTION);
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(ARGS_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(YARN_DETACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // added
        return options;
    }

    private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // added
        return options;
    }

    public static Options getRunCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options = getProgramSpecificOptions(options);
        options.addOption(SAVEPOINT_PATH_OPTION);
        return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    static Options getInfoCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return getProgramSpecificOptions(options);
    }

    static Options getListCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(ALL_OPTION);
        options.addOption(RUNNING_OPTION);
        return options.addOption(SCHEDULED_OPTION);
    }

    static Options getCancelCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    static Options getStopCommandOptions() {
        return buildGeneralOptions(new Options())
                .addOption(STOP_WITH_SAVEPOINT_PATH)
                .addOption(STOP_AND_DRAIN);
    }

    static Options getSavepointCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        return options.addOption(JAR_OPTION);
    }

    // --------------------------------------------------------------------------------------------
    //  Help
    // --------------------------------------------------------------------------------------------

    private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
        o.addOption(SAVEPOINT_PATH_OPTION);
        return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(PARALLELISM_OPTION);
        return options;
    }

    private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(RUNNING_OPTION);
        options.addOption(ALL_OPTION);
        options.addOption(SCHEDULED_OPTION);
        return options;
    }

    private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(STOP_WITH_SAVEPOINT_PATH).addOption(STOP_AND_DRAIN);
    }

    private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        options.addOption(JAR_OPTION);
        return options;
    }

    /** Prints the help for the client. */
    public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
        System.out.println();
        System.out.println("The following actions are available:");

        printHelpForRun(customCommandLines);
        printHelpForRunApplication(customCommandLines);
        printHelpForInfo();
        printHelpForList(customCommandLines);
        printHelpForStop(customCommandLines);
        printHelpForCancel(customCommandLines);
        printHelpForSavepoint(customCommandLines);

        System.out.println();
    }

    public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run\" compiles and runs a program.");
        System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"run\" action options:");
        formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, true);

        System.out.println();
    }

    public static void printHelpForRunApplication(
            Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run-application\" runs an application in Application Mode.");
        System.out.println("\n  Syntax: run-application [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"run-application\" action options:");

        // Only GenericCLI works with application mode, the other CLIs will be phased out
        // in the future
        List<CustomCommandLine> filteredCommandLines =
                customCommandLines.stream()
                        .filter((cli) -> cli instanceof GenericCLI)
                        .collect(Collectors.toList());

        printCustomCliOptions(filteredCommandLines, formatter, true);
        System.out.println();
    }

    public static void printHelpForInfo() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"info\" shows the optimized execution plan of the program (JSON).");
        System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"info\" action options:");
        formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));

        System.out.println();
    }

    public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"list\" lists running and scheduled programs.");
        System.out.println("\n  Syntax: list [OPTIONS]");
        formatter.setSyntaxPrefix("  \"list\" action options:");
        formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"stop\" stops a running program with a savepoint (streaming jobs only).");
        System.out.println("\n  Syntax: stop [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix("  \"stop\" action options:");
        formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"cancel\" cancels a running program.");
        System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix("  \"cancel\" action options:");
        formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
        System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
        formatter.setSyntaxPrefix("  \"savepoint\" action options:");
        formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /**
     * Prints custom cli options.
     *
     * @param formatter The formatter to use for printing
     * @param runOptions True if the run options should be printed, False to print only general
     *     options
     */
    private static void printCustomCliOptions(
            Collection<CustomCommandLine> customCommandLines,
            HelpFormatter formatter,
            boolean runOptions) {
        // prints options from all available command-line classes
        for (CustomCommandLine cli : customCommandLines) {
            formatter.setSyntaxPrefix("  Options for " + cli.getId() + " mode:");
            Options customOpts = new Options();
            cli.addGeneralOptions(customOpts);
            if (runOptions) {
                cli.addRunOptions(customOpts);
            }
            formatter.printHelp(" ", customOpts);
            System.out.println();
        }
    }

    public static SavepointRestoreSettings createSavepointRestoreSettings(
            CommandLine commandLine) {
        if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
            String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
            boolean allowNonRestoredState =
                    commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
            return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
        } else {
            return SavepointRestoreSettings.none();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Line Parsing
    // --------------------------------------------------------------------------------------------

    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();

        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

    /**
     * Merges the given {@link Options} into a new Options object.
     *
     * @param optionsA options to merge, can be null if none
     * @param optionsB options to merge, can be null if none
     * @return
     */
    public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
        final Options resultOptions = new Options();
        if (optionsA != null) {
            for (Option option : optionsA.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        if (optionsB != null) {
            for (Option option : optionsB.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        return resultOptions;
    }
}
```
ProgramOptions.java
Note: the places marked "added" are the modifications.

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;

import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JARDIR_OPTION; // added
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.client.cli.ProgramOptionsUtils.containsPythonDependencyOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.createPythonProgramOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.isPythonEntryPoint;

/** Base class for command line options that refer to a JAR file program. */
public class ProgramOptions extends CommandLineOptions {

    private String jarFilePath;

    protected String entryPointClass;

    private final List<URL> classpaths;

    private final String[] programArgs;

    private final int parallelism;

    private final boolean detachedMode;

    private final boolean shutdownOnAttachedExit;

    private final SavepointRestoreSettings savepointSettings;

    protected ProgramOptions(CommandLine line) throws CliArgsException {
        super(line);

        this.entryPointClass =
                line.hasOption(CLASS_OPTION.getOpt())
                        ? line.getOptionValue(CLASS_OPTION.getOpt())
                        : null;

        this.jarFilePath =
                line.hasOption(JAR_OPTION.getOpt())
                        ? line.getOptionValue(JAR_OPTION.getOpt())
                        : null;

        this.programArgs = extractProgramArgs(line);

        List<URL> classpaths = new ArrayList<URL>();
        if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
            for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
                try {
                    classpaths.add(new URL(path));
                } catch (MalformedURLException e) {
                    throw new CliArgsException("Bad syntax for classpath: " + path);
                }
            }
        }
        // *** added: start ***
        // Load every JAR found in each -jd directory.
        if (line.hasOption(JARDIR_OPTION.getOpt())) {
            for (String path : line.getOptionValues(JARDIR_OPTION.getOpt())) {
                List<URL> jarFiles = null;
                try {
                    jarFiles = loadAllJarFromPathURl(path);
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                    throw new CliArgsException("Bad syntax for classpath: " + path);
                }
                // classpaths.add(new URL(path));
                classpaths.addAll(jarFiles);
            }
        }
        // *** added: end ***
        this.classpaths = classpaths;

        if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
            String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
            try {
                parallelism = Integer.parseInt(parString);
                if (parallelism <= 0) {
                    throw new NumberFormatException();
                }
            } catch (NumberFormatException e) {
                throw new CliArgsException(
                        "The parallelism must be a positive number: " + parString);
            }
        } else {
            parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
        }

        detachedMode =
                line.hasOption(DETACHED_OPTION.getOpt())
                        || line.hasOption(YARN_DETACHED_OPTION.getOpt());
        shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());

        this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
    }

    // *** added: start ***
    private List<URL> loadAllJarFromPathURl(String path) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        System.out.println("jar dir:" + path);
        // The directory to search.
        File directory = new File(path);
        // Keep only the files whose names end with .jar.
        File[] jarFiles =
                directory.listFiles(
                        new FilenameFilter() {
                            @Override
                            public boolean accept(File dir, String name) {
                                return name.toLowerCase().endsWith(".jar");
                            }
                        });
        // listFiles returns null if the path is not a readable directory, so check
        // for null before touching the array.
        if (jarFiles != null) {
            System.out.println("jarFiles len:" + jarFiles.length);
            // Add each JAR that was found.
            for (File jarFile : jarFiles) {
                System.out.println(jarFile.getAbsolutePath());
                URL url = jarFile.toURI().toURL();
                urls.add(url);
            }
        }
        return urls;
    }
    // *** added: end ***

    protected String[] extractProgramArgs(CommandLine line) {
        String[] args =
                line.hasOption(ARGS_OPTION.getOpt())
                        ? line.getOptionValues(ARGS_OPTION.getOpt())
                        : line.getArgs();

        if (args.length > 0 && !line.hasOption(JAR_OPTION.getOpt())) {
            jarFilePath = args[0];
            args = Arrays.copyOfRange(args, 1, args.length);
        }

        return args;
    }

    public void validate() throws CliArgsException {
        // Java program should be specified a JAR file
        if (getJarFilePath() == null) {
            throw new CliArgsException("Java program should be specified a JAR file.");
        }
    }

    public String getJarFilePath() {
        return jarFilePath;
    }

    public String getEntryPointClassName() {
        return entryPointClass;
    }

    public List<URL> getClasspaths() {
        return classpaths;
    }

    public String[] getProgramArgs() {
        return programArgs;
    }

    public int getParallelism() {
        return parallelism;
    }

    public boolean getDetachedMode() {
        return detachedMode;
    }

    public boolean isShutdownOnAttachedExit() {
        return shutdownOnAttachedExit;
    }

    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return savepointSettings;
    }

    public void applyToConfiguration(Configuration configuration) {
        if (getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
            configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
        }

        configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode());
        configuration.setBoolean(
                DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
        ConfigUtils.encodeCollectionToConfig(
                configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
        SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
    }

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            return new ProgramOptions(line);
        }
    }
}
```
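The directory scan can also be tried on its own, outside Flink. The following sketch is an alternative written with java.nio rather than the code used in the patch; the class name `JarDirScanner` is hypothetical. Unlike the patched method, it rejects a path that is not a directory, skips sub-directories, and returns the JARs in sorted order.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical standalone variant of the directory scan (not the patched Flink code). */
class JarDirScanner {

    /** Returns file: URLs for all regular files in dir whose names end with .jar. */
    static List<URL> listJarUrls(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            throw new IllegalArgumentException("Not a directory: " + dir);
        }
        List<Path> jars;
        try (Stream<Path> entries = Files.list(dir)) {
            jars =
                    entries.filter(Files::isRegularFile)
                            .filter(JarDirScanner::hasJarSuffix)
                            .sorted()
                            .collect(Collectors.toList());
        }
        List<URL> urls = new ArrayList<>();
        for (Path jar : jars) {
            urls.add(jar.toUri().toURL());
        }
        return urls;
    }

    /** Case-insensitive check, matching the toLowerCase().endsWith(".jar") filter. */
    private static boolean hasJarSuffix(Path file) {
        return file.getFileName().toString().toLowerCase(Locale.ROOT).endsWith(".jar");
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: JarDirScanner <jar-directory>");
            return;
        }
        for (URL url : listJarUrls(Paths.get(args[0]))) {
            System.out.println(url);
        }
    }
}
```

Running it against the directory you plan to pass with -jd shows which URLs would end up on the classpath.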
Does it actually work? To find out, we write verification code in two parts.
(1) The Flink job code
This simply copies the WordCount example, renames the class, and calls a class from the JAR built in part (2).
TestLoadExtJar.java

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.examples.java.testloadextjar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import com.test.A;

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 *
 * <ul>
 *   <li>write a simple Flink program.
 *   <li>use Tuple data types.
 *   <li>write and use user-defined functions.
 * </ul>
 */
public class TestLoadExtJar {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // call into the external JAR; this fails if com.test.A is not on the classpath
        A a = new A();
        a.test();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
```

(2) Simulating third-party code
Create a Maven project in IDEA and add a very simple class A for TestLoadExtJar to call.
A.java

package com.test;

public class A {
    public void test() {
        System.out.println("A");
    }
}
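Next, package both projects (TestLoadExtJar and the simulated third-party code) as jars. Suppose the TestLoadExtJar example is packaged as TestLoadExtJar.jar and the simulated third-party code as testcallextjar-1.0-SNAPSHOT.jar, placed in the /usr/local/flink-1.13.0/extlib directory.
Then run the job in Flink. Run it the original way first and look at the error message, then add the jd parameter to specify the jar directory and see whether that solves the problem.
Without the jd parameter:
![](https://img2024.cnblogs.com/blog/1849636/202409/1849636-20240920151146409-50927224.png)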

As you can see, it reports that the class was not found.
With the jd parameter:

Now the job runs.
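The idea behind a jar-directory parameter is to enumerate every jar in one directory and add each of them to the class path. Below is a minimal sketch of that idea using only the JDK. The names JarDirScanner, listJarUrls and classLoaderFor are hypothetical and chosen for illustration; this is not the code of the modified Flink classes, and the real change may handle the directory differently.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of Flink): turns a directory of jars into class path URLs. */
public class JarDirScanner {

    /** Returns URLs of all regular files matching *.jar directly inside jarDir, sorted by path. */
    public static List<URL> listJarUrls(Path jarDir) throws IOException {
        List<Path> jars = new ArrayList<>();
        // The glob only matches entries in jarDir itself; subdirectories are not scanned.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(jarDir, "*.jar")) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    jars.add(p);
                }
            }
        }
        // Sort so the resulting class path order is deterministic.
        Collections.sort(jars);
        List<URL> urls = new ArrayList<>();
        for (Path jar : jars) {
            urls.add(jar.toUri().toURL());
        }
        return urls;
    }

    /** Builds a class loader that sees every jar in jarDir, delegating to parent first. */
    public static URLClassLoader classLoaderFor(Path jarDir, ClassLoader parent)
            throws IOException {
        List<URL> urls = listJarUrls(jarDir);
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws Exception {
        // Defaults to the directory used in this post; pass another path as the first argument.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/usr/local/flink-1.13.0/extlib");
        for (URL url : listJarUrls(dir)) {
            System.out.println(url);
        }
    }
}
```

Because each job gets its own list of URLs, jars dropped into one directory do not leak into the class path of other jobs, which matches the isolation benefit described for this approach.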
[Note]: If you cannot download from https://github.com/apache/flink.git, you can download from https://gitee.com/longsebo/flink.git instead (the code in that repository has already been modified).

Finally
If you have any questions or want to discuss, feel free to leave a comment.

