本文档描述了 清单提交器 的提交协议
术语 | 含义 |
---|---|
提交者 | 一个类,可由 MR Spark 调用以执行任务和作业提交操作。 |
Spark 驱动程序 | 安排工作和编排提交操作的 Spark 进程。 |
作业:在 MapReduce 中 | 整个应用程序。在 Spark 中,这是工作链中的一个阶段 |
作业尝试 | 作业的一次尝试。MR 支持在作业部分失败时进行恢复的多项作业尝试。Spark 则表示“从头开始”。 |
任务 | 作业的一个子部分,例如处理一个文件或一个文件的一部分 |
任务 ID | 任务的 ID,在这个作业中是唯一的。通常从 0 开始,并用于文件名(part-0000、part-001 等)。 |
任务尝试 (TA) | 执行任务的尝试。它可能会失败,在这种情况下,MR/spark 将安排另一个尝试。 |
任务尝试 ID | 任务尝试的唯一 ID。任务 ID + 尝试计数器。 |
目标目录 | 工作的最终目标。 |
作业尝试目录 | 作业尝试使用的临时目录。它始终位于目标目录之下,以确保它与 HDFS、其他文件系统中的存储卷等位于相同的加密区域中。 |
任务尝试目录 | 作业尝试目录下的目录,任务尝试在其中为自己的工作创建子目录 |
任务尝试工作目录 | 每个任务尝试独占的目录,在该目录下写入文件 |
任务提交 | 获取任务尝试的输出,并使其成为该“成功”任务的最终/独占结果。 |
作业提交 | 汇总所有已提交任务的所有输出,并生成作业的最终结果。 |
提交者的目的是确保作业的完整输出最终进入目标,即使在任务失败的情况下也是如此。
对于 Hive 的经典分层目录结构表,作业提交需要将所有已提交任务的输出放入目录树中的正确位置。
内置于 hadoop-mapreduce-client-core
模块的提交器是 FileOutputCommitter
。
它有两种算法,v1 和 v2。
v1 算法对所有形式的任务失败具有弹性,但在提交最终聚合输出时很慢,因为它将每个新创建的文件重命名为表中正确的位置,一个接一个。
v2 算法不被认为是安全的,因为在各个任务提交时输出是可见的,而不是延迟到作业提交。多个任务尝试可能会将它们的数据放入输出目录树中,如果作业在作业提交之前失败/中止,则该输出是可见的。
$dest/__temporary/$jobAttemptId/
中的作业尝试目录包含正在进行的作业的所有输出,每个任务尝试都分配有自己的任务尝试目录 $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
任务的所有工作都写入任务尝试目录下。如果输出是根目录中带有文件的深度树,则任务尝试目录最终将具有类似的结构,其中包含它生成的文件及其上方的目录。
任务尝试目录直接重命名到作业尝试目录下方
rename( $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId $dest/__temporary/$jobAttemptId/$taskId)
对于每个已提交的任务,其下的所有文件都重命名为目标目录,其中文件名从任务的基本目录重新映射到目标目录的基本目录。
也就是说,$dest/__temporary/$jobAttemptId/$taskId
下的所有内容都转换为 $dest
下的路径。
递归树遍历会识别出每个 TA 目录中需要重命名的路径。如果任务目录树包含一个目标目录下不存在的子目录,则会进行一些优化:在这种情况下,可以重命名整个目录。如果目录已经存在,则会对该目录进行逐个文件合并,子目录的操作再次取决于目标目录是否存在。
因此,如果每个任务的输出都进入一个单独的最终目录(例如,最终分区对单个任务是唯一的),则目录的重命名为 O(1),与子目录无关。如果输出与其他任务在同一个目录中(或更新现有目录),则重命名性能变为 O(文件)。
最后,当且仅当 mapreduce.fileoutputcommitter.marksuccessfuljobs 为 true 时,才会写入一个 0 字节的 _SUCCESS 文件。
任务尝试目录下的文件会被逐个重命名到目标目录中。没有尝试优化目录重命名,因为其他任务可能同时提交其工作。因此,它为 O(文件) + 列出目录树的成本。同样:使用递归树遍历完成,而不是深度 listFiles(path, recursive=true) API,后者在 HDFS 和(尽管此处不相关)S3 上会更快。
当且仅当 mapreduce.fileoutputcommitter.marksuccessfuljobs 为 true 时,才会写入一个 0 字节的 _SUCCESS 文件。
如果对于任务 T1,任务尝试 1 (T1A1) 在提交之前失败,驱动程序将安排一个新的尝试“T1A2”并提交它。一切顺利。
但是:如果 T1A1 获得了提交权限,并且在提交过程中失败,则其一些输出可能已被写入目标目录。
如果随后指示尝试 T1A2 提交,则当且仅当其输出具有完全相同的文件名集时,任何已重命名的文件才会被覆盖。如果生成了不同的文件名,则输出将包含 T1A1 和 T1A2 的文件。
如果 T1A1 在提交过程中发生分区,那么作业提交器将计划另一次尝试并提交其工作。然而,如果 T1A1 仍然与文件系统保持连接,它仍然可以重命名文件。即使使用了相同的文件名,两个任务的输出也可能混杂在一起。
论文 零重命名提交器,Loughran 等人,涵盖了这些提交器
它还描述了提交问题,定义了正确性,并描述了 v1 和 v2 提交器的算法,以及 S3A 提交器、IBM Stocator 提交器和我们所了解的 EMR 的 Spark 提交器的算法。
hadoop-aws
JAR 包含一对提交器,“Staging”和“Magic”。这两个提交器都是同一问题的实现:安全且快速地将工作提交到 S3 对象存储。
提交器利用了 S3 提供创建文件的原子方式这一事实:PUT 请求。
文件要么存在,要么不存在。文件可以直接上传到其目标位置,并且只有在上传完成后,文件才会显现 - 覆盖任何现有副本。
对于大文件,多部分上传允许将此上传操作拆分为一系列 POST 请求
1 initiate-upload (路径 -> 上传 ID)
1. upload part(路径,上传 ID,数据[]) -> 校验和。
这可以并行化。最多可以将 10,000 个部分上传到单个对象。除最后一部分外,所有部分都必须 >= 5MB。1. complete-upload (路径,上传 ID,List<checksum>)
这会显现文件,根据校验和排序定义的块序列中的部分构建文件。
S3A 提交器的秘密在于,即使文件在任务尝试执行/提交期间上传,也可以将最终 POST 请求延迟到作业提交阶段。任务尝试需要确定每个文件的最终目标位置,将数据作为多部分操作的一部分上传,然后将完成上传所需的信息保存在一个文件中,该文件稍后由作业提交器读取并在 POST 请求中使用。
Staging 提交器基于 Netflix 的 Ryan Blue 的贡献。它依赖于 HDFS 作为一致的存储来传播 .pendingset
文件。
每个任务尝试的工作目录都在本地文件系统中,“暂存目录”。使用 v1 FileOutputCommitter 与群集 HDFS 文件系统配合,将完成上传所需的信息从任务尝试传递给作业提交程序。这可确保提交程序具有与 v1 算法相同的正确性保证。
Magic 提交程序是纯 S3A,并利用作者可以在文件系统客户端本身中进行更改这一事实。
定义了“Magic”路径,当在该路径下打开以进行写入时,会启动到最终目标目录的多方上传。当输出流close()
d 时,会将零字节标记文件写入到 magic 路径,并保存包含完成上传所需的所有信息的 JSON .pending 文件。
任务提交:1. 列出每个任务尝试的 magic 目录下的所有.pending
文件;1. 聚合到.pendingset
文件 1. 使用任务 ID 保存到作业尝试目录。
作业提交
.pendingset
文件Magic 提交程序绝对需要一个一致的 S3 存储 - 最初使用 S3Guard。现在 S3 是一致的,可以使用原始 S3。它不需要 HDFS 或任何其他具有rename()
的文件系统。
S3A 提交程序被认为是正确的,因为
已修复的重要问题包括
pendingset
已存在时失败spark.sql.sources.writeJobUUID
中获取唯一的作业 ID在影响正确性而非规模/性能/UX 的问题中:HADOOP-17258 涉及在 TA1 任务提交完成后从故障中恢复——但未报告。SPARK-33402、SPARK-33230 和 HADOOP-17318 都相关:如果两个 spark 作业/阶段在同一秒启动,则它们具有相同的作业 ID。这导致临时提交者使用的 HDFS 目录混杂在一起。
值得注意的是:这些都是最小集成测试套件未发现的问题。
好消息是:我们现在知道了这些问题,并且更有可能避免再次复制它们。并且知道要编写哪些测试。
V1 提交者在 ABFS 中表现不佳,原因是
V2 提交者在作业提交中速度快得多,因为它在任务提交中执行列表和重命名过程。由于它是非原子的,因此被认为使用起来很危险。V2 任务提交算法显示,可以通过仅使用逐个文件重命名来并行提交不同任务的输出。
V1 提交器在 GCS 上表现不佳,因为即使任务提交操作(即目录重命名)也是一个非原子的 O(files)
操作。这也意味着它是不安全的。
如果任务尝试已分区,而 Spark 驱动程序计划/提交另一个 TA,则任务目录可能包含来自第一次尝试的 1 个或多个文件。
此提交器支持的存储/文件系统必须
O(1)
文件重命名操作。此提交器支持的存储/文件系统应
EtagSource
接口。这用于 ABFS 重命名恢复,以及最终输出的可选验证。此提交器支持的存储/文件系统可能
此提交器支持的存储/文件系统可能不会
O(1)
目录删除。CleanupJobStage
假设情况并非如此,因此并行删除任务尝试目录。create(Path, overwrite=false)
操作。清单通过写入包含任务尝试 ID 的路径来提交,然后重命名为其最终路径。listFiles(path, recursive=true)
调用。此 API 调用未使用。与 FileOutputCommitter
相比,已删除的要求是
O(1)
目录删除。HDFS 满足所有这些要求,因此不会从该提交器中受益很多,尽管它仍可在此处使用。
即使现在 S3 存储已满足此提交者的重命名要求,它也不符合一致性要求。此提交者在 S3 上使用不安全。
每个作业都必须具有一个唯一 ID。
该实现希望 Spark 运行时具有相关补丁以确保这一点。
作业 ID 用于命名临时目录,而不是使用 _temporary/0/
的经典递增自然编号方案。该方案源自 MapReduce,其中尝试 ID > 1 的作业尝试查找由前代提交的任务,并将该任务合并到其结果中。
此提交者针对 Spark,其中没有恢复尝试。通过在路径中使用作业 ID,如果将作业配置为在作业清理/中止中不删除所有 _temporary
,则可以使用同一表作为其目标执行多个作业。
任务 ID 和任务尝试 ID 将像往常一样从作业 ID 派生。
应保证写入文件的的文件名是唯一的。这是 Spark 在 ORC 和 Parquet 文件中完成的,并允许默认情况下省略对目标文件的检查。
给定目标目录 destDir: Path
ID 为 jobID: String
且尝试次数为 jobAttemptNumber:int
的作业将使用目录
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/
作为其工作(注意:它实际上将使用 %02d
格式化该最终子目录)。
这称为作业尝试目录
在作业尝试目录下,创建一个子目录 tasks
。这称为任务尝试目录。每个任务尝试都将拥有自己的子目录,其工作将保存在其中。
在作业尝试目录下,创建一个子目录 manifests
。这称为y。
所有已提交任务的清单都将保存到此目录,文件名格式为 $taskId-manifest.json
完整路径
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json
是已提交任务创建的所有文件的清单的最终位置。它被称为已提交任务的清单路径。
任务尝试将使用临时文件名 $taskAttemptId-manifest.json.tmp
将其清单保存到此目录。
这称为任务尝试清单的临时路径。
然后,对于作业和任务操作,定义以下路径。
let jobDirectory = "$destDir/_temporary/manifest_$jobID/" let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/" let manifestDirectory = jobAttemptDirectory + "manifests/" let taskAttemptDirectory = jobAttemptDirectory + "tasks/"
并且对于每个任务尝试,还定义以下路径
let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId" let taskManifestPath = manifestDirectory + "$taskId-manifest.json" let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"
此 JSON 文件经过设计,其中包含(以及 IOStatistics 和一些诊断)
mkdir(jobAttemptDirectory) mkdir(manifestDirectory) mkdir(taskAttemptDirectory)
mkdir(taskAttemptWorkingDirectory)
通过以下方式提交任务尝试
此时不会进行重命名:文件保留在其原始位置,直到在作业提交中重命名。
let (renames, directories) = scan(taskAttemptWorkingDirectory) let manifest = new Manifest(renames, directories) manifest.save(taskAttemptTemporaryManifestPath) rename(taskAttemptTemporaryManifestPath, taskManifestPath)
delete(taskAttemptWorkingDirectory)
作业提交包括
_SUCCESS
文件(用于测试;使用写入和重命名进行原子保存)作业提交阶段支持对许多任务和每个任务的许多文件进行并行化,具体来说,有一个线程池用于并行存储 IO
let manifestPaths = list("$manifestDirectory/*-manifest.json") let manifests = manifestPaths.map(p -> loadManifest(p)) let directoriesToCreate = merge(manifests.directories) let filesToRename = concat(manifests.files) directoriesToCreate.map(p -> mkdirs(p)) filesToRename.map((src, dest, etag) -> rename(src, dest, etag)) if mapreduce.fileoutputcommitter.marksuccessfuljobs then success.save("$destDir/_SUCCESS")
实施说明
为了帮助调试和开发,摘要可以保存到相同或不同文件系统中的某个位置;中间清单可以重命名为目标文件系统中的某个位置。
if summary.report.directory != "" then success.save("${summary.report.directory}/$jobID.json") if diagnostics.manifest.directory != null then rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")
即使由于任何原因导致作业提交失败,摘要报告也会被保存
作业清理通常是删除作业目录
delete(jobDirectory)
为了解决对象存储的规模问题,这应在所有任务尝试工作目录的(并行)删除之前
let taskAttemptWorkingDirectories = list("taskAttemptDirectory") taskAttemptWorkingDirectories.map(p -> delete(p))