清单提交器协议

本文档描述了 清单提交器 的提交协议

背景

术语

术语 含义
提交者 一个类,可由 MR Spark 调用以执行任务和作业提交操作。
Spark 驱动程序 安排工作和编排提交操作的 Spark 进程。
作业:在 MapReduce 中 整个应用程序。在 Spark 中,这是工作链中的一个阶段
作业尝试 作业的一次尝试。MR 支持在作业部分失败时进行恢复的多项作业尝试。Spark 则表示“从头开始”。
任务 作业的一个子部分,例如处理一个文件或一个文件的一部分
任务 ID 任务的 ID,在这个作业中是唯一的。通常从 0 开始,并用于文件名(part-0000、part-001 等)。
任务尝试 (TA) 执行任务的尝试。它可能会失败,在这种情况下,MR/spark 将安排另一个尝试。
任务尝试 ID 任务尝试的唯一 ID。任务 ID + 尝试计数器。
目标目录 工作的最终目标。
作业尝试目录 作业尝试使用的临时目录。它始终位于目标目录之下,以确保它与 HDFS、其他文件系统中的存储卷等位于相同的加密区域中。
任务尝试目录 作业尝试目录下的目录,任务尝试在其中为自己的工作创建子目录
任务尝试工作目录 每个任务尝试独占的目录,在该目录下写入文件
任务提交 获取任务尝试的输出,并使其成为该“成功”任务的最终/独占结果。
作业提交 汇总所有已提交任务的所有输出,并生成作业的最终结果。

提交者的目的是确保作业的完整输出最终进入目标,即使在任务失败的情况下也是如此。

  • 完整:输出包括所有成功任务的工作。
  • 独占:不存在未成功任务的输出。
  • 并发:当多个任务并行提交时,输出与任务提交序列化时相同。这不是作业提交的要求。
  • 可中止:作业和任务可以在作业提交之前中止,之后它们的输出不可见。
  • 正确性连续性:作业一旦提交,任何失败、中止或不成功的任务的输出都不得在将来某个时间点出现。

对于 Hive 的经典分层目录结构表,作业提交需要将所有已提交任务的输出放入目录树中的正确位置。

内置于 hadoop-mapreduce-client-core 模块的提交器是 FileOutputCommitter

它有两种算法,v1 和 v2。

v1 算法对所有形式的任务失败具有弹性,但在提交最终聚合输出时很慢,因为它将每个新创建的文件重命名为表中正确的位置,一个接一个。

v2 算法不被认为是安全的,因为在各个任务提交时输出是可见的,而不是延迟到作业提交。多个任务尝试可能会将它们的数据放入输出目录树中,如果作业在作业提交之前失败/中止,则该输出是可见的。

文件输出提交器 V1 和 V2

文件输出提交器 V1 和 V2 提交算法

任务尝试执行(V1 和 V2)

$dest/__temporary/$jobAttemptId/ 中的作业尝试目录包含正在进行的作业的所有输出,每个任务尝试都分配有自己的任务尝试目录 $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId

任务的所有工作都写入任务尝试目录下。如果输出是根目录中带有文件的深度树,则任务尝试目录最终将具有类似的结构,其中包含它生成的文件及其上方的目录。

MapReduce V1 算法

v1 任务提交

任务尝试目录直接重命名到作业尝试目录下方

rename(
  $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
  $dest/__temporary/$jobAttemptId/$taskId)

V1 作业提交

对于每个已提交的任务,其下的所有文件都重命名为目标目录,其中文件名从任务的基本目录重新映射到目标目录的基本目录。

也就是说,$dest/__temporary/$jobAttemptId/$taskId 下的所有内容都转换为 $dest 下的路径。

递归树遍历会识别出每个 TA 目录中需要重命名的路径。如果任务目录树包含一个目标目录下不存在的子目录,则会进行一些优化:在这种情况下,可以重命名整个目录。如果目录已经存在,则会对该目录进行逐个文件合并,子目录的操作再次取决于目标目录是否存在。

因此,如果每个任务的输出都进入一个单独的最终目录(例如,最终分区对单个任务是唯一的),则目录的重命名为 O(1),与子目录无关。如果输出与其他任务在同一个目录中(或更新现有目录),则重命名性能变为 O(文件)。

最后,当且仅当 mapreduce.fileoutputcommitter.marksuccessfuljobs 为 true 时,才会写入一个 0 字节的 _SUCCESS 文件。

MapReduce V2 算法

V2 任务提交

任务尝试目录下的文件会被逐个重命名到目标目录中。没有尝试优化目录重命名,因为其他任务可能同时提交其工作。因此,它为 O(文件) + 列出目录树的成本。同样:使用递归树遍历完成,而不是深度 listFiles(path, recursive=true) API,后者在 HDFS 和(尽管此处不相关)S3 上会更快。

V2 作业提交

当且仅当 mapreduce.fileoutputcommitter.marksuccessfuljobs 为 true 时,才会写入一个 0 字节的 _SUCCESS 文件。

为什么 V2 提交器不正确/不安全

如果对于任务 T1,任务尝试 1 (T1A1) 在提交之前失败,驱动程序将安排一个新的尝试“T1A2”并提交它。一切顺利。

但是:如果 T1A1 获得了提交权限,并且在提交过程中失败,则其一些输出可能已被写入目标目录。

如果随后指示尝试 T1A2 提交,则当且仅当其输出具有完全相同的文件名集时,任何已重命名的文件才会被覆盖。如果生成了不同的文件名,则输出将包含 T1A1 和 T1A2 的文件。

如果 T1A1 在提交过程中发生分区,那么作业提交器将计划另一次尝试并提交其工作。然而,如果 T1A1 仍然与文件系统保持连接,它仍然可以重命名文件。即使使用了相同的文件名,两个任务的输出也可能混杂在一起。

背景:S3A 提交器

论文 零重命名提交器,Loughran 等人,涵盖了这些提交器

它还描述了提交问题,定义了正确性,并描述了 v1 和 v2 提交器的算法,以及 S3A 提交器、IBM Stocator 提交器和我们所了解的 EMR 的 Spark 提交器的算法。

hadoop-aws JAR 包含一对提交器,“Staging”和“Magic”。这两个提交器都是同一问题的实现:安全且快速地将工作提交到 S3 对象存储。

提交器利用了 S3 提供创建文件的原子方式这一事实:PUT 请求。

文件要么存在,要么不存在。文件可以直接上传到其目标位置,并且只有在上传完成后,文件才会显现 - 覆盖任何现有副本。

对于大文件,多部分上传允许将此上传操作拆分为一系列 POST 请求

1 initiate-upload (路径 -> 上传 ID) 1. upload part(路径,上传 ID,数据[]) -> 校验和。 这可以并行化。最多可以将 10,000 个部分上传到单个对象。除最后一部分外,所有部分都必须 >= 5MB。1. complete-upload (路径,上传 ID,List<checksum>) 这会显现文件,根据校验和排序定义的块序列中的部分构建文件。

S3A 提交器的秘密在于,即使文件在任务尝试执行/提交期间上传,也可以将最终 POST 请求延迟到作业提交阶段。任务尝试需要确定每个文件的最终目标位置,将数据作为多部分操作的一部分上传,然后将完成上传所需的信息保存在一个文件中,该文件稍后由作业提交器读取并在 POST 请求中使用。

Staging 提交器

Staging 提交器基于 Netflix 的 Ryan Blue 的贡献。它依赖于 HDFS 作为一致的存储来传播 .pendingset 文件。

每个任务尝试的工作目录都在本地文件系统中,“暂存目录”。使用 v1 FileOutputCommitter 与群集 HDFS 文件系统配合,将完成上传所需的信息从任务尝试传递给作业提交程序。这可确保提交程序具有与 v1 算法相同的正确性保证。

  1. 任务提交包括将本地文件系统任务尝试工作目录下的所有文件上传到其最终目标路径,保留最终清单 POST。
  2. 将包含完成任务尝试中所有文件上传所需的所有信息的一个 JSON 文件写入与 HDFS 配合工作的包装提交程序的作业尝试目录。
  3. 作业提交:加载 HDFS 作业尝试目录中的所有清单文件,然后发出 POST 请求以完成上传。这些是并行的。

Magic 提交程序

Magic 提交程序是纯 S3A,并利用作者可以在文件系统客户端本身中进行更改这一事实。

定义了“Magic”路径,当在该路径下打开以进行写入时,会启动到最终目标目录的多方上传。当输出流close()d 时,会将零字节标记文件写入到 magic 路径,并保存包含完成上传所需的所有信息的 JSON .pending 文件。

任务提交:1. 列出每个任务尝试的 magic 目录下的所有.pending文件;1. 聚合到.pendingset文件 1. 使用任务 ID 保存到作业尝试目录。

作业提交

  1. 列出作业尝试目录中的.pendingset文件
  2. 使用 POST 请求完成上传。

Magic 提交程序绝对需要一个一致的 S3 存储 - 最初使用 S3Guard。现在 S3 是一致的,可以使用原始 S3。它不需要 HDFS 或任何其他具有rename()的文件系统。

正确性

S3A 提交程序被认为是正确的,因为

  1. 在作业提交之前,没有任何内容被具体化。
  2. 只能将一个任务尝试的清单保存到作业尝试目录。因此:仅独占提交具有相同任务 ID 的 TA 文件。
  3. 暂存提交程序使用 HDFS 将清单从 TA 传递到作业提交程序,确保 S3 的最终一致性不会导致清单丢失。
  4. 在 S3 一致之前,magic 提交程序依赖于 S3Guard 在任务和作业提交期间提供所需的列表一致性。
  5. 作者和更广泛的社区修复了在生产中出现的与提交程序相关的所有问题。

已修复的重要问题包括

  • HADOOP-15961。S3A 提交者:确保定期进行 progress() 调用。
  • HADOOP-16570。S3A 提交者遇到规模问题。
  • HADOOP-16798。S3A 提交者线程池关闭问题。
  • HADOOP-17112。S3A 提交者无法处理路径中的空白。
  • HADOOP-17318。支持具有相同应用程序尝试 ID 的并发 S3A 提交作业。
  • HADOOP-17258。MagicS3GuardCommitter 在 pendingset 已存在时失败
  • HADOOP-17414。Magic 提交者文件没有 spark 收集的已写入字节数
  • SPARK-33230 Hadoop 提交者在 spark.sql.sources.writeJobUUID 中获取唯一的作业 ID
  • SPARK-33402 在同一秒启动的作业具有重复的 MapReduce 作业 ID
  • SPARK-33739。通过 S3A Magic 提交者提交的作业不报告已写入的字节(取决于 HADOOP-17414)

在影响正确性而非规模/性能/UX 的问题中:HADOOP-17258 涉及在 TA1 任务提交完成后从故障中恢复——但未报告。SPARK-33402、SPARK-33230 和 HADOOP-17318 都相关:如果两个 spark 作业/阶段在同一秒启动,则它们具有相同的作业 ID。这导致临时提交者使用的 HDFS 目录混杂在一起。

值得注意的是:这些都是最小集成测试套件未发现的问题。

好消息是:我们现在知道了这些问题,并且更有可能避免再次复制它们。并且知道要编写哪些测试。

V1 提交者:在 Azure 中速度慢,在 GCS 中速度慢且不安全。

V1 提交者在 ABFS 中表现不佳,原因是

  1. 与 HDFS 相比,ABFS 的目录列出和文件重命名速度稍慢。
  2. v1 提交者通过列出每个已提交任务的输出,顺序提交每个任务的输出,在目标中不存在目录时移动目录,将文件合并到现有目录中。

V2 提交者在作业提交中速度快得多,因为它在任务提交中执行列表和重命名过程。由于它是非原子的,因此被认为使用起来很危险。V2 任务提交算法显示,可以通过仅使用逐个文件重命名来并行提交不同任务的输出。

V1 提交器在 GCS 上表现不佳,因为即使任务提交操作(即目录重命名)也是一个非原子的 O(files) 操作。这也意味着它是不安全的。

如果任务尝试已分区,而 Spark 驱动程序计划/提交另一个 TA,则任务目录可能包含来自第一次尝试的 1 个或多个文件。


清单提交器协议

存储的要求

此提交器支持的存储/文件系统必须

  • 具有始终如一的列表。
  • 具有原子 O(1) 文件重命名操作。

此提交器支持的存储/文件系统应

  • 成功重命名文件,即使在负载下也是如此。ABFS 不会这样做,因此在那里提供了特殊的恢复功能。
  • 实现 HADOOP-17979 的 EtagSource 接口。这用于 ABFS 重命名恢复,以及最终输出的可选验证。

此提交器支持的存储/文件系统可能

  • 具有高延迟的列表操作。
  • 在负载下拒绝带有节流响应的调用,这些调用必须在文件系统连接器中处理。

此提交器支持的存储/文件系统可能不会

  • 支持原子目录重命名。除了在清理中选择性地使用外,此功能从未使用过。
  • 支持 O(1) 目录删除。CleanupJobStage 假设情况并非如此,因此并行删除任务尝试目录。
  • 支持原子 create(Path, overwrite=false) 操作。清单通过写入包含任务尝试 ID 的路径来提交,然后重命名为其最终路径。
  • 支持快速的 listFiles(path, recursive=true) 调用。此 API 调用未使用。

FileOutputCommitter 相比,已删除的要求是

  • 原子目录重命名。
  • O(1) 目录删除。
  • 快速目录列表。
  • 隐式不存在节流行为。

HDFS 满足所有这些要求,因此不会从该提交器中受益很多,尽管它仍可在此处使用。

即使现在 S3 存储已满足此提交者的重命名要求,它也不符合一致性要求。此提交者在 S3 上使用不安全。

任务和作业 ID

每个作业都必须具有一个唯一 ID。

该实现希望 Spark 运行时具有相关补丁以确保这一点。

作业 ID 用于命名临时目录,而不是使用 _temporary/0/ 的经典递增自然编号方案。该方案源自 MapReduce,其中尝试 ID > 1 的作业尝试查找由前代提交的任务,并将该任务合并到其结果中。

此提交者针对 Spark,其中没有恢复尝试。通过在路径中使用作业 ID,如果将作业配置为在作业清理/中止中删除所有 _temporary,则可以使用同一表作为其目标执行多个作业。

任务 ID 和任务尝试 ID 将像往常一样从作业 ID 派生。

应保证写入文件的的文件名是唯一的。这是 Spark 在 ORC 和 Parquet 文件中完成的,并允许默认情况下省略对目标文件的检查。

目录结构

给定目标目录 destDir: Path

ID 为 jobID: String 且尝试次数为 jobAttemptNumber:int 的作业将使用目录

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/

作为其工作(注意:它实际上将使用 %02d 格式化该最终子目录)。

这称为作业尝试目录

在作业尝试目录下,创建一个子目录 tasks。这称为任务尝试目录。每个任务尝试都将拥有自己的子目录,其工作将保存在其中。

在作业尝试目录下,创建一个子目录 manifests。这称为y

所有已提交任务的清单都将保存到此目录,文件名格式为 $taskId-manifest.json

完整路径

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json

是已提交任务创建的所有文件的清单的最终位置。它被称为已提交任务的清单路径

任务尝试将使用临时文件名 $taskAttemptId-manifest.json.tmp 将其清单保存到此目录。

这称为任务尝试清单的临时路径

然后,对于作业和任务操作,定义以下路径。

let jobDirectory = "$destDir/_temporary/manifest_$jobID/"
let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/"
let manifestDirectory = jobAttemptDirectory + "manifests/"
let taskAttemptDirectory = jobAttemptDirectory + "tasks/"

并且对于每个任务尝试,还定义以下路径

let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId"
let taskManifestPath = manifestDirectory + "$taskId-manifest.json"
let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"

协议的核心算法

  1. 每个任务尝试将其所有文件写入任务尝试目录下唯一的目录树。
  2. 任务提交包括对该任务尝试的目录进行递归扫描,创建目录列表和文件列表。
  3. 这些列表被保存为 JSON 清单文件。
  4. 作业提交包括列出所有 JSON 清单文件,加载其内容,创建目标目录的聚合集,并将所有文件重命名为其最终目标。

中间清单

此 JSON 文件经过设计,其中包含(以及 IOStatistics 和一些诊断)

  1. 如果不存在,则必须创建的目标目录列表。
  2. 要重命名的文件列表(绝对源、绝对目标、文件大小)条目。

作业设置

mkdir(jobAttemptDirectory)
mkdir(manifestDirectory)
mkdir(taskAttemptDirectory)

任务设置

mkdir(taskAttemptWorkingDirectory)

任务提交

通过以下方式提交任务尝试

  1. 递归列出任务尝试工作目录以构建
  2. 文件将在其下重命名的目标目录列表及其状态(存在、未找到、文件)
  3. 要重命名的文件列表:源、目标、大小和可选的 etag。
  4. 这些列表填充 JSON 文件,即“中间清单”。
  5. 任务尝试将其文件保存到其“任务尝试清单的临时路径”。
  6. 然后,任务尝试删除“已提交任务的清单路径”,并将自己的清单文件重命名为该路径。
  7. 如果重命名成功,则任务提交被视为成功。

此时不会进行重命名:文件保留在其原始位置,直到在作业提交中重命名。

let (renames, directories) = scan(taskAttemptWorkingDirectory)
let manifest = new Manifest(renames, directories)

manifest.save(taskAttemptTemporaryManifestPath)
rename(taskAttemptTemporaryManifestPath, taskManifestPath)

任务中止/清理

delete(taskAttemptWorkingDirectory)

作业提交

作业提交包括

  1. 列出作业尝试目录中的所有清单文件。
  2. 加载每个清单文件,创建尚不存在的目录,然后重命名重命名列表中的每个文件。
  3. 可以选择保存与 S3A 提交者相同格式的 JSON _SUCCESS 文件(用于测试;使用写入和重命名进行原子保存)

作业提交阶段支持对许多任务和每个任务的许多文件进行并行化,具体来说,有一个线程池用于并行存储 IO

  1. 清单任务并行加载和处理。
  2. 删除打算创建目录的文件。
  3. 创建叶目录。
  4. 文件重命名。
  5. 在清理和中止中:删除任务尝试目录
  6. 如果为测试/调试启用了输出验证:getFileStatus 调用以比较文件长度,如果可能,则比较 etag。
let manifestPaths = list("$manifestDirectory/*-manifest.json")
let manifests = manifestPaths.map(p -> loadManifest(p))
let directoriesToCreate = merge(manifests.directories)
let filesToRename = concat(manifests.files)

directoriesToCreate.map(p -> mkdirs(p))
filesToRename.map((src, dest, etag) -> rename(src, dest, etag))

if mapreduce.fileoutputcommitter.marksuccessfuljobs then
  success.save("$destDir/_SUCCESS")

实施说明

为了帮助调试和开发,摘要可以保存到相同或不同文件系统中的某个位置;中间清单可以重命名为目标文件系统中的某个位置。

if summary.report.directory != "" then
  success.save("${summary.report.directory}/$jobID.json")
if diagnostics.manifest.directory != null then
  rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")

即使由于任何原因导致作业提交失败,摘要报告也会被保存

作业中止/清理

作业清理通常是删除作业目录

delete(jobDirectory)

为了解决对象存储的规模问题,这应在所有任务尝试工作目录的(并行)删除之前

let taskAttemptWorkingDirectories = list("taskAttemptDirectory")
taskAttemptWorkingDirectories.map(p -> delete(p))

新协议的优点

  • 将源树列表操作推送到任务提交阶段,该阶段通常不在执行的关键路径上。
  • 将探测/创建的目录数量减少到输出目录的聚合集,并消除所有重复项。
  • 文件重命名可以并行化,其限制为配置的线程池大小和/或任何速率限制约束。
  • 为 GCS 提供原子任务提交,因为没有期望目录重命名是原子的。
  • 通过清单允许将 IOStatistics 从任务尝试传递到作业提交器。
  • 允许在作业提交器中进行一些重命名前操作,类似于 S3A “分区暂存提交器”。可以将其配置为删除计划创建的目录中的所有现有条目,或者在这些分区非空时失败。请参阅分区暂存提交器
  • 允许进行可选的预检验证检查(验证没有由不同任务创建的重复文件)。
  • 在开发/调试期间,可以查看清单、确定输出大小等。

与 v1 算法相比,新协议的缺点

  • 需要新的清单文件格式。
  • 如果任务创建许多文件和/或子目录,或者如果收集了 etag 并且这些标签的长度很大,则清单可能会变大。HTTP 协议将每个 etag 限制为 8 KiB,因此成本可能是每个文件 8 KiB。
  • 使任务提交比 v1 算法更复杂。
  • 对于各个任务创建唯一输出目录的作业,可能是次优的,因为目录重命名永远不会用于提交目录。