本文档从高层次描述了为 YARN 实现新应用程序的方法。
一般概念是应用程序提交客户端向 YARN ResourceManager (RM) 提交应用程序。这可以通过设置YarnClient
对象来完成。在启动YarnClient
后,客户端可以设置应用程序上下文,准备包含ApplicationMaster (AM) 的应用程序的第一个容器,然后提交应用程序。您需要提供诸如应用程序运行所需本地文件/jar 的详细信息、需要执行的实际命令(带有必要的命令行参数)、任何操作系统环境设置(可选)等信息。实际上,您需要描述需要为您的 ApplicationMaster 启动的 Unix 进程。
然后,YARN ResourceManager 将在已分配的容器上启动 ApplicationMaster(如指定)。ApplicationMaster 与 YARN 集群通信,并处理应用程序执行。它以异步方式执行操作。在应用程序启动期间,ApplicationMaster 的主要任务是:a) 与 ResourceManager 通信以协商和分配未来容器的资源,以及 b) 在容器分配后,与 YARN NodeManager(NM)通信以在它们上启动应用程序容器。任务 a) 可以通过AMRMClientAsync
对象异步执行,事件处理方法指定在AMRMClientAsync.CallbackHandler
类型的事件处理程序中。需要将事件处理程序明确地设置为客户端。任务 b) 可以通过启动一个可运行对象来执行,该对象在分配容器时启动容器。作为启动此容器的一部分,AM 必须指定具有启动信息(例如命令行规范、环境等)的ContainerLaunchContext
。
在应用程序执行期间,ApplicationMaster 通过NMClientAsync
对象与 NodeManager 通信。所有容器事件都由与NMClientAsync
关联的NMClientAsync.CallbackHandler
处理。典型的回调处理程序处理客户端启动、停止、状态更新和错误。ApplicationMaster 还通过处理AMRMClientAsync.CallbackHandler
的getProgress()
方法向 ResourceManager 报告执行进度。
除了异步客户端外,还有用于某些工作流的同步版本(AMRMClient
和NMClient
)。推荐使用异步客户端,因为(主观上)用法更简单,本文将主要介绍异步客户端。有关同步客户端的更多信息,请参阅AMRMClient
和NMClient
。
以下是重要接口
客户端<-->ResourceManager
通过使用YarnClient
对象。
ApplicationMaster<-->ResourceManager
通过使用 AMRMClientAsync
对象,通过 AMRMClientAsync.CallbackHandler
异步处理事件
ApplicationMaster<-->NodeManager
启动容器。通过使用 NMClientAsync
对象与 NodeManagers 通信,通过 NMClientAsync.CallbackHandler
处理容器事件
注意
YARN 应用程序的三个主要协议(ApplicationClientProtocol、ApplicationMasterProtocol 和 ContainerManagementProtocol)仍然保留。这三个客户端封装这三个协议,为 YARN 应用程序提供更简单的编程模型。
在极罕见的情况下,程序员可能希望直接使用这三个协议来实现应用程序。但是,请注意此类行为不再被鼓励用于一般用例。
客户端需要做的第一步是初始化并启动 YarnClient。
YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start();
客户端设置好后,客户端需要创建一个应用程序并获取其应用程序 ID。
YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
YarnClientApplication
对新应用程序的响应还包含有关群集的信息,例如群集的最小/最大资源功能。这是必需的,以便确保您可以正确设置将在其中启动 ApplicationMaster 的容器的规范。有关更多详细信息,请参阅 GetNewApplicationResponse
。
客户端的主要关键在于设置 ApplicationSubmissionContext
,它定义了 RM 启动 AM 所需的所有信息。客户端需要将以下内容设置到上下文中
应用程序信息:ID、名称
队列、优先级信息:将应用程序提交到的队列,为应用程序分配的优先级。
用户:提交应用程序的用户
ContainerLaunchContext
:定义容器的信息,AM 将在其中启动并运行。如前所述,ContainerLaunchContext
定义了运行应用程序所需的所有必需信息,例如本地R资源(二进制文件、jar、文件等)、E环境设置(CLASSPATH 等)、要执行的C命令和安全T令牌(RECT)。
// set the application submission context ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); LOG.info("Copy App Master jar from local filesystem and add to local environment"); // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(conf); addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null); // Set the log4j properties if needed if (!log4jPropFile.isEmpty()) { addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), localResources, null); } // The shell script has to be made available on the final container(s) // where it will be executed. // To do this, we need to first copy into the filesystem that is visible // to the yarn framework. // We do not need to set this as a local resource for the application // master as the application master does not need it. String hdfsShellScriptLocation = ""; long hdfsShellScriptLen = 0; long hdfsShellScriptTimestamp = 0; if (!shellScriptPath.isEmpty()) { Path shellSrc = new Path(shellScriptPath); String shellPathSuffix = appName + "/" + appId.toString() + "/" + SCRIPT_PATH; Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix); fs.copyFromLocalFile(false, true, shellSrc, shellDst); hdfsShellScriptLocation = shellDst.toUri().toString(); FileStatus shellFileStatus = fs.getFileStatus(shellDst); hdfsShellScriptLen = shellFileStatus.getLen(); hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); } if (!shellCommand.isEmpty()) { addToLocalResources(fs, null, shellCommandPath, appId.toString(), localResources, shellCommand); } if (shellArgs.length > 0) { addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, StringUtils.join(shellArgs, " ")); } // Set the env variables to be setup in the env where the application master will be run LOG.info("Set the environment for the application master"); Map<String, String> env = new HashMap<String, String>(); // put location of shell script into env // using the env info, the application master will create the correct local resource for the // eventual containers that will be launched to execute the shell scripts env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. // It should be provided out of the box. // For now setting all required classpaths including // the classpath to "." for the application jar StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); for (String c : conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); classPathEnv.append(c.trim()); } classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( "./log4j.properties"); // Set the necessary command to execute the application master Vector<CharSequence> vargs = new Vector<CharSequence>(30); // Set java executable command LOG.info("Setting up app master command"); vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); // Set Xmx based on am memory size vargs.add("-Xmx" + amMemory + "m"); // Set class name vargs.add(appMasterMainClass); // Set params for Application Master vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--priority " + String.valueOf(shellCmdPriority)); for (Map.Entry<String, String> entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); } if (debugFlag) { vargs.add("--debug"); } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); // Get final command StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } LOG.info("Completed setting up app master command " + command.toString()); List<String> commands = new ArrayList<String>(); commands.add(command.toString()); // Set up the container launch context for the application master ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null); // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application // Not needed in this scenario // amContainer.setServiceData(serviceData); // Setup security tokens if (UserGroupInformation.isSecurityEnabled()) { // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce Credentials credentials = new Credentials(); String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); if (tokenRenewer == null | | tokenRenewer.length() == 0) { throw new IOException( "Can't get Master Kerberos principal for the RM to use as renewer"); } // For now, only getting tokens for the default file-system. final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials); if (tokens != null) { for (Token<?> token : tokens) { LOG.info("Got dt for " + fs.getUri() + "; " + token); } } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); amContainer.setTokens(fsTokens); } appContext.setAMContainerSpec(amContainer);
// Set the priority for the application master Priority pri = Priority.newInstance(amPriority); appContext.setPriority(pri); // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); // Submit the application to the applications manager // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); yarnClient.submitApplication(appContext);
此时,RM 将接受应用程序,并在后台分配具有所需规范的容器,然后最终在已分配的容器上设置并启动 AM。
客户端有多种方法可以跟踪实际任务的进度。
- 它可以通过
YarnClient
的getApplicationReport()
方法与 RM 通信并请求应用程序报告。
// Get application report for the appId we are interested in ApplicationReport report = yarnClient.getApplicationReport(appId);
从 RM 收到的 ApplicationReport 包含以下内容
常规应用程序信息:应用程序 ID、提交应用程序的队列、提交应用程序的用户以及应用程序的开始时间。
ApplicationMaster 详细信息:运行 AM 的主机、它侦听客户端请求的 RPC 端口(如果有)以及客户端与 AM 通信所需的令牌。
应用程序跟踪信息:如果应用程序支持某种形式的进度跟踪,它可以通过
ApplicationReport
的getTrackingUrl()
方法设置一个跟踪 URL,客户端可以通过该 URL 监控进度。应用程序状态:ResourceManager 看到的应用程序状态可通过
ApplicationReport#getYarnApplicationState
获得。如果YarnApplicationState
设置为FINISHED
,客户端应参考ApplicationReport#getFinalApplicationStatus
以检查应用程序任务本身的实际成功/失败。如果发生故障,ApplicationReport#getDiagnostics
可能有助于进一步了解故障原因。
- 如果 ApplicationMaster 支持,客户端可以直接查询 AM 本身以获取进度更新,方法是从应用程序报告中获取 host:rpcport 信息。如果可用,它还可以使用从报告中获取的跟踪 URL。
YarnClient
支持 killApplication
调用,允许客户端通过 ResourceManager 向 AM 发送终止信号。如果经过如此设计,ApplicationMaster 也可能通过其 RPC 层支持中止调用,客户端也许能够利用该调用。yarnClient.killApplication(appId);
AM 是作业的实际所有者。它将由 RM 启动,并通过客户端提供有关其负责监督和完成的作业的所有必要信息和资源。
由于 AM 在容器中启动,该容器可能会(很可能将会)与其他容器共享物理主机,因此,鉴于多租户的性质和其他问题,它不能对可以侦听的预配置端口等内容做出任何假设。
当 AM 启动时,将通过环境向其提供多个参数。这些参数包括 AM 容器的 ContainerId
、应用程序提交时间以及有关运行 ApplicationMaster 的 NM(节点管理器)主机的详细信息。有关参数名称,请参阅 ApplicationConstants
。
与 RM 的所有交互都需要 ApplicationAttemptId
(如果发生故障,每个应用程序可能有多次尝试)。ApplicationAttemptId
可以从 AM 的容器 ID 中获取。有帮助程序 API 可以将从环境中获取的值转换为对象。
Map<String, String> envs = System.getenv(); String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); if (containerIdString == null) { // container id should always be set in the env by the framework throw new IllegalArgumentException( "ContainerId not set in the environment"); } ContainerId containerId = ConverterUtils.toContainerId(containerIdString); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start();
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
访问的配置设置定义,默认值由 YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
定义。ApplicationMaster 需要在 ResourceManager 中注册自己才能开始发送心跳。// Register self with ResourceManager // This will start heartbeating to the RM appMasterHostname = NetUtils.getHostname(); RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the // resource manager int maxMem = response.getMaximumResourceCapability().getMemory(); LOG.info("Max mem capability of resources in this cluster " + maxMem); int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max vcores capability of resources in this cluster " + maxVCores); // A resource ask cannot exceed the max. if (containerMemory > maxMem) { LOG.info("Container memory specified above max threshold of cluster." + " Using max value." + ", specified=" + containerMemory + ", max=" + maxMem); containerMemory = maxMem; } if (containerVirtualCores > maxVCores) { LOG.info("Container virtual cores specified above max threshold of cluster." + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + maxVCores); containerVirtualCores = maxVCores; } List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration.");
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration."); int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size(); // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); }
setupContainerAskForRM()
中,以下两件事需要进行一些设置
资源功能:目前,YARN 支持基于内存的资源要求,因此请求应定义需要多少内存。该值以 MB 为单位定义,并且必须小于群集的最大功能并且是最小功能的精确倍数。内存资源对应于对任务容器施加的物理内存限制。它还将支持基于计算的资源(vCore),如代码所示。
优先级:在请求容器集时,AM 可能会为每个集定义不同的优先级。例如,Map-Reduce AM 可能会为 Map 任务所需的容器分配更高的优先级,为 Reduce 任务的容器分配较低的优先级。
private ContainerRequest setupContainerAskForRM() { // setup requirements for hosts // using * as any host will do for the distributed shell app // set the priority for the request Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements // For now, memory and CPU are supported so we set memory and cpu requirements Resource capability = Resource.newInstance(containerMemory, containerVirtualCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); LOG.info("Requested container ask: " + request.toString()); return request; }
AMRMClientAsync
客户端的事件处理程序异步启动。该处理程序应实现 AMRMClientAsync.CallbackHandler
接口。
- 当有容器分配时,处理程序将设置一个线程来运行启动容器的代码。这里我们使用名称
LaunchContainerRunnable
来演示。我们将在本文的以下部分讨论LaunchContainerRunnable
类。
@Override public void onContainersAllocated(List<Container> allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer, containerListener); Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep // the main thread unblocked // as all containers may not be allocated at one go. launchThreads.add(launchThread); launchThread.start(); } }
- 在心跳时,事件处理程序报告应用程序的进度。
@Override public float getProgress() { // set progress to deliver to RM on next heartbeat float progress = (float) numCompletedContainers.get() / numTotalContainers; return progress; }
ContainerLaunchContext
时遵循的过程。一旦定义了 ContainerLaunchContext
,AM 就可以通过 NMClientAsync
启动它。// Set the necessary command to execute on the allocated container Vector<CharSequence> vargs = new Vector<CharSequence>(5); // Set executable command vargs.add(shellCommand); // Set shell script path if (!scriptPath.isEmpty()) { vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath : ExecShellStringPath); } // Set args for the shell command if any vargs.add(shellArgs); // Add log redirect params vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); // Get final command StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } List<String> commands = new ArrayList<String>(); commands.add(command.toString()); // Set up ContainerLaunchContext, setting local resource, environment, // command and token for constructor. // Note for tokens: Set up tokens for the container too. Today, for normal // shell commands, the container in distribute-shell doesn't need any // tokens. We are populating them mainly for NodeManagers to be able to // download anyfiles in the distributed file-system. The tokens are // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, shellEnv, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx);
NMClientAsync
对象及其事件处理程序一起处理容器事件。包括容器启动、停止、状态更新以及发生错误。
在 ApplicationMaster 确定工作完成之后,它需要通过 AM-RM 客户端取消自己的注册,然后停止客户端。
try { amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); } catch (YarnException ex) { LOG.error("Failed to unregister application", ex); } catch (IOException e) { LOG.error("Failed to unregister application", e); } amRMClient.stop();
您可以使用 LocalResource 向应用程序请求添加资源。这将导致 YARN 将资源分发到 ApplicationMaster 节点。如果资源是 tgz、zip 或 jar,您可以让 YARN 解压缩它。然后,您需要做的就是将解压缩的文件夹添加到您的类路径中。例如,在创建应用程序请求时
File packageFile = new File(packagePath); URL packageUrl = ConverterUtils.getYarnUrlFromPath( FileContext.getFileContext().makeQualified(new Path(packagePath))); packageResource.setResource(packageUrl); packageResource.setSize(packageFile.length()); packageResource.setTimestamp(packageFile.lastModified()); packageResource.setType(LocalResourceType.ARCHIVE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); resource.setMemory(memory); containerCtx.setResource(resource); containerCtx.setCommands(ImmutableList.of( "java -cp './package/*' some.class.to.Run " + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")); containerCtx.setLocalResources( Collections.singletonMap("package", packageResource)); appCtx.setApplicationId(appId); appCtx.setUser(user.getShortUserName); appCtx.setAMContainerSpec(containerCtx); yarnClient.submitApplication(appCtx);
如您所见,setLocalResources
命令采用名称到资源的映射。名称会成为应用程序 cwd 中的符号链接,因此您只需使用 ./package/* 即可引用内部工件。
注意:Java 的类路径 (cp) 参数非常敏感。确保您完全正确地获取语法。
一旦您的包分发到您的 AM,您需要在您的 AM 启动新容器时遵循相同的流程(假设您希望将资源发送到您的容器)。此代码相同。您只需确保为您的 AM 提供包路径(HDFS 或本地),以便它可以将资源 URL 与容器 ctx 一起发送。
ApplicationAttemptId
?ApplicationAttemptId
将通过环境传递给 AM,并且可以将环境中的值通过 ConverterUtils 帮助函数转换为 ApplicationAttemptId
对象。
这可能是由于高内存使用量超过了您请求的容器内存大小。导致这种情况的原因有很多。首先,查看 NodeManager 在杀死您的容器时转储的进程树。您感兴趣的两件事是物理内存和虚拟内存。如果您超出了物理内存限制,则您的应用程序正在使用过多的物理内存。如果您正在运行 Java 应用程序,则可以使用 -hprof 查看堆中占用的空间。如果您超出了虚拟内存,您可能需要增加集群范围配置变量 yarn.nodemanager.vmem-pmem-ratio
的值。
在启动容器时在命令行上设置 -Djava.library.path
可能会导致 Hadoop 使用的本机库无法正确加载,并可能导致错误。相反,使用 LD_LIBRARY_PATH
更加简洁。
YARN 分布式 shell:在设置开发环境后,在 hadoop-yarn-applications-distributedshell
项目中。