清单提交器架构

本文档描述了 清单提交器 的架构和其他实现/正确性方面的信息。

协议及其正确性在 清单提交器协议 中进行了介绍。

清单 提交器是针对 ABFS 中“实际查询”提供性能,以及在 GCS 中提供性能和正确性的工作的提交器。

此提交器使用 S3A 提交器中引入的扩展点。用户可以为 abfs://gcs:// URL 声明一个新的提交器工厂。它可以通过 Hadoop MapReduce 和 Apache Spark 使用。

背景

术语

术语 含义
提交器 MR/Spark 可以调用的类,用于执行任务和作业提交操作。
Spark 驱动程序 安排工作和协调提交操作的 Spark 进程。
作业 在 MapReduce 中,是整个应用程序。在 Spark 中,这是工作链中的一个阶段
作业尝试 对作业的一次尝试。MR 支持多个作业尝试,可在部分作业失败时恢复。Spark 则表示“从头开始”
任务 作业的一个子部分,例如处理一个文件或一个文件的一部分
任务 ID 任务的 ID,在此作业中唯一。通常从 0 开始,并用于文件名(part-0000、part-001 等)
任务尝试 (TA) 执行任务的一次尝试。它可能会失败,在这种情况下,MR/spark 将安排另一次尝试。
任务尝试 ID 任务尝试的唯一 ID。任务 ID + 尝试计数器。
目标目录 工作的最终目标。
作业尝试目录 作业尝试使用的临时目录。它始终位于目标目录之下,以确保它与 HDFS、其他文件系统中的存储卷等位于相同的加密区域中。
任务尝试目录 (也称为“任务尝试工作目录”)。每个任务尝试独有的目录,用于写入文件
任务提交 获取任务尝试的输出,并将其作为该“成功”任务的最终/独有结果。
作业提交 聚合所有已提交任务的所有输出,并生成作业的最终结果。

提交者的目的是确保作业的完整输出最终进入目标,即使在任务失败的情况下也是如此。

  • 完整:输出包括所有成功任务的工作。
  • 独有:不成功的任务的输出不存在。
  • 并发:当多个任务并行提交时,输出与任务提交序列化时相同。这不是作业提交的要求。
  • 可中止:作业和任务可以在作业提交之前中止,之后它们的输出将不可见。
  • 正确性的连续性:一旦作业提交,任何失败、中止或不成功的任务的输出在未来某个时间点都不得出现。

对于 Hive 的经典分层目录结构表,作业提交要求将所有已提交任务的输出放入目录树中的正确位置。

内置于 hadoop-mapreduce-client-core 模块中的提交者是 FileOutputCommitter

清单提交者:适用于 Azure 和 Google 存储上 Spark 的高性能提交者。

清单提交器是 ABFS 和 GCS 存储的更高性能提交器,适用于通过许多任务在深层目录树中创建文件的作业。

它还适用于 hdfs://,当然也适用于 file:// URL,但它经过优化,可以解决云存储中的列出和重命名性能以及限制问题。

不会与 S3 正确配合使用,因为它依赖于原子重命名-非覆盖操作来提交清单文件。它还将遇到复制而不是移动所有生成数据的性能问题。

尽管它适用于 MapReduce,但没有处理多个作业尝试以及从之前的失败尝试中恢复。

清单

清单文件经过设计,包含(以及 IOStatistics 和其他一些内容)

  1. 如果不存在,则必须创建的目标目录列表。
  2. 要重命名的文件列表,记录为(绝对源、绝对目标、文件大小)条目。

任务提交

通过以下方式提交任务尝试

  1. 递归列出任务尝试工作目录以构建
  2. 重命名文件所在目录的列表。
  3. 要重命名的文件列表:源、目标、大小,以及可选的 etag。
  4. 使用从任务 ID 派生的文件名,将此信息保存在作业尝试目录中的清单文件中。注意:将写入临时文件,然后重命名为最终路径,以确保清单创建是原子的。

不进行重命名,文件保留在它们的原始位置。

目录树遍历是单线程的,然后是 O(directories),每个目录列出使用一个或多个分页 LIST 调用。

这很简单,对于大多数任务,扫描不在作业的关键路径上。

统计分析可能会证明将来转向并行扫描是合理的。

作业提交

作业提交包括

  1. 列出作业尝试目录中的所有清单文件。
  2. 加载每个清单文件,创建尚未存在的目录,然后重命名重命名列表中的每个文件。
  3. 保存 JSON _SUCCESS 文件,其格式与 S3A 提交器相同(用于测试;使用写入和重命名进行原子保存)

作业提交阶段支持对许多任务和每个任务的许多文件进行并行化,具体而言

  1. 清单任务在“清单处理器”线程池中加载和处理。
  2. 目录创建和文件重命名操作分别在“执行器”线程池中处理:许多重命名可以并行执行,因为它们使用最少的网络 IO。
  3. 作业清理可以并行删除任务尝试目录。这很重要,因为在 Google 云存储上目录删除是 O(files),在使用 OAuth 身份验证时在 ABFS 上也是如此。

祖先目录准备

可选地扫描所有祖先…如果有任何文件,则删除。

父目录创建

  1. 探测共享目录映射以查找目录是否存在。如果找到:操作完成。
  2. 如果映射为空,则在路径上调用 getFileStatus()。未找到:创建目录,添加条目和所有父路径的条目已找到且为目录:添加条目和所有父路径的条目已找到且为文件:删除。然后像以前一样创建。

有效地处理目录的并发创建(或删除+创建)将成为一个麻烦点;投入了一些精力来构建要创建的目录集。

文件重命名

文件并行重命名。

在该路径上是否存在任何内容(并将其删除)的重命名前检查将是可选的。由于 spark 为每个文件创建新的 UUID,因此不会发生这种情况,并且可以节省 HTTP 请求。

验证

可选地扫描所有已提交的文件并验证长度,如果已知,则验证 etag。用于测试和诊断。

优点

  • 将源树列表操作推送到任务提交阶段,该阶段通常不在执行的关键路径中
  • 对 GCS 提供原子任务提交,因为没有预期目录重命名是原子的
  • 可以从清单中的工作器传递 IOStatistics。
  • 允许一些类似于 S3A “分区暂存提交器” 的重命名前操作。可以将其配置为删除计划创建的目录中所有现有条目,或者在这些分区非空时失败。请参阅 分区暂存提交器
  • 允许可选的预飞检查(验证没有由不同任务创建的重复文件)
  • 可以在开发/调试期间查看清单、确定输出大小等。

缺点

  • 需要新的清单文件格式。
  • 可能会使任务提交变得更加复杂。

对于 GCS 来说,此解决方案是必要的,并且在 ABFS 上应该是有益的,因为在任务提交器中支付了列出开销。

实现详情

约束

一个关键目标是保持清单提交器独立,既不触及现有的提交器代码,也不触及 hadoop 代码库的其他部分。

它必须直接插入 MR 和 Spark,而无需任何更改,除了已经为 S3A 提交器实现的更改之外

  • 自包含:不得要求更改 hadoop-common 等。
  • 隔离:不得对现有提交器进行更改
  • 集成:必须通过 PathOutputCommitterFactory 绑定。

因此,这里有一些从其他地方复制粘贴的内容,例如 org.apache.hadoop.util.functional.TaskPool 基于 S3ACommitter 的 org.apache.hadoop.fs.s3a.commit.Tasks

_SUCCESS 文件必须与 S3A JSON 文件兼容。这是为了确保任何验证 S3A 提交者输出的现有测试套件都可以在不进行任何更改的情况下重新定位到由清单提交者执行的作业。

作业提交中的进度回调。

何时?建议:心跳,直到重命名最终完成。

作业提交中的错误处理和中止。

我们希望停止整个作业提交。需要在每个任务提交者线程的迭代过程中检查某个原子布尔值“中止作业”,以遍历目录(或处理每个文件?)列出或重命名的失败需要升级为停止整个作业提交。这意味着在异步重命名操作或任务提交者线程中引发的任何 IOE 必须

  1. 被捕获
  2. 存储在共享字段/变量中
  3. 触发中止
  4. commitJob() 调用结束时重新引发

避免死锁

如果作业提交阶段使用线程池进行每个任务操作,例如加载文件,则不能将同一个线程池用于每个任务阶段内的并行操作。

由于每个 JobStage 都在任务或作业提交内按顺序执行,因此可以在各个阶段共享同一个线程池。

在当前实现中,作业提交中没有并行的“每个清单”操作,除了实际加载文件之外。创建目录和重命名文件的操作实际上是在不执行各个清单的并行处理的情况下执行的。

目录准备:合并所有清单的目录列表,然后排队创建(希望非常小的)一组唯一目录。

重命名:遍历所有清单并将它们的重命名排队到一个重命名池中。

线程池生命周期

线程池的生命周期受阶段配置的限制,该配置将限制在每个 PathOutputCommitter 方法中以进行设置、提交、中止和清理。

这避免了 S3A 提交者的线程池生命周期问题。

与 S3A HADOOP-16570 相似的规模问题。

这是 terasorting 中的一个故障,其中许多任务各自生成许多文件;在执行之前,会构建要提交的文件的完整列表(以及每个块的 etag)并在内存中进行验证。

清单提交者假定存储在内存中的数据量较少,因为不再需要为每个正在提交的文件的每个块存储 etag。

在 dest dir 中重复创建目录

合并所有目录列表以创建和消除重复项。

实现架构

实现架构反映了 S3A 连接器的经验教训。

  • 将提交阶段与 MR 提交类隔离开来,因为该类具有复杂的生命周期。
  • 相反,将其分解为一系列可以在隔离中测试并链接以提供最终协议的阶段
  • 不要将 MR 数据类型(taskID 等)传递到阶段中 - 传递具有常规类型(字符串等)的配置。
  • 还传递一个存储操作回调,以方便实现伪存储。
  • 对于每个阶段:定义前提条件和后置条件、故障模式。在隔离中测试。

统计信息

提交者收集其针对文件系统执行/调用的所有操作的持续时间统计信息。* 在任务提交期间收集的统计信息将保存到清单中(不包括保存和重命名该文件的时间)* 当在作业提交期间加载这些清单时,这些统计信息将合并以形成整个作业的汇总统计信息。* 保存到_SUCCESS 文件中* 以及在由mapreduce.manifest.committer.summary.report.directory(如果设置)指定的目录中该文件的任何副本中,以进行保存。* 类org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter可以加载并打印这些文件。

不收集查询中使用的文件系统和输入和输出流的 IO 统计信息。

审计

通过PathOutputCommitter API 调用ManifestCommitter时,以下属性将添加到活动(线程)上下文中

ji 作业 ID
tai 任务尝试 ID
st 阶段

这些还全部设置在作为阶段执行的一部分执行工作的全部帮助程序线程中。

任何支持审计的存储/FS 都能够收集此数据并将其包含在日志中。

为了简化反向移植,所有审计集成都在单个类org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration中。