org.apache.hadoop.fs.FutureDataInputStreamBuilder

一个接口,提供构建器模式,用于创建 Java FutureFSDataInputStream 及其子类的引用。它用于初始化(可能异步)操作,以打开现有文件进行读取。

历史记录

Hadoop 3.3.0:引入的 API

HADOOP-15229 添加基于 FileSystem 构建器的 openFile() API 以匹配 createFile()

  • 没有可用的 opt(String key, long value) 方法。
  • withFileStatus(status) 调用需要一个非空参数。
  • 处理选项和文件状态的唯一文件系统是 S3A;
  • 只有 S3 特定的选项是 S3 选择和 fs.s3a.experimental.input.fadvise
  • 如果传入文件状态且文件状态的路径与 openFile(path) 调用的路径不匹配,S3A 文件系统会引发 IllegalArgumentException

这是基准实现。要编写保证与该版本编译的代码,请使用 opt(String, String)must(String, String) 方法,将数字显式转换为字符串。

fs.open("s3a://bucket/file")
  .opt("fs.option.openfile.length", Long.toString(length))
  .build().get()

Hadoop 3.3.5:标准化和扩展

HADOOP-16202 增强 openFile() 以提高针对对象存储的读取性能

  • 需要接受(并忽略)withFileStatus(null)
  • openFile(path) 中传入的任何 FileStatus 路径,只有文件名部分必须与文件名匹配。
  • 添加了 opt(String key, long value) 选项。*现在已弃用,因为它导致了回归
  • 定义了标准 fs.option.openfile 选项。
  • S3A FS 使用 openfile 长度选项,尚未使用 seek 开始/结束选项。
  • Azure ABFS 连接器采用提供的 VersionedFileStatus 并省略对对象的任何 HEAD 探测。

Hadoop 3.3.6:API 更改以解决操作符重载错误。

新的 optLong()optDouble()mustLong()mustDouble() 构建器方法。

  • 请参阅 HADOOP-18724 S3AFileSystem 的 open file 失败并显示 NumberFormatException,这在某种程度上是由重载的 opt(long) 引起的。
  • 已更新规范,声明无法解析的数字必须视为“未设置”,并使用默认值。

不变量

FutureDataInputStreamBuilder 接口在调用 build() 和/或在异步打开操作本身期间之前不需要参数或 FileSystem 的状态。

文件系统的状态的某些方面可以在初始 openFile() 调用中检查,前提是已知它们是不变量,并且不会在 openFile()build().get() 序列之间更改。例如,路径验证。

`与实现无关的参数。

FutureDataInputStreamBuilder bufferSize(int bufSize)

设置要使用的缓冲区的大小。

FutureDataInputStreamBuilder withFileStatus(FileStatus status)

一个引用正在打开的文件的 FileStatus 实例。

实现可以将其用于短路文件检查,因此可以节省远程调用,特别是对对象存储的调用。

要求

  • status != null
  • status.getPath().getName() == 正在打开的文件的名称。

如果存储在打开文件时使用 FileStatus,则必须进行路径验证,否则可以执行。验证应推迟到 build() 操作。

此操作应被视为对文件系统的提示。

如果文件系统实现扩展了 FileStatus,则其实现中返回的 FileStatus 在打开文件时可以使用此信息。

这与返回版本/etag 信息的存储相关,- 它们可以使用此信息来保证打开的文件与列表中返回的文件完全相同。

提供的状态的最终 status.getPath().getName() 元素必须等于提供给 openFile(path) 调用的路径的名称值。

文件系统不得验证路径的其余部分。需要这样做以支持 viewfs 和其他挂载点包装文件系统,其中架构和路径不同。这些通常会创建它们自己的 FileStatus 结果

先决条件

status == null or status.getPath().getName() == path.getName()

文件系统不得要求 status 的类等于其实现返回的任何特定子类的类,这些子类在 filestatus/list 操作中返回。这是为了支持包装文件系统和状态的序列化/反序列化。

设置可选或强制参数

FutureDataInputStreamBuilder opt(String key, String value)
FutureDataInputStreamBuilder opt(String key, int value)
FutureDataInputStreamBuilder opt(String key, boolean value)
FutureDataInputStreamBuilder optLong(String key, long value)
FutureDataInputStreamBuilder optDouble(String key, double value)
FutureDataInputStreamBuilder must(String key, String value)
FutureDataInputStreamBuilder must(String key, int value)
FutureDataInputStreamBuilder must(String key, boolean value)
FutureDataInputStreamBuilder mustLong(String key, long value)
FutureDataInputStreamBuilder mustDouble(String key, double value)

向构建器设置可选或强制参数。使用 opt()must(),客户端可以指定特定于 FS 的参数,而无需检查 FileSystem 的具体类型。

示例

out = fs.openFile(path)
    .must("fs.option.openfile.read.policy", "random")
    .optLong("fs.http.connection.timeout", 30_000L)
    .withFileStatus(statusFromListing)
    .build()
    .get();

此处已指定 random 的读取策略,要求文件系统实现必须理解该选项。已提供一个特定于 http 的选项,任何存储都可以解释该选项;如果打开文件的文件系统无法识别该选项,则可以安全地忽略它。

何时使用 optmust

optmust 的区别在于打开文件的 FileSystem 对其不识别的选项的反应方式。

def must(name, value):
  if not name in known_keys:
    raise IllegalArgumentException
  if not name in supported_keys:
    raise UnsupportedException


def opt(name, value):
  if not name in known_keys:
     # ignore option

对于任何已知密钥,value 参数的验证必须相同,无论如何声明 (key, value) 对。

  1. 对于特定于文件系统的选项,如何验证条目是实现的选择。
  2. 对于标准选项,有效 value 的规范在此文件系统规范中定义,通过契约测试验证。

实现说明

必须在 build() 操作中检查受支持的选项。

  1. 如果未识别通过 must(key, value)) 声明的强制性参数,则必须抛出 IllegalArgumentException

  2. 如果通过 must(key, value) 声明的强制性参数依赖于特定 FileSystem/FileContext 实例中识别但不受支持的功能,则必须抛出 UnsupportedException

应修剪任何字符串的数字值的解析,如果无法将该值解析为数字,则降级为提供的任何默认值。这是为了解决 HADOOP-18724 S3AFileSystem 的 Open file 失败,并出现 NumberFormatException,这是由于当传入长值时,重载的 opt() 构建器参数绑定到 opt(String, double) 而不是 opt(String, long) 导致的。

解决由构建器方法(即 bufferSize())和 opt()/must() 设置的参数之间的冲突的行为如下

指定的最后一个选项定义值及其可选/强制状态。

如果在 withFileStatus() 中传入 FileStatus 选项,则实现必须接受 FileStatus 的所有子类,包括 LocatedFileStatus,而不仅仅是实现实现的任何 FS 特定子类(例如 S3AFileStatus)。它们可以简单地忽略那些不是自定义子类的子类。

这对于确保安全使用该功能至关重要:目录列表/状态序列化/反序列化可能导致 withFileStatus() 参数不是文件系统实例自己的 getFileStatus()listFiles()listLocatedStatus() 调用等返回的自定义子类。

在这种情况下,实现必须

  1. 验证 status.getPath().getName() 是否与当前 path.getName() 值匹配。路径的其余部分不得验证。
  2. 按需使用任何状态字段,例如文件长度。

即使不使用状态值,参数的存在也可以解释为调用者声明他们认为文件存在且具有给定大小。

Builder 接口

CompletableFuture<FSDataInputStream> build()

返回一个 CompletableFuture<FSDataInputStream>,当成功完成时,返回一个可以从文件系统读取数据的输入流。

build() 操作可以执行文件是否存在、其类型的验证,因此拒绝尝试从目录或不存在的文件中读取。或者,* 文件存在/状态检查可以在返回的 CompletableFuture<> 中异步执行。* 文件存在/状态检查可以推迟到在 read()PositionedRead 等任何读取中读取第一个字节为止。

也就是说,前提条件 exists(FS, path)isFile(FS, path) 仅在对返回的 future 调用 get() 并且尝试读取流之后才保证满足。

因此,即使在文件不存在或是一个目录而不是文件的情况下,以下调用也必须成功,返回一个要评估的 CompletableFuture

Path p = new Path("file://tmp/file-which-does-not-exist");

CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
      .openFile(p)
      .build();

无法访问/读取文件必须在 future 的 get() 调用中引发 IOException 或子类,或者对于延迟绑定操作,在调用读取数据操作时引发。

因此,当在先前示例返回的 future 上调用时,以下序列将失败。

  future.get().read();

访问权限检查具有相同的可见性要求:权限失败必须延迟到 get() 调用,并且可以延迟到后续操作中。

注意:对输入流的一些操作(例如 seek())可能根本不会尝试任何 IO。当与不存在/不可读的文件进行交互时,此类操作可能不会引发异常。

自 Hadoop 3.3.3 以来的标准 openFile() 选项

这些是 FileSystemFileContext 实现必须识别并可能通过适当地更改其输入流的行为来支持的选项。

Hadoop 3.3.0 添加了 openFile() API;这些标准选项是在较新的版本中定义的。因此,虽然它们是“众所周知的”,但除非确信应用程序只会针对知道这些选项的 Hadoop 版本执行,否则应用程序应通过 opt() 调用而不是 must() 来设置选项。

通过 openFile() 构建器 API 打开文件时,调用者可以使用 .opt(key, value).must(key, value) 调用来设置标准选项和特定于文件系统的选项。

如果作为 opt() 参数设置,则必须忽略不受支持的“标准”选项,并且必须忽略无法识别的标准选项。

如果作为 must() 参数设置,则必须忽略不受支持的“标准”选项。必须拒绝无法识别的标准选项。

标准 openFile() 选项在 org.apache.hadoop.fs.OpenFileOptions 中定义;它们都应以 fs.option.openfile. 开头。

请注意,虽然所有 FileSystem/FileContext 实例都应支持这些选项,以至于 must() 声明不会失败,但这些实现可能支持它们以解释值。这意味着存储实际读取读取策略或文件长度值并在打开文件时使用它们并不是必需的。

除非另有说明,否则应将它们视为提示。

注意:如果添加了这样的标准选项:如果设置但不受支持,则会出错,那么实现将拒绝它。例如,S3A 文件系统客户端支持将 SQL 命令下推的能力。如果将类似内容标准化,那么对于不支持此功能的文件系统,必须拒绝在 opt()must() 参数中使用该选项。

选项:fs.option.openfile.buffer.size

以字节为单位的读取缓冲区大小。

这将使用选项 io.file.buffer.size 覆盖配置中设置的默认值。

它受所有文件系统客户端支持,这些客户端允许通过 FileSystem.open(path, buffersize) 设置特定于流的缓冲区大小。

选项:fs.option.openfile.read.policy

声明输入流的读取策略。这是对输入流预期读取模式的提示。这可能会控制预读、缓冲和其他优化。

顺序读取可以通过预取数据和/或以更大的块读取数据进行优化。某些应用程序(例如 distCp)即使在列式数据上也会执行顺序 IO。

相反,随机 IO 使用一系列 seek()/read() 或通过 PositionedReadableByteBufferPositionedReadable API 在文件的不同部分读取数据。

如果几乎不进行预取或进行其他可能的优化,随机 IO 性能可能会最佳

对 Apache ORC 和 Apache Parquet 等列式格式的查询执行此类随机 IO;其他数据格式可能最适合使用顺序或全文件策略读取。

关键在于,为顺序读取优化读取可能会损害随机性能,反之亦然。

  1. 查找策略是一个提示;即使声明为 must() 选项,文件系统也可能会忽略它。
  2. 策略的解释/实现是特定于文件系统行为的,并且它可能会随着 Hadoop 版本和/或特定存储子系统而改变。
  3. 如果未识别策略,文件系统客户端必须忽略它。
策略 含义
adaptive 存储实现的任何自适应策略。
default 此存储的默认策略。通常为“自适应”。
random 针对随机访问进行优化。
sequential 针对顺序访问进行优化。
vector 打算使用矢量化 IO API。
whole-file 将读取整个文件。

为输入源选择错误的读取策略可能会导致效率低下。

可以提供读取策略列表;文件系统识别/支持的第一个策略应为所使用的策略。这允许支持自定义策略,例如针对 HBase HFile 优化的 hbase-hfile 策略。

S3A 和 ABFS 输入流都实现了 IOStatisticsSource API,并且可以查询它们的 IO 性能。

提示:DEBUG 记录输入流的 toString() 值。S3A 和 ABFS 输入流记录读取统计信息,这些统计信息可以提供有关读取是否高效执行的见解。

进一步阅读

读取策略 adaptive

尝试将查找策略调整为应用程序的读取模式。

S3A 客户端的 normal 策略和 wasb: 客户端支持的唯一策略都是自适应的 - 它们假定顺序 IO,但一旦进行向后寻址/定位读取调用,流就会切换到随机 IO。

其他文件系统实现可能希望采用类似的策略,和/或扩展算法以检测前向寻址和/或在认为更有效时从随机 IO 切换到顺序 IO。

自适应读取策略无法在 open() API 中声明寻址策略,因此要求在群集/应用程序配置中声明它(如果可配置)。但是,从顺序寻址策略切换到随机寻址策略可能会很昂贵。

如果应用程序显式设置 fs.option.openfile.read.policy 选项,并且它们知道自己的读取计划,则它们应该声明哪个策略最合适。

读取策略 ``

文件系统实例的默认策略。特定于实现/安装。

读取策略 sequential

期望从第一个读取的字节到文件末尾/流关闭顺序读取。

读取策略 random

期望 seek()/read() 序列,或使用 PositionedReadableByteBufferPositionedReadable API。

读取策略 vector

这声明调用方打算使用 HADOOP-11867 添加高性能矢量化读取 API 的矢量化读取 API。

这是一个提示:在使用 API 时这不是必需的。它确实会告知实现,如果已实现此类功能,则应将流配置为实现最佳矢量化 IO 性能。

不是排他性的:同一个流仍然可以用于经典的 InputStreamPositionedRead API 调用。实现应该对这些操作使用 random 读取策略。

读取策略 whole-file

这声明整个文件将从头到尾读取;文件系统客户端可以自由启用任何策略来最大化性能。特别是,更大的范围读取/GET 可以通过降低套接字/TLS 设置成本并提供足够长的时间连接来确定最佳下载速率,从而提供高带宽。

策略可以包括

  • openFile() 操作中启动整个文件的 HTTP GET。
  • 预取大块中的数据,可能在并行读取操作中。

应用程序知道整个文件将从打开的流中读取,应声明此读取策略。

选项:fs.option.openfile.length

声明文件的长度。

客户端可以使用此选项来跳过在打开文件时向远程存储查询文件大小/是否存在,类似于通过 withFileStatus() 选项声明文件状态。

如果文件系统连接器支持,则此选项必须解释为声明文件的最小长度

  1. 如果值为负,则应将选项视为未设置。
  2. 如果文件的实际长度大于此值,则不应出错。
  3. read()seek() 和定位的读取调用可以使用跨越/超出此长度但低于文件实际长度的位置。在这些情况下,实现可能会引发 EOFExceptions,或者它们可能会返回数据。

如果文件系统实现使用此选项

实现者注释

  • 必须忽略 fs.option.openfile.length < 0 的值。
  • 如果文件状态与 fs.opt.openfile.length 中的值一起提供;文件状态值优先。

选项:fs.option.openfile.split.startfs.option.openfile.split.end

当文件已拆分为多个部分进行处理时,声明拆分的开始和结束。

  1. 如果值为负,则应将选项视为未设置。
  2. 文件系统可能会假设文件长度大于或等于 fs.option.openfile.split.end 的值。
  3. 并且如果客户端应用程序读取了 fs.option.openfile.split.end 中设置的值,它们可能会引发异常。
  4. 可以将这对选项用于优化读取计划,例如设置 GET 请求的内容范围,或使用拆分结束作为对文件最小保证长度的隐式声明。
  5. 如果设置了这两个选项,并且声明的拆分开始大于拆分结束,则应将拆分开始重置为零,而不是拒绝操作。

拆分结束值可以提供输入流结束的提示。拆分开始可用于优化文件系统客户端的任何初始读取偏移量。

*实施者须知:当应用程序需要读取记录/行的末尾时,它们将读取超过拆分的末尾,而该记录/行始于拆分的末尾之前。

因此,如果文件实际长度大于该值,则客户端必须被允许seek()/read()超过 fs.option.openfile.split.end 中设置的长度。

S3A 特定的选项

S3A 连接器支持用于预读和搜索策略的自定义选项。

名称 类型 含义
fs.s3a.readahead.range long 以字节为单位的预读范围
fs.s3a.experimental.input.fadvise String 搜索策略。由 fs.option.openfile.read.policy 取代
fs.s3a.input.async.drain.threshold long 切换到流异步排水的阈值。(自 3.3.5 起)

如果选项集中包含 fs.s3a.select.sql 语句中的 SQL 语句,则该文件将作为 S3 Select 查询打开。有关更多详细信息,请查阅 S3A 文档。

ABFS 特定的选项

ABFS 连接器支持自定义输入流选项。

名称 类型 含义
fs.azure.buffered.pread.disable boolean 禁用定位读取操作上的缓存。

禁用通过 PositionedReadable API 读取的数据上的缓存。

有关更多详细信息,请查阅 ABFS 文档。

示例

在打开文件时声明搜索策略和拆分限制。

以下是一个概念验证 org.apache.parquet.hadoop.util.HadoopInputFile 读取器的示例,它使用(可为空)文件状态和拆分开始/结束。

FileStatus 值始终传入 - 但如果它为空,则拆分结束用于声明文件的长度。

protected SeekableInputStream newStream(Path path, FileStatus stat,
     long splitStart, long splitEnd)
     throws IOException {

   FutureDataInputStreamBuilder builder = fs.openFile(path)
   .opt("fs.option.openfile.read.policy", "vector, random")
   .withFileStatus(stat);

   builder.optLong("fs.option.openfile.split.start", splitStart);
   builder.optLong("fs.option.openfile.split.end", splitEnd);
   CompletableFuture<FSDataInputStream> streamF = builder.build();
   return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
}

因此,无论是直接由文件列表驱动,还是从 (path, splitStart, splitEnd) 的查询计划中打开文件,都不需要探测远程存储以获取文件长度。在使用远程对象存储时,即使异步执行此类探测,也可以节省几十到几百毫秒。

如果设置了文件长度和分割结束位置,则文件长度必须被视为“更”权威,即它确实应该定义文件长度。如果设置了分割结束位置,则调用者可能不会读取它之后的内容。

如果 CompressedSplitLineReader 正在处理压缩记录,则它可以读取分割结束位置之后的内容。也就是说:它假设不完整的记录读取意味着文件长度大于分割长度,并且它必须读取部分读取记录的全部内容。其他读取器可能表现得类似。

因此

  1. FileStatusfs.option.openfile.length 中提供的文件长度应设置文件的严格上限
  2. fs.option.openfile.split.end 中设置的分割结束位置必须被视为提示,而不是文件的严格结束位置。

使用标准和非标准选项打开文件

标准和非标准选项可以在同一个 openFile() 操作中组合。

Future<FSDataInputStream> f = openFile(path)
  .must("fs.option.openfile.read.policy", "random, adaptive")
  .opt("fs.s3a.readahead.range", 1024 * 1024)
  .build();

FSDataInputStream is = f.get();

must() 中设置的选项必须被所有文件系统理解,或至少被识别并忽略。在此示例中,S3A 特定的选项可能会被所有其他文件系统客户端忽略。

使用较旧版本打开文件

并非所有 Hadoop 版本都识别 fs.option.openfile.read.policy 选项。

如果通过 opt() 构建器参数添加该选项,则可以安全地在应用程序代码中使用该选项,因为它将被视为可以丢弃的未知可选键。

Future<FSDataInputStream> f = openFile(path)
  .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
  .build();

FSDataInputStream is = f.get();

注意 1 如果选项名称由对 org.apache.hadoop.fs.Options.OpenFileOptions 中常量的引用设置,则程序将不会链接到没有特定选项的 Hadoop 版本。因此,为了弹性链接到较旧版本,请使用该值的副本。

注意 2 由于选项验证是在文件系统连接器中执行的,因此旨在与多个 Hadoop 版本配合使用的第三方连接器可能不支持该选项。

将选项传递到 MapReduce

Hadoop MapReduce 将自动读取前缀为 mapreduce.job.input.file.option.mapreduce.job.input.file.must. 的 MR 作业选项,并在删除特定于 MapReduce 的前缀后,将这些值分别应用为 .opt()must()

这使得将选项传递给 MR 作业变得简单。例如,要声明作业应使用随机 IO 读取其数据

JobConf jobConf = (JobConf) job.getConfiguration()
jobConf.set(
    "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
    "random");

MapReduce 输入格式传播选项

记录读取器向其打开的文件传递选项的示例。

  public void initialize(InputSplit genericSplit,
                     TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit)genericSplit;
    Configuration job = context.getConfiguration();
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();

    // open the file and seek to the start of the split
    FutureDataInputStreamBuilder builder =
      file.getFileSystem(job).openFile(file);
    // the start and end of the split may be used to build
    // an input strategy.
    builder.optLong("fs.option.openfile.split.start", start);
    builder.optLong("fs.option.openfile.split.end", end);
    FutureIO.propagateOptions(builder, job,
        "mapreduce.job.input.file.option",
        "mapreduce.job.input.file.must");

    fileIn = FutureIO.awaitFuture(builder.build());
    fileIn.seek(start)
    /* Rest of the operation on the opened stream */
  }

FileContext.openFile

org.apache.hadoop.fs.AvroFSInput;使用顺序输入打开文件。由于已经探测了文件长度,因此将长度向下传递

  public AvroFSInput(FileContext fc, Path p) throws IOException {
    FileStatus status = fc.getFileStatus(p);
    this.len = status.getLen();
    this.stream = awaitFuture(fc.openFile(p)
        .opt("fs.option.openfile.read.policy",
            "sequential")
        .optLong("fs.option.openfile.length",
            Long.toString(status.getLen()))
        .build());
    fc.open(p);
  }

在此示例中,长度作为字符串(通过 Long.toString())向下传递,而不是直接作为长整型传递。这是为了确保输入格式将链接到没有 opt(String, long)must(String, long) 构建器参数的 $Hadoop 版本。类似地,这些值作为可选值传递,因此即使应用程序无法识别,它仍会成功。

示例:读取整个文件

这来自 org.apache.hadoop.util.JsonSerialization

它的 load(FileSystem, Path, FileStatus) 方法 * 声明要从头到尾读取整个文件。 * 传递文件状态

public T load(FileSystem fs,
        Path path,
        status)
        throws IOException {

 try (FSDataInputStream dataInputStream =
          awaitFuture(fs.openFile(path)
              .opt("fs.option.openfile.read.policy", "whole-file")
              .withFileStatus(status)
              .build())) {
   return fromJsonStream(dataInputStream);
 } catch (JsonProcessingException e) {
   throw new PathIOException(path.toString(),
       "Failed to read JSON file " + e, e);
 }
}