输出:OutputStreamSyncableStreamCapabilities

简介

本文档涵盖了 Hadoop 文件系统规范 中的输出流。

它使用 Hadoop 文件系统模型 中定义的文件系统模型,并使用 符号 中定义的符号。

目标受众为:1. API 用户。虽然 java.io.OutputStream 是一个标准接口,但本文档阐明了它在 HDFS 和其他地方的实现方式。Hadoop 特定的接口 SyncableStreamCapabilities 是新的;Syncable 特别之处在于它提供的持久性和可见性保证超过了 OutputStream。1. 文件系统和客户端的实现者。

如何将数据写入文件系统

通过 Hadoop FileSystem API 将数据写入文件的主要机制是通过调用 FileSystem.create()FileSystem.append()FSDataOutputStreamBuilder.build() 获得的 OutputStream 子类。

这些都返回 FSDataOutputStream 实例,可以通过各种 write() 方法通过该实例写入数据。在调用流的 close() 方法后,必须将写入流的所有数据持久保存到文件系统,并且通过 FileSystem.open() 尝试从该路径读取数据的其他所有客户端都可见。

除了写入数据的操作外,Hadoop 的 OutputStream 实现还提供将缓冲数据刷新回文件系统的方法,以确保数据可靠地持久保存和/或对其他调用者可见。这是通过 Syncable 接口完成的。最初的目的是,此接口的存在可以解释为对流支持其方法的保证。但是,事实证明无法保证这一点,因为接口的静态特性与可同步语义可能因存储/路径而异的文件系统不兼容。例如,HDFS 中的纠删码文件不支持同步操作,即使它们是作为 Syncable 输出流的子类实现的。

一个新接口:StreamCapabilities。这允许调用者探测流的确切功能,甚至可以通过流链进行传递。

输出流模型

对于此规范,可以将输出流视为存储在客户端中的字节列表;hsync()hflush() 是将数据传播到文件其他读取器可见和/或使其持久化的操作。

buffer: List[byte]

一个标志 open 跟踪流是否打开:在流关闭后,不能再向其中写入更多数据

open: bool
buffer: List[byte]

可以跟踪流的目标路径 path 以形成三元组 path, open, buffer

Stream = (path: Path, open: Boolean, buffer: byte[])

已刷新数据的可见性

在将数据刷新到文件系统的 Syncable 操作(立即)之后,流的目标路径处的数据必须与 buffer 相匹配。也就是说,必须满足以下条件

FS'.Files(path) == buffer

读取路径处数据的任何客户端都必须看到新数据。Syncable 操作在持久性保证方面有所不同,而不是数据的可见性。

Filesystem.create() 之后的流和文件系统状态

文件系统 FSFileSystem.create(path)FileSystem.createFile(path).build() 返回的输出流可以建模为包含一个空数据数组的三元组

Stream' = (path, true, [])

文件系统 FS' 必须在路径处包含一个 0 字节文件

FS' = FS where data(FS', path) == []

因此,Stream'.buffer 的初始状态隐式地与文件系统中的数据一致。

对象存储:请参阅下文“对象存储”部分中的注意事项。

Filesystem.append() 之后的流和文件系统状态

文件系统 FSFileSystem.append(path, buffersize, progress) 调用返回的输出流可以建模为一个流,其 buffer 初始化为原始文件的 buffer

Stream' = (path, true, data(FS, path))

持久化数据

当流将数据写回其存储时,无论是在任何受支持的刷新操作中、在 close() 操作中,还是在流选择执行此操作的任何其他时间,文件的内容都会被当前缓冲区替换

Stream' = (path, true, buffer)
FS' = FS where data(FS', path) == buffer

调用 close() 之后,流将关闭所有操作,除了 close();它们可能会因 IOExceptionRuntimeException 而失败。

Stream' = (path, false, [])

close() 操作必须是幂等的,仅尝试在第一次调用中写入数据。

  1. 如果 close() 成功,后续调用将为 no-ops。
  2. 如果 close() 再次失败,后续调用将为 no-ops。它们可能会重新抛出以前的异常,但它们不能重试写入。

FSDataOutputStream

public class FSDataOutputStream
  extends DataOutputStream
  implements Syncable, CanSetDropBehind, StreamCapabilities {
 // ...
}

FileSystem.create()FileSystem.append()FSDataOutputStreamBuilder.build() 调用返回 FSDataOutputStream 类的一个实例,它是 java.io.OutputStream 的子类。

基类包装一个 OutputStream 实例,该实例可以实现 SyncableCanSetDropBehindStreamCapabilities

本文档涵盖此类实现的要求。

HDFS 的 FileSystem 实现 DistributedFileSystem 返回 HdfsDataOutputStream 的一个实例。此实现至少有两种行为,这是基本 Java 实现未明确声明的

  1. 写入是同步的:多个线程可以写入同一个输出流。这是 HBase 依赖的一种使用模式。

  2. OutputStream.flush() 在文件关闭时是无操作的。Apache Druid 过去曾就此进行过调用 HADOOP-14346

由于 HDFS 实现被认为是 FileSystem API 的事实规范,因此 write() 是线程安全的这一事实非常重要。

为了兼容性,其他 FS 客户端不仅应该线程安全,而且新的 HDFS 功能(例如加密和纠删码)也应该实现与核心 HDFS 输出流一致的行为。

换句话说

输出流实现 java.io.OutputStream 的核心语义还不够:它们需要实现 HdfsDataOutputStream 的额外语义,特别是为了让 HBase 正常工作。

并发 write() 调用是对 Java 规范最重大的强化。

java.io.OutputStream

Java OutputStream 允许应用程序将一系列字节写入目标。在 Hadoop 文件系统中,该目标是文件系统中路径下的数据。

public abstract class OutputStream implements Closeable, Flushable {
  public abstract void write(int b) throws IOException;
  public void write(byte b[]) throws IOException;
  public void write(byte b[], int off, int len) throws IOException;
  public void flush() throws IOException;
  public void close() throws IOException;
}

write(Stream, data)

向流中写入一个字节的数据。

先决条件

Stream.open else raise ClosedChannelException, PathIOException, IOException

当尝试写入已关闭的文件时,HDFS 输出流中会引发异常 java.nio.channels.ClosedChannelExceptionn。此异常不包括目标路径;并且 Exception.getMessage()null。因此,它在堆栈跟踪中的价值有限。实现者可能希望引发更详细的异常,例如 PathIOException

后置条件

缓冲区追加了数据参数的低 8 位。

Stream'.buffer = Stream.buffer + [data & 0xff]

缓存数据的大小可能有一个明确的限制,或者基于目标文件系统的可用容量有一个隐式限制。达到限制时,write() 应该使用 IOException 失败。

write(Stream, byte[] data, int offset, int len)

先决条件

所有前提条件都在 OutputStream.write() 中定义

Stream.open else raise ClosedChannelException, PathIOException, IOException
data != null else raise NullPointerException
offset >= 0 else raise IndexOutOfBoundsException
len >= 0 else raise IndexOutOfBoundsException
offset < data.length else raise IndexOutOfBoundsException
offset + len < data.length else raise IndexOutOfBoundsException

操作返回后,可以重新使用缓冲区。在 write() 操作进行期间,对缓冲区的更新结果是未定义的。

后置条件

Stream'.buffer = Stream.buffer + data[offset...(offset + len)]

write(byte[] data)

这被定义为等同于

write(data, 0, data.length)

flush()

请求刷新数据。ObjectStream.flush() 的规范声明,这应将数据写入“预期目标”。

它明确排除了对持久性的任何保证。

因此,本文档未提供任何规范的行为规范。

先决条件

无。

后置条件

无。

如果实现选择实现流刷新操作,则数据可能会保存到文件系统中,以便其他人可以看到

FS' = FS where data(FS', path) == buffer

当流关闭时,如果 flush() 尚未成为无操作,则应降级为无操作。这是为了与应用程序和库配合使用,这些应用程序和库可以完全以这种方式调用它。

问题flush() 是否应转发到 hflush()

否。或者至少,使其成为可选的。

许多应用程序代码假设 flush() 成本低,并且应在输出每行后、在写入 4KB 小块或类似内容后调用。

将此转发到分布式文件系统上的完全刷新,或者更糟的是,将此转发到远程对象存储,效率非常低。将 flush() 转换为 hflush() 的文件系统客户端最终将不得不回滚该功能:HADOOP-16548

close()

close() 操作将所有数据保存到文件系统,并释放用于写入数据的任何资源。

close() 调用应阻塞,直到写入完成(与 Syncable.hflush() 一样),可能直到写入到持久性存储中。

close() 完成后,文件中的数据必须可见,并且与最近写入的数据一致。文件元数据必须与数据和写入历史记录本身一致(即,更新任何修改时间字段)。

在调用 close() 后,对流的所有后续 write() 调用都必须因 IOException 而失败。

任何锁定/租赁持有机制都必须释放其锁定/租赁。

Stream'.open = false
FS' = FS where data(FS', path) == buffer

close() 调用可能会在其操作期间失败。

  1. API 的调用者必须预期某些对 close() 的调用会失败,并且应编写适当的代码。捕获并吞咽异常虽然很常见,但并不总是理想的解决方案。
  2. 即使在失败后,close() 也必须将流置于关闭状态。对 close() 的后续调用将被忽略,并且对其他方法的调用将被拒绝。也就是说:不能指望调用者重复调用 close(),直到它成功。
  3. close() 操作的持续时间是不确定的。依赖于来自远程系统确认以满足持久性保证的操作隐含地必须等待这些确认。某些对象存储输出流在 close() 操作中上传整个数据文件。这可能需要大量时间。许多用户应用程序都假设 close() 既快速又不会失败,这意味着此行为很危险。

调用方安全使用的建议

  • 计划引发异常,无论是捕获并记录还是将异常抛出到更上层。捕获并静默吞咽异常可能会隐藏严重的问题。
  • 心跳操作应在单独的线程上进行,这样 close() 中的长时间延迟就不会阻塞线程,直到心跳超时为止。

实现者

  • 查看 HADOOP-16785 以查看 close 中的复杂情况示例。
  • 在 close 操作之前逐步写入块会导致行为更好地匹配客户端期望:写入失败会提前显示,并且 close 更像是实际上传的维护工作。
  • 如果块上传在单独的线程中执行,则输出流 close() 调用必须阻塞,直到所有异步上传完成为止;必须报告任何引发的错误。如果引发了多个错误,则流可以选择传播哪个错误。重要的是:当 close() 返回时没有错误,应用程序希望数据已成功写入。

HDFS 和 OutputStream.close()

除非配置为 dfs.datanode.synconclose 为 true,否则 HDFS 不会在 OutputStream.close() 时立即将已写入文件的内容 sync() 到磁盘。这已导致 一些应用程序出现问题

绝对需要保证文件已持久化的应用程序必须在关闭文件之前调用 Syncable.hsync()

org.apache.hadoop.fs.Syncable

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Syncable {


  /** Flush out the data in client's user buffer. After the return of
   * this call, new readers will see the data.
   * @throws IOException if any error occurs
   */
  void hflush() throws IOException;

  /** Similar to posix fsync, flush out the data in client's user buffer
   * all the way to the disk device (but the disk may have it in its cache).
   * @throws IOException if error occurs
   */
  void hsync() throws IOException;
}

Syncable 接口的目的是为写入文件系统的数据提供可见性和持久性保证。

SYNC-1:实现 Syncable 且在调用时不会引发 UnsupportedOperationExceptionOutputStream 明确声明它可以满足这些保证。

SYNC-2:如果流声明接口已实现,但未提供持久性,则接口的方法必须引发 UnsupportedOperationException

Syncable 接口已由除 OutputStream 子类之外的其他类实现,例如 org.apache.hadoop.io.SequenceFile.Writer

SYNC-3 类实现 Syncable 的事实并不能保证 extends OutputStream 成立。

也就是说,对于任何类 C(C instanceof Syncable) 并不意味着 (C instanceof OutputStream)

此规范仅涵盖实现 SyncableOutputStream 子类的必需行为。

SYNC-4: FileSystem.create(Path) 的返回值是 FSDataOutputStream 的实例。

SYNC-5: FSDataOutputStream implements Syncable

SYNC-5 和 SYNC-1 意味着所有可以使用 FileSystem.create(Path) 创建的输出流都必须支持 Syncable 的语义。这显然不成立:如果其包装的流不是 Syncable,则 FSDataOutputStream 仅降级为 flush()。因此,声明 SYNC-1 和 SYNC-2 不成立:您不能信任 Syncable

换句话说:调用者不得依赖接口的存在作为支持 Syncable 语义的证据。相反,他们必须使用 StreamCapabilities 接口(如果可用)动态探测。

Syncable.hflush()

刷新客户端用户缓冲区中的数据。在此调用返回后,新读取器将看到数据。hflush() 操作不包含任何关于数据持久性的保证,只保证其可见性。

因此,实现可以在内存中缓存写入的数据——对所有人可见,但尚未持久化。

先决条件

hasCapability(Stream, "hflush")
Stream.open else raise IOException

后置条件

FS' = FS where data(path) == cache

在调用返回后,数据必须对 FileSystem.open(path)FileSystem.openFile(path).build() 的所有新调用者可见。

没有要求或保证通过对 (FS, path) 的调用创建的现有 DataInputStream 的客户端将看到更新的数据,也没有保证他们不会在当前或后续读取中看到更新的数据。

实现说明:由于正确的 hsync() 实现还必须提供 hflush() 调用的所有语义,因此 hflush() 的实现可能只调用 hsync()

public void hflush() throws IOException {
  hsync();
}

hflush() 性能

hflush() 调用必须阻塞,直到存储确认已收到数据并且现在对其他人可见。这可能会很慢,因为它将包括从客户端上传任何未完成数据以及文件系统本身处理数据的时间。

通常,文件系统仅提供 Syncable.hsync() 保证:持久性以及可见性。这意味着返回时间甚至更长。

应用程序代码不得在每行末尾或(除非它们正在写入 WAL)在每条记录末尾调用 hflush()hsync()。谨慎使用。

Syncable.hsync()

类似于 POSIX fsync(),此调用将客户端用户缓冲区中的数据一直保存到磁盘设备(但磁盘可能将其保留在缓存中)。

也就是说:底层文件系统需要将所有数据保存到磁盘硬件本身,并且预期数据在此处具有持久性。

先决条件

hasCapability(Stream, "hsync")
Stream.open else raise IOException

后置条件

FS' = FS where data(path) == buffer

需要实现阻止,直至存储确认该写入。

这样做是为了让调用方确信,一旦调用成功返回,数据便已写入。

接口 StreamCapabilities

@InterfaceAudience.Public
@InterfaceStability.Evolving

org.apache.hadoop.fs.StreamCapabilities 接口的存在是为了允许调用方动态确定流的行为。

  public boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.HSYNC:
      case StreamCapabilities.HFLUSH:
        return supportFlush;
      default:
        return false;
    }
  }

一旦流关闭,hasCapability() 调用必须执行以下一项操作

  • 返回打开流的功能。
  • 返回 false。

也就是说:它不得引发有关文件已关闭的异常;

有关 PathCapabilities API 的具体信息,请参阅 pathcapabilities;要求类似:对于流缺乏支持的功能,流不得返回 true,原因可能是

  • 功能未知。
  • 功能已知且已知不受支持。

标准流功能在 StreamCapabilities 中定义;有关完整选项集,请参阅 javadoc。

名称 探测对以下内容的支持
dropbehind CanSetDropBehind.setDropBehind()
hsync Syncable.hsync()
hflush Syncable.hflush()。已弃用:仅探测对 HSYNC 的支持。
in:readahead CanSetReadahead.setReadahead()
in:unbuffer" CanUnbuffer.unbuffer()
in:readbytebuffer ByteBufferReadable#read(ByteBuffer)
in:preadbytebuffer ByteBufferPositionedReadable#read(long, ByteBuffer)

流实现 MAY 添加自己的自定义选项。这些选项 MUST 以 fs.SCHEMA. 为前缀,其中 SCHEMA 是文件系统的模式。

接口 CanSetDropBehind

@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CanSetDropBehind {
  /**
   * Configure whether the stream should drop the cache.
   *
   * @param dropCache     Whether to drop the cache.  null means to use the
   *                      default value.
   * @throws IOException  If there was an error changing the dropBehind
   *                      setting.
   *         UnsupportedOperationException  If this stream doesn't support
   *                                        setting the drop-behind.
   */
  void setDropBehind(Boolean dropCache)
      throws IOException, UnsupportedOperationException;
}

此接口允许调用方更改 HDFS 中使用的策略。

实现 MUST 为调用返回 true

StreamCapabilities.hasCapability("dropbehind");

流输出的持久性、并发性、一致性和可见性。

这些是系统行为的各个方面,在该(非常简单的)文件系统模型中没有直接涉及,但在生产中可见。

持久性

  1. OutputStream.write() MAY 同步或异步地持久化数据
  2. OutputStream.flush() 将数据刷新到目标。没有严格的持久性要求。
  3. Syncable.hflush() 将所有未完成的数据同步发送到目标文件系统。返回到调用方后,数据 MUST 对其他读取器可见,MAY 持久化。也就是说:它不必持久化,仅仅保证对尝试在该路径读取数据的新流的所有客户端始终可见。
  4. Syncable.hsync() MUST 根据 hflush 传输数据,并将该数据持久化到底层持久化存储。
  5. close()close() 的第一个调用 MUST 刷新缓冲区中所有剩余数据,并将其持久化,作为对 hsync() 的调用。

许多应用程序过于频繁地调用 flush() - 例如在写入每行末尾时。如果这触发了持久化存储中数据及任何伴随元数据的更新,分布式存储将快速过载。因此:flush() 通常最多被视为将数据刷新到网络缓冲区的提示 - 但不提交写入任何数据。

只有 Syncable 接口提供保证。

两个 Syncable 操作 hsync()hflush() 仅因 hsync() 的额外保证而有所不同:数据必须持久化。如果实现了 hsync(),则可以通过调用 hsync() 来简单地实现 hflush()

public void hflush() throws IOException {
  hsync();
}

这作为一种实现是完全可以接受的:hflush() 的语义得到满足。不可接受的是将 hsync() 降级为 hflush(),因为不再满足持久性保证。

并发性

  1. 多个进程写入同一文件的结果是未定义的。

  2. 在文件打开进行写入之前打开的用于读取文件的输入流可能会获取由写入到 OutputStream 更新的数据。由于缓冲和缓存,这不是一个要求——如果输入流确实获取了更新的数据,则读取更新数据的点是不确定的。这出现在对象存储中,其中关闭并重新打开连接的 seek() 调用可能会获取更新的数据,而前向流读取不会。类似地,在面向块的文件系统中,数据可能一次缓存一个块——并且仅在读取不同的块时才获取更改。

  3. 文件系统可能会允许在流向其写入时操作目标路径——例如,路径或父级的 rename();路径或父级的 delete()。在这种情况下,输出流上未来写入操作的结果是不确定的。一些文件系统可能会实现锁定以防止冲突。然而,由于文献中众所周知的原因,这在分布式文件系统中往往很少见。

  4. java.io.OutputStream 的 Java API 规范不要求该类的实例是线程安全的。然而,org.apache.hadoop.hdfs.DFSOutputStream 具有更强的线程安全模型(可能无意中)。在 Apache HBase 中依赖这一事实,如在 HADOOP-11708 中发现的那样。实现应该线程安全。注意:即使是 DFSOutputStream 同步模型也允许在 hsync() 操作中等待来自数据节点或名称节点写入的确认时调用输出流的 close()

一致性和可见性

没有要求数据立即对其他应用程序可见——直到发出冲洗缓冲区或将其持久保存到底层存储介质的特定调用为止。

如果使用 FileSystem.create(path, overwrite==true) 创建输出流,并且路径处存在现有文件,即 exists(FS, path) 成立,那么现有数据立即不可用;路径末尾的数据必须包含一个空字节序列 [],并具有始终如一元数据。

exists(FS, path)
(Stream', FS') = create(FS, path)
exists(FS', path)
getFileStatus(FS', path).getLen() = 0

文件元数据(特别是 length(FS, path))在 flush()sync() 之后应该与文件内容一致。

(Stream', FS') = create(FS, path)
(Stream'', FS'') = write(Stream', data)
(Stream''', FS''') hsync(Stream'')
exists(FS''', path)
getFileStatus(FS''', path).getLen() = len(data)

HDFS 仅在写入跨越块边界时执行此操作;否则会使名称节点过载。其他存储可能会复制此行为。

因此,在写入文件时,length(Filesystem, Path) 可能小于 data(Filesystem, Path) 的长度。

close() 操作之后,元数据必须与文件内容一致。

在输出流的内容已持久化(hflush()/hsync())后,所有新的 open(FS, Path) 操作都必须返回更新后的数据。

在对输出流调用 close() 后,对 getFileStatus(path) 的调用必须返回已写入文件的最终元数据,包括长度和修改时间。在任何 FileSystem list 操作中返回的文件的元数据都必须与此元数据一致。

在向流写入时,getFileStatus(path).getModificationTime() 的值未定义。在写入文件时,尤其是 Syncable.hsync() 调用后,时间戳可能会更新。在文件关闭后,时间戳必须更新为服务器在 close() 调用期间观察到的时钟值。它很可能在文件系统的时间和时区内,而不是客户端的时间和时区内。

正式地说,如果 close() 操作触发与服务器的交互,该交互在服务器端时间 t1 开始,在时间 t2 完成,并成功写入文件,则最后修改时间应为时间 t,其中 t1 <= t <= t2

Hadoop 输出流模型的问题。

Hadoop 提供的输出流模型存在一些已知问题,特别是关于何时写入和持久化数据以及何时同步元数据时的保证。这些问题在于 HDFS 和“本地”文件系统的实现方面不遵循本规范中使用的文件系统的简单模型。

HDFS

HDFS:hsync() 仅同步最新块

参考实现 DFSOutputStream 将阻塞,直到从数据节点收到确认:即副本写入链中的所有主机都已成功写入文件。

这意味着调用者可能期望方法调用的返回包含其他实现必须维护的可见性和持久性保证。

但是,请注意,参考 DFSOutputStream.hsync() 调用实际上只持久化当前块。如果自上次同步以来进行了一系列写入,则可能会跨越块边界。hsync() 调用仅声明写入最新内容。

来自 DFSOutputStream.hsync(EnumSet<SyncFlag> syncFlags) 的 javadocs

请注意,只有当前块会刷新到磁盘设备。要保证跨块边界进行持久同步,应使用 {@link CreateFlag#SYNC_BLOCK} 创建流。

这是一个重要的 HDFS 实现细节,任何依赖 HDFS 提供预写式日志或其他数据库结构的人员都不得忽略,其中应用程序的要求是“在 WAL 中刷新提交标志之前,所有前置字节都必须已持久化”

有关此主题的讨论,请参阅 [Stonebraker81],Michael Stonebraker,数据库管理的操作系统支持,1981 年。

如果您确实需要 hsync() 同步非常大的写入中的每个块,请定期调用它。

HDFS:元数据更新的延迟可见性。

HDFS 文件元数据通常滞后于正在写入的文件的内容,这一点并非每个人都期望,也不便于任何试图获取正在写入的文件中更新数据的程序。最明显的是在各种 list 命令和 getFileStatus 中返回的文件长度,这通常是过时的。

由于 HDFS 仅在其输出操作中支持文件增长,这意味着元数据中列出的文件大小可能小于或等于可用字节数,但绝不会更大。这也是一项保证

确定 HDFS 中的文件是否已更新的一种算法是

  1. 记住文件中上次读取的位置 pos,如果这是初始读取,则使用 0
  2. 使用 getFileStatus(FS, Path) 查询元数据中记录的文件的更新长度。
  3. 如果 Status.length &gt; pos,则文件已增长。
  4. 如果数字未更改,则
    1. 重新打开文件。
    2. seek(pos) 到该位置
    3. 如果 read() != -1,则有新数据。

此算法适用于与元数据和数据一致的文件系统以及 HDFS。重要的是要知道,对于打开的文件 getFileStatus(FS, path).getLen() == 0 并不意味着 data(FS, path) 为空。

当 HDFS 中的输出流关闭时;除非 HDFS 部署了将 dfs.datanode.synconclose 设置为 true,否则新写入的数据不会立即写入磁盘。否则,它将被缓存并稍后写入磁盘。

本地文件系统,file:

LocalFileSystemfile:(或基于 ChecksumFileSystem 的任何其他 FileSystem 实现)存在不同的问题。如果从 create() 获取输出流,并且尚未在文件系统上调用 FileSystem.setWriteChecksum(false),则该流仅刷新可写入到数据完整校验和块的本地数据。

也就是说,hsync/hflush 操作不保证在文件最终关闭之前写入所有待定数据。

出于此原因,通过 file:// URL 访问的本地文件系统不支持 Syncable,除非在该 FileSystem 实例上调用 setWriteChecksum(false) 以禁用校验和创建。之后,显然不会为任何文件生成校验和。

校验和输出流

由于 org.apache.hadoop.fs.FSOutputSummerorg.apache.hadoop.fs.ChecksumFileSystem.ChecksumFSOutputSummer 实现 HDFS 和其他文件系统使用的基础校验和输出流,因此它提供了一些输出流行为的核心语义。

  1. close() 调用未同步,可重入,并且可能尝试多次关闭流。
  2. 可以在已关闭的流上调用 write(int)(但不能调用 write(byte[], int, int))。
  3. 可以在已关闭的流上调用 flush()

行为 1 和 2 确实必须被视为需要修复的错误,尽管要小心。

行为 3 必须被视为事实上的标准,以便其他实现进行复制。

对象存储

对象存储流可能会缓冲整个流的输出,直到最终的 close() 操作触发数据的单个 PUT 和最终输出的具体化。

与 POSIX 文件系统和本文档中指定的行为相比,这极大地改变了它们的行为。

新创建对象的可见性

无法保证在创建输出流后,任何文件在输出流的路径中都是可见的。

也就是说:虽然 create(FS, path, boolean) 返回一个新流

Stream' = (path, true, [])

该操作的其他后置条件 data(FS', path) == [] 可能不成立,在这种情况下

  1. exists(FS, p) 可能返回 false。
  2. 如果使用 overwrite = True 创建文件,则现有数据可能仍然可见:data(FS', path) = data(FS, path)
  3. create() 调用中,对现有数据的检查(overwrite=False)可能发生在 create() 调用本身中,发生在写入之前/期间的 close() 调用中,或介于两者之间的某个时间点。在对象存储支持原子 PUT 操作的特殊情况下,对现有数据存在的检查和随后在该路径处创建数据包含一个竞争条件:其他客户端可能在存在性检查和后续写入之间在该路径处创建数据。

  4. create(FS, Path, overwrite=false) 的调用可能会成功,返回一个新的 OutputStream,即使另一个流已打开并写入目标路径。

这允许以下操作序列,如果针对 HDFS 调用,则在第二个 open() 调用中会引发异常

Stream1 = open(FS, path, false)
sleep(200)
Stream2 = open(FS, path, false)
Stream.write('a')
Stream1.close()
Stream2.close()

对于想知道为什么客户端不在 create() 调用中创建 0 字节文件的人来说,这会在 close() 之后造成问题——标记文件可能会在 open() 调用中返回,而不是最终数据。

close() 之后流输出的可见性

对象存储应该做出的一个保证与 POSIX 文件系统相同:在流 close() 调用返回后,数据必须持久保存并对所有调用者可见。不幸的是,即使是这个保证也不总是能得到满足

  1. 路径上的现有数据可能在不确定的时间段内可见。

  2. 如果存储具有任何形式的创建不一致性或对否定存在探测进行缓冲,那么即使在流的 close() 操作返回后,getFileStatus(FS, path)open(FS, path) 仍可能因 FileNotFoundException 而失败。

有利的是,存储的 PUT 操作的原子性确实提供了自己的保证:新创建的对象不存在或其所有数据都存在:实例化对象的行为虽然可能表现出创建不一致性,但它是原子的。应用程序可能能够利用这一事实为自己谋取利益。

Abortable 接口公开这种在数据可见之前中止输出流的能力,因此可用于检查点和类似操作。

实现者注释。

始终实现 Syncable - 即使只是为了抛出 UnsupportedOperationException

由于 FSDataOutputStream 会将 Syncable.hflush()Syncable.hsync() 静默降级为 wrappedStream.flush(),因此 API 的调用者可能会误以为在同步到不支持这些 API 的流之后,其数据已刷新/同步。

实现应实现 API,但抛出 UnsupportedOperationException

StreamCapabilities

文件系统客户端的实现者应实现 StreamCapabilities 接口及其 hasCapabilities() 方法,以声明输出流是否提供 Syncable 的可见性和持久性保证。

StreamCapabilities.hasCapabilities() 的实现者不得声明它们在并非如此的流上支持 hflushhsync 功能。

有时流会将数据传递给存储,但远端可能不会将其全部同步到磁盘。这不是客户端可以确定的。在此:如果客户端代码正在将 hflush/hsync 传递给分布式文件系统,则应声明它支持它们。

元数据更新

实现者可能不会在每次 hsync() 调用后更新文件元数据(长度、日期等)。HDFS 不会这样做,除非写入的数据跨越块边界。

close() 是否同步并持久化数据?

默认情况下,当流关闭时,HDFS 不会立即将数据保存到磁盘;它将异步保存到磁盘。

这并不意味着用户不希望这样做。

实现的行为类似于 NFS 的 缓存 的回写方面。DFSClient.close() 对客户端执行 hflush(),以将所有数据上传到数据节点。

  1. close() 应在满足 hflush() 的保证后返回:数据对其他人可见。
  2. 对于持久性保证,必须首先调用 hsync()