Azure 和 Google Cloud Storage 的清单提交程序

本文档介绍如何使用清单提交程序

清单提交程序是一个提交程序,用于在 ABFS 上为“实际”查询提供性能,并在 GCS 上提供性能和正确性。它还适用于其他文件系统,包括 HDFS。但是,该设计针对列出操作缓慢且昂贵的对象存储进行了优化。

提交程序的架构和实现涵盖在清单提交程序架构中。

协议及其正确性涵盖在清单提交程序协议中。

它于 2022 年 3 月添加,在早期版本中应被视为不稳定。

问题

唯一可安全用于 Spark 到 Azure ADLS Gen 2 “abfs://” 存储的工作提交器是“v1 文件提交器”。

这是“正确的”,因为如果任务尝试失败,则保证其输出不会包含在最终输出中。“v2” 提交算法无法满足该保证,这就是它不再是默认算法的原因。

但是:它很慢,尤其是在使用输出的深度目录树的作业中。为什么它慢?很难指出一个具体的原因,主要是因为 FileOutputCommitter 中缺少任何检测工具。正在运行的作业的堆栈跟踪通常显示 rename(),尽管列表操作也会浮出水面。

在 Google GCS 上,v1 和 v2 算法都不是安全的,因为 google 文件系统没有 v1 算法所需的原子目录重命名。

另一个问题是 Azure 和 GCS 存储都可能遇到删除具有许多后代的目录的规模问题。这可能会触发超时,因为 FileOutputCommitter 假设在作业完成后进行清理是对 delete("_temporary", true) 的快速调用。

解决方案。

中间清单提交器是一个新的提交器,用于在 ABFS 上为“实际”查询提供性能,并在 GCS 上提供性能和正确性。

此提交器使用 S3A 提交器引入的扩展点。用户可以为 abfs://gcs:// URL 声明一个新的提交器工厂。配置适当的 spark 部署将选取新的提交器。

可以通过两个选项解决作业清理中的目录性能问题 1. 提交器将在删除 _temporary 目录之前并行删除任务尝试目录。1. 可以禁用清理。

提交器可与具有“真实”文件 rename() 操作的任何文件系统客户端一起使用。它针对列出和文件探测很昂贵的远程对象存储进行了优化 - 该设计不太可能在 HDFS 上提供如此显着的加速 - 尽管与经典的 v1 算法相比,并行重命名操作将加速那里的作业。

工作原理

完整详细信息在 Manifest Committer 架构 中介绍。

使用提交器

用于支持 S3A 提交器的钩子旨在允许每个文件系统模式提供自己的提交器。请参阅 切换到 S3A 提交器

abfs 模式的工厂将在 mapreduce.outputcommitter.factory.scheme.abfs 中定义;gcs 的工厂类似。

需要进行一些匹配的 Spark 配置更改,特别是对于 Parquet 绑定。如果在 mapred-default.xml JAR 中未定义,可以在 core-site.xml 中进行这些更改。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>
<property>
  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

在 Spark 中绑定到 manifest 提交器。

在 Apache Spark 中,可以通过命令行选项(在“–conf”之后)或使用 spark-defaults.conf 文件来完成配置。下面是使用 spark-defaults.conf 的示例,其中还包括 Parquet 的配置,以及使用工厂机制在内部的 Parquet 提交器的子类。

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

使用 Cloudstore committerinfo 命令探测提交器绑定。

可以在 cloudstore 的最新版本中验证 Hadoop 提交器设置及其 committerinfo 命令。此命令通过与 MR 和 Spark 作业相同的工厂机制为该路径实例化一个提交器,然后打印其 toString 值。

hadoop jar cloudstore-1.0.jar committerinfo abfs://[email protected]/

2021-09-16 19:42:59,731 [main] INFO  commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer
Committer factory for path abfs://[email protected]/ is
 org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7
  (classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory)
2021-09-16 19:43:00,897 [main] INFO  manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with
   JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://[email protected]/
Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter:
 ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://[email protected]/,
   role='task committer',
   taskAttemptDir=abfs://[email protected]/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1,
   createJobMarker=true,
   jobUniqueId='job__0000',
   jobUniqueIdSource='JobID',
   jobAttemptNumber=0,
   jobAttemptId='job__0000_0',
   taskId='task__0000_r_000000',
   taskAttemptId='attempt__0000_r_000000_1'},
   iostatistics=counters=();

gauges=();

minimums=();

maximums=();

means=();
}

验证已使用提交器

新的提交器将在 _SUCCESS 文件中写入操作的 JSON 摘要,包括统计信息。

如果此文件存在且长度为零字节:则使用了经典的 FileOutputCommitter

如果此文件存在且长度大于零字节,则使用了 manifest 提交器,或者对于 S3A 文件系统,则使用了其中一个 S3A 提交器。它们都使用相同的 JSON 格式。

配置选项

以下是提交器的主要配置选项。

选项 含义 默认值
mapreduce.manifest.committer.delete.target.files 删除目标文件? false
mapreduce.manifest.committer.io.threads 并行操作的线程数 64
mapreduce.manifest.committer.summary.report.directory 保存报告的目录。 ""
mapreduce.manifest.committer.cleanup.parallel.delete 并行删除临时目录 true
mapreduce.fileoutputcommitter.cleanup.skipped 跳过 _temporary 目录的清理 false
mapreduce.fileoutputcommitter.cleanup-failures.ignored 忽略清理期间的错误 false
mapreduce.fileoutputcommitter.marksuccessfuljobs 在成功完成时创建一个 _SUCCESS 标记文件(并在作业设置中删除任何现有文件) true

还有更多内容,如(高级)[#advanced] 部分中所述。

扩展作业 mapreduce.manifest.committer.io.threads

此提交器比经典 FileOutputCommitter 更快的主要原因是,它尝试在作业提交期间尽可能并行化文件 IO,具体而言

  • 任务清单加载
  • 删除将创建目录的文件
  • 目录创建
  • 逐个文件重命名
  • 在作业清理中删除任务尝试目录

所有这些操作都在同一个线程池中执行,其大小在选项 mapreduce.manifest.committer.io.threads 中设置。

可以使用较大的值。

Hadoop XML 配置

<property>
  <name>mapreduce.manifest.committer.io.threads</name>
  <value>32</value>
</property>

spark-defaults.conf

spark.hadoop.mapreduce.manifest.committer.io.threads 32

大于分配给 MapReduce AM 或 Spark 驱动的内核数量的较大值不会直接使 CPU 过载,因为线程通常等待针对对象存储/文件系统的(慢)IO 完成。

作业提交中的清单加载可能是内存密集型的;线程数量越多,同时加载的清单就越多。

注意事项 * 在 Spark 中,可以在同一进程中提交多个作业,每个作业在作业提交或清理期间都会创建自己的线程池。 * 如果对存储发出了太多 IO 请求,可能会触发 Azure 速率限制。速率限制选项 mapreduce.manifest.committer.io.rate 可以帮助避免这种情况。

mapreduce.manifest.committer.writer.queue.capacity

这是一个辅助扩展选项。它控制队列的大小,用于存储从目标文件系统加载的清单、从工作线程池加载的清单以及将每个清单中的条目保存到本地文件系统中的中间文件中的单个线程的文件重命名列表。

队列满后,所有清单加载线程都将阻塞。

<property>
  <name>mapreduce.manifest.committer.writer.queue.capacity</name>
  <value>32</value>
</property>

由于本地文件系统通常比任何云存储都更快地写入,因此此队列大小不应成为清单加载性能的限制。

它可以帮助限制作业提交期间清单加载期间消耗的内存量。加载的清单的最大数量将是

mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads

可选:在作业提交中删除目标文件

经典的 FileOutputCommitter 会在将作业文件重命名到指定位置之前删除目标路径中的文件。

这是清单提交器中的可选设置,在选项 mapreduce.manifest.committer.delete.target.files 中设置,默认值为 false

这可以提高性能,并且在作业创建的所有文件都具有唯一文件名时安全使用。

SPARK-8406 添加 UUID 到输出文件名以避免意外覆盖 以来,Apache Spark 会为 ORC 和 Parquet 生成唯一文件名

避免检查/删除目标文件可以为每个提交的文件节省一次删除调用,因此可以节省大量的存储 IO。

在追加到现有表时,使用除 ORC 和 parquet 之外的格式,除非确信每个文件名都添加了唯一标识符,否则启用删除目标文件。

spark.hadoop.mapreduce.manifest.committer.delete.target.files true

注意 1:当提交器创建将文件重命名的目录时,它将跳过删除操作。这会使其效率略高,至少在追加数据的作业创建和写入新分区时是这样。

注意 2:提交器仍要求单个作业中的任务创建唯一文件。这是任何作业生成正确数据的基础。

Spark 动态分区覆盖

Spark 有一个称为“动态分区覆盖”的功能,

这可以在 SQL 中启动

INSERT OVERWRITE TABLE ...

或通过 DataSet 写入(其中模式为 overwrite 且分区与现有表相匹配)

sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// followed by an overwrite of a Dataset into an existing partitioned table.
eventData2
  .write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(existingDir)

此功能在 Spark 中实现,它 1. 指示作业将其新数据写入临时目录 1. 在作业提交完成后,扫描输出以识别写入数据的叶目录“分区”。 1. 删除目标表中这些目录的内容 1. 将新文件重命名为分区。

所有这些都在 spark 中完成,它接管了扫描中间输出树、删除分区和重命名新文件等任务。

此功能还增加了作业完全在目标表外部写入数据的能力,方法是 1. 将新文件写入工作目录 1. spark 在作业提交时将它们移动到最终目标

清单提交器与 Azure 和 Google 云存储上的动态分区覆盖兼容,因为它们一起满足了扩展的核心要求: 1. getWorkPath() 中返回的工作目录与最终输出位于同一文件系统中。 2. rename() 是一个 O(1) 操作,在提交作业时安全且快速使用。

没有 S3A 提交器支持此功能。条件 (1) 未被暂存提交器满足,而条件 (2) 未被 S3 本身满足。

若要将清单提交程序与动态分区覆盖一起使用,Spark 版本必须包含 SPARK-40034路径输出提交程序才能与动态分区覆盖一起使用

请注意,如果重命名了多个文件,则操作的重命名阶段会很慢 - 这是按顺序进行的。并行重命名会加快此过程,但可能会触发清单提交程序旨在最大程度降低风险并支持从中恢复的 ABFS 过载问题

提交操作的 Spark 端将列出/遍历临时输出目录(一些开销),然后使用经典文件系统 rename() 调用进行文件提升。此处不会有明确的速率限制。

这意味着什么?

这意味着对于创建了数千个文件的 Azure 存储 SQL 查询/Spark 数据集操作,不应使用动态分区。在限制规模问题浮出水面之前,这些操作将遭受性能问题,这一点应被视为警告。

_SUCCESS 文件中的作业摘要

原始 Hadoop 提交程序在输出目录的根目录中创建一个零字节 _SUCCESS 文件,除非已禁用。

此提交程序写入一个 JSON 摘要,其中包括 * 提交程序的名称。 * 诊断信息。 * 创建的一些文件的列表(用于测试;已排除完整列表,因为它可能会很大)。 * IO 统计信息。

如果在运行查询后,此 _SUCCESS 文件的长度为零字节,则表示未曾使用过新提交程序

如果它不为空,则可以检查它。

通过 ManifestPrinter 工具查看 _SUCCESS 文件文件。

摘要文件为 JSON,可以在任何文本编辑器中查看。

若要获得更简洁的摘要,包括更好的统计信息显示,请使用 ManifestPrinter 工具。

hadoop org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter <path>

这适用于保存在输出目录的底部以及保存到报告目录的任何报告的文件。

收集作业摘要 mapreduce.manifest.committer.summary.report.directory

可以通过在选项 mapreduce.manifest.committer.summary.report.directory 中设置文件系统路径,将提交程序配置为将 _SUCCESS 摘要文件保存到报告目录,无论作业是否成功或失败。

该路径不必与工作的目标位于同一存储/文件系统上。例如,可以使用本地文件系统。

XML

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <value>file:///tmp/reports</value>
</property>

spark-defaults.conf

spark.hadoop.mapreduce.manifest.committer.summary.report.directory file:///tmp/reports

这允许收集作业的统计信息,无论其结果如何,无论是否启用了保存 _SUCCESS 标记,以及不会因一系列查询覆盖标记而导致问题。

清理

作业清理很复杂,因为它旨在解决云存储中可能出现的许多问题。

  • 删除目录时的性能缓慢。
  • 删除非常深且宽的目录树时超时。
  • 一般弹性,可防止清理问题升级为作业故障。
选项 含义 默认值
mapreduce.fileoutputcommitter.cleanup.skipped 跳过 _temporary 目录的清理 false
mapreduce.fileoutputcommitter.cleanup-failures.ignored 忽略清理期间的错误 false
mapreduce.manifest.committer.cleanup.parallel.delete 并行删除任务尝试目录 true

算法是

if `mapreduce.fileoutputcommitter.cleanup.skipped`:
  return
if `mapreduce.manifest.committer.cleanup.parallel.delete`:
  attempt parallel delete of task directories; catch any exception
if not `mapreduce.fileoutputcommitter.cleanup.skipped`:
  delete(`_temporary`); catch any exception
if caught-exception and not `mapreduce.fileoutputcommitter.cleanup-failures.ignored`:
  throw caught-exception

它有点复杂,但目标是执行快速/可扩展的删除,如果不起作用,则抛出有意义的异常。

在使用 ABFS 和 GCS 时,这些设置通常应保持不变。如果在清理期间出现错误,启用忽略故障的选项将确保作业仍能完成。禁用清理甚至可以避免清理开销,但需要工作流或手动操作定期清理所有 _temporary 目录。

使用 Azure ADLS Gen2 存储

要切换到清单提交程序,必须将具有 abfs:// URL 的目标的提交程序工厂切换到清单提交程序工厂,适用于应用程序或整个群集。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>

这允许在提交程序中使用 ADLS Gen2 特定的性能和一致性逻辑。特别是:* Etag 标头可以在清单中收集并在作业提交阶段使用。* IO 重命名操作受到速率限制 * 当节流触发重命名故障时,会尝试恢复。

警告 此提交程序与较旧的 Azure 存储服务(WASB 或 ADLS Gen 1)不兼容。

经过 Azure 优化的核心选项集变为

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>

<property>
  <name>spark.hadoop.fs.azure.io.rate.limit</name>
  <value>10000</value>
</property>

以及用于调试/性能分析的可选设置

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <value>abfs:// Path within same store/separate store</value>
  <description>Optional: path to where job summaries are saved</description>
</property>

适用于 spark 的 ABFS 选项的完整集

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.fs.azure.io.rate.limit 10000
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

实验性:ABFS 重命名速率限制 fs.azure.io.rate.limit

为了避免触发存储节流和退避延迟以及其他与节流相关的故障条件,作业提交期间的文件重命名通过“速率限制器”进行节流,该限制器限制 ABFS FileSystem 客户端的单个实例每秒可以发出的重命名操作数。

选项 含义
fs.azure.io.rate.limit IO 操作的每秒操作速率限制。

将选项设置为 0 以移除所有速率限制。

此选项的默认值设置为 10000,这是 ADLS 存储帐户的默认 IO 容量。

<property>
  <name>fs.azure.io.rate.limit</name>
  <value>10000</value>
  <description>maximum number of renames attempted per second</description>
</property>

此容量在文件系统客户端级别设置,因此不会在单个应用程序中的所有进程之间共享,更不用说共享相同存储帐户的其他应用程序了。

它将与由同一 Spark 驱动程序提交的所有作业共享,因为它们共享该文件系统连接器。

如果实施速率限制,则统计信息 store_io_rate_limited 将报告获取用于提交文件的许可证的时间。

如果发生服务器端限制,则可以在以下位置看到此限制的迹象:* 存储服务的日志及其限制状态代码(通常为 503 或 500)。* 作业统计信息 commit_file_rename_recovered。此统计信息表明 ADLS 限制表现为重命名失败,这些失败在提交程序中已恢复。

如果看到这些迹象,或者同时运行的其他应用程序遇到限制/限制触发的故障,请考虑降低 fs.azure.io.rate.limit 的值,和/或向 Microsoft 请求更高的 IO 容量。

重要提示 如果您确实从 Microsoft 获得了额外的容量,并且您希望使用它来加速作业提交,请跨集群或专门针对您希望分配额外优先级的那些作业增加 fs.azure.io.rate.limit 的值。

此功能仍在开发中;它可能会扩展以支持单个文件系统实例执行的所有 IO 操作。

使用 Google Cloud Storage

清单提交程序与 Google 云存储兼容,并通过 google 的 gcs-connector 库针对 Google 云存储进行了测试,该库为模式 gs 提供了 Hadoop 文件系统客户端。

Google 云存储具有使提交协议能够安全工作的语义。

切换到此提交程序的 Spark 设置为

spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

存储的目录删除操作为 O(files),因此 mapreduce.manifest.committer.cleanup.parallel.delete 的值应保留为默认值 true

对于 mapreduce,在 core-site.xmlmapred-site.xml 中声明绑定

<property>
  <name>mapreduce.outputcommitter.factory.scheme.gcs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

使用 HDFS

此提交者确实与 HDFS 协同工作,它刚刚针对对象存储进行调整,在某些操作(尤其是列出和重命名)中性能降低,语义也过于简化,导致经典的 FileOutputCommitter 无法依赖(尤其是 GCS)。

要在 HDFS 上使用,请将 ManifestCommitterFactory 设置为 hdfs:// URL 的提交者工厂。

由于 HDFS 执行快速目录删除,因此无需在清理期间并行删除任务尝试目录,因此将 mapreduce.manifest.committer.cleanup.parallel.delete 设置为 false

最终的 Spark 绑定变为

spark.hadoop.mapreduce.outputcommitter.factory.scheme.hdfs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete false
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

高级主题

高级配置选项

有一些高级选项,它们适用于开发和测试,而不是生产用途。

选项 含义 默认值
mapreduce.manifest.committer.store.operations.classname Manifest 存储操作的类名 ""
mapreduce.manifest.committer.validate.output 执行输出验证? false
mapreduce.manifest.committer.writer.queue.capacity 写入中间文件的队列容量 32

验证输出 mapreduce.manifest.committer.validate.output

选项 mapreduce.manifest.committer.validate.output 触发对每个重命名文件的检查,以验证它是否具有预期的长度。

这增加了每个文件一个 HEAD 请求的开销,因此仅建议用于测试。

没有对实际内容进行验证。

控制存储集成 mapreduce.manifest.committer.store.operations.classname

Manifest 提交者通过 ManifestStoreOperations 接口的实现与文件系统进行交互。可以为特定于存储的功能提供自定义实现。ABFS 有一个这样的实现;当使用特定于 abfs 的提交者工厂时,会自动设置此实现。

可以显式设置它。

<property>
  <name>mapreduce.manifest.committer.store.operations.classname</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AbfsManifestStoreOperations</value>
</property>

默认实现也可以配置。

<property>
  <name>mapreduce.manifest.committer.store.operations.classname</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem</value>
</property>

无需更改这些值,除非为其他存储编写新实现,只有当存储为提交者提供额外的集成支持时才需要这样做。

支持对同一目录的并发作业

可能可以运行针对同一目录树的多个作业。

要实现此目的,必须满足以下条件

  • 使用 Spark 时,必须设置唯一的作业 ID。这意味着 Spark 分发版必须包含 SPARK-33402SPARK-33230 的补丁。
  • 必须通过将 mapreduce.fileoutputcommitter.cleanup.skipped 设置为 true 来禁用 _temporary 目录的清理。
  • 所有作业/任务都必须创建具有唯一文件名的文件。
  • 所有作业都必须创建具有相同目录分区结构的输出。
  • 作业/查询不得使用 Spark 动态分区“INSERT OVERWRITE TABLE”;数据可能会丢失。这适用于所有提交程序,而不仅仅是清单提交程序。
  • 别忘了稍后删除 _temporary 目录!

尚未经过测试