本文档描述了 清单提交器 的架构和其他实现/正确性方面的信息。
协议及其正确性在 清单提交器协议 中进行了介绍。
清单 提交器是针对 ABFS 中“实际查询”提供性能,以及在 GCS 中提供性能和正确性的工作的提交器。
此提交器使用 S3A 提交器中引入的扩展点。用户可以为 abfs://
和 gcs://
URL 声明一个新的提交器工厂。它可以通过 Hadoop MapReduce 和 Apache Spark 使用。
术语 | 含义 |
---|---|
提交器 | MR/Spark 可以调用的类,用于执行任务和作业提交操作。 |
Spark 驱动程序 | 安排工作和协调提交操作的 Spark 进程。 |
作业 | 在 MapReduce 中,是整个应用程序。在 Spark 中,这是工作链中的一个阶段 |
作业尝试 | 对作业的一次尝试。MR 支持多个作业尝试,可在部分作业失败时恢复。Spark 则表示“从头开始” |
任务 | 作业的一个子部分,例如处理一个文件或一个文件的一部分 |
任务 ID | 任务的 ID,在此作业中唯一。通常从 0 开始,并用于文件名(part-0000、part-001 等) |
任务尝试 (TA) | 执行任务的一次尝试。它可能会失败,在这种情况下,MR/spark 将安排另一次尝试。 |
任务尝试 ID | 任务尝试的唯一 ID。任务 ID + 尝试计数器。 |
目标目录 | 工作的最终目标。 |
作业尝试目录 | 作业尝试使用的临时目录。它始终位于目标目录之下,以确保它与 HDFS、其他文件系统中的存储卷等位于相同的加密区域中。 |
任务尝试目录 | (也称为“任务尝试工作目录”)。每个任务尝试独有的目录,用于写入文件 |
任务提交 | 获取任务尝试的输出,并将其作为该“成功”任务的最终/独有结果。 |
作业提交 | 聚合所有已提交任务的所有输出,并生成作业的最终结果。 |
提交者的目的是确保作业的完整输出最终进入目标,即使在任务失败的情况下也是如此。
对于 Hive 的经典分层目录结构表,作业提交要求将所有已提交任务的输出放入目录树中的正确位置。
内置于 hadoop-mapreduce-client-core
模块中的提交者是 FileOutputCommitter
。
清单提交器是 ABFS 和 GCS 存储的更高性能提交器,适用于通过许多任务在深层目录树中创建文件的作业。
它还适用于 hdfs://
,当然也适用于 file://
URL,但它经过优化,可以解决云存储中的列出和重命名性能以及限制问题。
它不会与 S3 正确配合使用,因为它依赖于原子重命名-非覆盖操作来提交清单文件。它还将遇到复制而不是移动所有生成数据的性能问题。
尽管它适用于 MapReduce,但没有处理多个作业尝试以及从之前的失败尝试中恢复。
清单文件经过设计,包含(以及 IOStatistics 和其他一些内容)
通过以下方式提交任务尝试
不进行重命名,文件保留在它们的原始位置。
目录树遍历是单线程的,然后是 O(directories)
,每个目录列出使用一个或多个分页 LIST 调用。
这很简单,对于大多数任务,扫描不在作业的关键路径上。
统计分析可能会证明将来转向并行扫描是合理的。
作业提交包括
_SUCCESS
文件,其格式与 S3A 提交器相同(用于测试;使用写入和重命名进行原子保存)作业提交阶段支持对许多任务和每个任务的许多文件进行并行化,具体而言
O(files)
,在使用 OAuth 身份验证时在 ABFS 上也是如此。可选地扫描所有祖先…如果有任何文件,则删除。
getFileStatus()
。未找到:创建目录,添加条目和所有父路径的条目已找到且为目录:添加条目和所有父路径的条目已找到且为文件:删除。然后像以前一样创建。有效地处理目录的并发创建(或删除+创建)将成为一个麻烦点;投入了一些精力来构建要创建的目录集。
文件并行重命名。
在该路径上是否存在任何内容(并将其删除)的重命名前检查将是可选的。由于 spark 为每个文件创建新的 UUID,因此不会发生这种情况,并且可以节省 HTTP 请求。
可选地扫描所有已提交的文件并验证长度,如果已知,则验证 etag。用于测试和诊断。
对于 GCS 来说,此解决方案是必要的,并且在 ABFS 上应该是有益的,因为在任务提交器中支付了列出开销。
一个关键目标是保持清单提交器独立,既不触及现有的提交器代码,也不触及 hadoop 代码库的其他部分。
它必须直接插入 MR 和 Spark,而无需任何更改,除了已经为 S3A 提交器实现的更改之外
PathOutputCommitterFactory
绑定。因此,这里有一些从其他地方复制粘贴的内容,例如 org.apache.hadoop.util.functional.TaskPool
基于 S3ACommitter 的 org.apache.hadoop.fs.s3a.commit.Tasks
。
_SUCCESS
文件必须与 S3A JSON 文件兼容。这是为了确保任何验证 S3A 提交者输出的现有测试套件都可以在不进行任何更改的情况下重新定位到由清单提交者执行的作业。
何时?建议:心跳,直到重命名最终完成。
我们希望停止整个作业提交。需要在每个任务提交者线程的迭代过程中检查某个原子布尔值“中止作业”,以遍历目录(或处理每个文件?)列出或重命名的失败需要升级为停止整个作业提交。这意味着在异步重命名操作或任务提交者线程中引发的任何 IOE 必须
commitJob()
调用结束时重新引发如果作业提交阶段使用线程池进行每个任务操作,例如加载文件,则不能将同一个线程池用于每个任务阶段内的并行操作。
由于每个 JobStage
都在任务或作业提交内按顺序执行,因此可以在各个阶段共享同一个线程池。
在当前实现中,作业提交中没有并行的“每个清单”操作,除了实际加载文件之外。创建目录和重命名文件的操作实际上是在不执行各个清单的并行处理的情况下执行的。
目录准备:合并所有清单的目录列表,然后排队创建(希望非常小的)一组唯一目录。
重命名:遍历所有清单并将它们的重命名排队到一个重命名池中。
线程池的生命周期受阶段配置的限制,该配置将限制在每个 PathOutputCommitter
方法中以进行设置、提交、中止和清理。
这避免了 S3A 提交者的线程池生命周期问题。
这是 terasorting 中的一个故障,其中许多任务各自生成许多文件;在执行之前,会构建要提交的文件的完整列表(以及每个块的 etag)并在内存中进行验证。
清单提交者假定存储在内存中的数据量较少,因为不再需要为每个正在提交的文件的每个块存储 etag。
合并所有目录列表以创建和消除重复项。
实现架构反映了 S3A 连接器的经验教训。
提交者收集其针对文件系统执行/调用的所有操作的持续时间统计信息。* 在任务提交期间收集的统计信息将保存到清单中(不包括保存和重命名该文件的时间)* 当在作业提交期间加载这些清单时,这些统计信息将合并以形成整个作业的汇总统计信息。* 保存到_SUCCESS
文件中* 以及在由mapreduce.manifest.committer.summary.report.directory
(如果设置)指定的目录中该文件的任何副本中,以进行保存。* 类org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
可以加载并打印这些文件。
不收集查询中使用的文件系统和输入和输出流的 IO 统计信息。
通过PathOutputCommitter
API 调用ManifestCommitter
时,以下属性将添加到活动(线程)上下文中
键 | 值 |
---|---|
ji |
作业 ID |
tai |
任务尝试 ID |
st |
阶段 |
这些还全部设置在作为阶段执行的一部分执行工作的全部帮助程序线程中。
任何支持审计的存储/FS 都能够收集此数据并将其包含在日志中。
为了简化反向移植,所有审计集成都在单个类org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration
中。