org.apache.hadoop.fs.FSDataInputStream

FSDataInputStream 扩展 DataInputStream

FSDataInputStream 的核心行为由 java.io.DataInputStream 定义,其扩展向系统添加了关键假设。

  1. 源是本地或远程文件系统。
  2. 正在读取的流引用有限的字节数组。
  3. 数据长度在读取过程中不会改变。
  4. 数据内容在过程中不会改变。
  5. 源文件在读取过程中保持存在。
  6. 调用者可以使用 Seekable.seek() 偏移到字节数组内的偏移量,以后的读取从该偏移量开始。
  7. 向前和向后查找的成本很低。
  8. 流实现不需要线程安全。
  9. 但是,如果流实现了 PositionedReadable,“定位读取”必须是线程安全的。

通过 FileSystem.open(p) 打开文件,如果成功,则返回

result = FSDataInputStream(0, FS.Files[p])

该流可以建模为

FSDIS = (pos, data[], isOpen)

带有访问函数

pos(FSDIS)
data(FSDIS)
isOpen(FSDIS)

隐式不变式:数据流的大小等于 FileSystem.getFileStatus(Path p) 返回的文件大小

forall p in dom(FS.Files[p]) :
len(data(FSDIS)) == FS.getFileStatus(p).length

Closeable.close()

java.io.Closeable 的语义在 JRE 中的接口定义中定义。

操作必须是幂等的;以下序列不是错误

FSDIS.close();
FSDIS.close();

实施说明

  • 实现应能承受故障。如果内部流关闭,则应首先检查它是否为 null

  • 实现不应在此操作期间引发 IOException 异常(或任何其他异常)。客户端应用程序通常会忽略这些异常,或可能意外失败。

后置条件

FSDIS' = ((undefined), (undefined), False)

Seekable.getPos()

返回当前位置。流关闭时的结果未定义。

前置条件

isOpen(FSDIS)

后置条件

result = pos(FSDIS)

InputStream.read()

返回当前位置的数据。

  1. 当流关闭时,实现应失败。
  2. read() 完成所需的时间没有限制。

前置条件

isOpen(FSDIS)

后置条件

if ( pos < len(data) ):
   FSDIS' = (pos + 1, data, True)
   result = data[pos]
else
    result = -1

InputStream.read(buffer[], offset, length)

从偏移量 offset 开始,将 length 字节的数据读入目标缓冲区。数据的来源是流的当前位置,如 pos 中隐式设置的那样。

前置条件

isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
pos >= 0 else raise EOFException, IOException

前置条件失败时可能引发的异常是

InvalidArgumentException
ArrayIndexOutOfBoundsException
RuntimeException

并非所有文件系统都检查 isOpen 状态。

后置条件

if length == 0 :
  result = 0

else if pos > len(data):
  result = -1

else
  let l = min(length, len(data)-length) :
    buffer' = buffer where forall i in [0..l-1]:
       buffer'[o+i] = data[pos+i]
    FSDIS' = (pos+l, data, true)
    result = l

java.io API 规定,如果要读取的数据量(即 length),则调用必须阻塞,直到可用的数据量大于零——即,直到有数据为止。该调用不必在缓冲区已满时返回,或者确实阻塞,直到流中没有数据为止。

也就是说,l 并不是简单地定义为 min(length, len(data)-length),它严格地是范围 1..min(length, len(data)-length) 中的一个整数。虽然调用者可能希望尽可能多地填充缓冲区,但实现始终返回较小的数字(可能仅为 1 字节)是符合规范的。

关键的是,除非目标缓冲区大小为 0,否则调用必须阻塞,直到至少返回一个字节。因此,对于长度大于零的任何数据源,此 read() 操作的重复调用最终将读取所有数据。

Seekable.seek(s)

前置条件

并非所有子类都实现 Seek 操作

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

如果支持该操作,则应打开文件

isOpen(FSDIS)

某些文件系统不会执行此检查,而是依赖于 read() 合约来拒绝对已关闭流的读取(例如 RawLocalFileSystem)。

seek(0) 必须始终成功,因为寻址位置必须为正且小于流的长度

s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]

如果未满足此条件,某些文件系统不会引发异常。相反,当 len(data(FSDIS)) < pos(FSDIS) 时,它们会在任何 read() 操作中返回 -1。

在寻址失败后,pos(FSDIS) 的值可能会更改。例如,寻址到 EOF 之后可能会将读取位置移动到文件末尾,同时引发 EOFException

后置条件

FSDIS' = (s, data, True)

有一个隐式不变条件:寻址到当前位置是无操作的

seek(getPos())

实现可能会识别此操作并绕过所有其他前提条件检查,保持输入流不变。

最近连接到对象存储的所有连接器都实现某种形式的“延迟寻址”:seek() 调用可能看起来会更新流,并且 getPos() 的值会更新,但直到实际读取数据之前,文件不会被打开/重新打开。延迟寻址的实现仍然必须根据文件的已知长度验证新的寻址位置。但是,此时无需刷新文件的状态(即文件是否存在,其当前长度是多少)。文件已被删除或截断这一事实可能要等到 read() 调用时才会浮出水面。

Seekable.seekToNewSource(offset)

此操作指示源从当前源的不同源检索 data[]。仅当文件系统支持文件的多个副本并且在偏移量 offset 处有多个数据副本时,此操作才相关。

前置条件

并非所有子类都实现此操作,而是引发异常或返回 False

supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]

示例:CompressionInputStreamHttpFSFileSystem

如果支持,则文件必须处于打开状态

isOpen(FSDIS)

后置条件

大多数未实现此操作的子类都会失败。

if not supported(FSDIS, Seekable.seekToNewSource(s)):
    result = False

示例:RawLocalFileSystemHttpFSFileSystem

如果支持该操作并且有数据的新位置

    FSDIS' = (pos, data', true)
    result = True

新数据是原始数据(或其更新版本,如下面一致性部分所述),但包含 offset 处数据的块来自不同的副本。

如果没有其他副本,则不会更新 FSDIS;响应会指示这一点

    result = False

在测试方法之外,此方法的主要用途是在 {{FSInputChecker}} 类中,该类可以通过尝试在其他位置获取数据来对读取中的校验和错误做出反应。如果可以找到新的源,它将尝试重新读取并重新检查文件的该部分。

CanUnbuffer.unbuffer()

此操作指示源释放当前持有的任何系统资源,例如缓冲区、套接字、文件描述符等。任何后续 IO 操作可能都必须重新获取这些资源。在需要保持流打开但预计在不久的将来不会从流中进行任何 IO 操作的情况下,取消缓冲很有用(示例包括文件句柄缓存)。

前置条件

并非所有子类都实现此操作。除了实现 CanUnbuffer 之外,子类还必须实现 StreamCapabilities 接口,并且 StreamCapabilities.hasCapability(UNBUFFER) 必须返回 true。如果子类实现了 CanUnbuffer 但未通过 StreamCapabilities 报告该功能,则对 unbuffer 的调用将不起作用。如果子类报告它确实实现了 UNBUFFER,但未实现 CanUnbuffer 接口,则会抛出 UnsupportedOperationException

supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)

此方法不是线程安全的。如果在 read 进行时调用 unbuffer,则结果是未定义的。

unbuffer 可以针对已关闭的文件调用,在这种情况下,unbuffer 将不执行任何操作。

后置条件

大多数未实现此操作的子类只是不执行任何操作。

如果支持该操作,unbuffer 将释放与流关联的任何和所有系统资源。这些资源的确切列表通常取决于实现,但是,通常情况下,它可能包括缓冲区、套接字、文件描述符等。

接口 PositionedReadable

PositionedReadable 操作提供“定位读取”(“pread”)。它们提供了从数据流中的特定位置将数据读入缓冲区的能力。定位读取相当于在特定偏移量处执行 Seekable.seek,然后执行 InputStream.read(buffer[], offset, length),只是有一个方法调用,而不是 seek 然后 read,并且两个定位读取可以可选地通过 FSDataInputStream 流的单个实例并发运行。

该接口声明定位读取是线程安全的(某些实现不遵循此保证)。

任何与流操作(例如 Seekable.seekSeekable.getPos()InputStream.read())并发的定位读取都必须独立运行;不得相互干扰。

并发定位读取和流操作必须是可序列化的;一个可能会阻塞另一个,因此它们会按顺序运行,但为了获得更好的吞吐量和“实时性”,它们应该并发运行。

给定两个并行的定位读取,一个在 pos1 处,长度为 len1,进入缓冲区 dest1,另一个在 pos2 处,长度为 len2,进入缓冲区 dest2,并且给定在寻找到 pos3 后运行的并发流读取,即使读取在底层流上重叠,结果缓冲区也必须按如下方式填充

// Positioned read #1
read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] =
  [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1]

// Positioned read #2
read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] =
  [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1]

// Stream read
seek(pos3);
read(dest3, ... len3) -> dest3[0..len3 - 1] =
  [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]

请注意,实现不需要是原子的;操作的中间状态(getPos() 值的变化)可能可见。

实现前提条件

并非所有 FSDataInputStream 实现都支持这些操作。那些不实现 Seekable.seek() 的实现不实现 PositionedReadable 接口。

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

这可以认为是显而易见的:如果流不是 Seekable,客户端无法查找位置。这也是使用 Seekable.seek() 的基类实现的副作用。

隐式不变式:对于所有 PositionedReadable 操作,pos 的值在操作结束时保持不变

pos(FSDIS') == pos(FSDIS)

失败状态

对于任何失败的操作,目标 buffer 的内容都是未定义的。在报告失败之前,实现可能会覆盖部分或全部缓冲区。

int PositionedReadable.read(position, buffer, offset, length)

尽可能多地将数据读入为其分配的缓冲区空间。

前置条件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
length >= 0
offset >= 0

后置条件

读取的数据量是长度或从指定位置可用的数据量中较小的一个

let available = min(length, len(data)-position)
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
  1. -1 的返回值表示流不再有可用数据。
  2. 使用 length==0 的调用隐式不会读取任何数据;实现可能会缩短操作并省略任何 IO。在这种情况下,可能会省略对流是否位于文件末尾的检查。
  3. 如果在读取操作期间发生 IO 异常,buffer 的最终状态将是未定义的。

void PositionedReadable.readFully(position, buffer, offset, length)

将恰好 length 字节的数据读入缓冲区,如果可用数据不足,则失败。

前置条件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException]

如果在读取操作期间发生 IO 异常,buffer 的最终状态将是未定义的。

如果输入流中没有足够的数据来满足请求,buffer 的最终状态将是未定义的。

后置条件

从偏移量 offset 开始的缓冲区将填充从 position 开始的数据

buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]

PositionedReadable.readFully(position, buffer)

其语义与以下完全等效

readFully(position, buffer, 0, len(buffer))

也就是说,缓冲区完全填充自位置 position 开始的输入源的内容

default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)

异步读取一系列范围的完整数据。默认实现遍历这些范围,尝试基于 minSeekForVectorReadsmaxReadSizeForVectorReads 的值合并这些范围,然后同步读取每个合并的范围,但目的是子类可以实现高效的实现。支持在直接和堆字节缓冲区中读取。此外,建议客户端使用 WeakReferencedElasticByteBufferPool 分配缓冲区,以便即使直接缓冲区在不再被引用时也会被垃圾回收。

readVectored() 之后,getPos() 返回的位置是未定义的。

如果在 readVectored() 操作进行期间更改了文件,则输出是未定义的。某些范围可能包含旧数据,某些可能包含新数据,而某些可能同时包含两者。

readVectored() 操作进行期间,正常的读取 API 调用可能会被阻塞。

注意:不要使用直接缓冲区从 ChecksumFileSystem 中读取,因为这可能导致 HADOOP-18296 中解释的内存碎片。

前置条件

对于每个请求的范围

range.getOffset >= 0 else raise IllegalArgumentException
range.getLength >= 0 else raise EOFException

后置条件

对于每个请求的范围

range.getData() returns CompletableFuture<ByteBuffer> which will have data
from range.getOffset to range.getLength.

minSeekForVectorReads()

最小的合理寻址。如果第一个范围的末尾和下一个范围的开始之间的差值大于此值,则不会将两个范围合并在一起。

maxReadSizeForVectorReads()

合并范围后一次可以读取的最大字节数。如果要读取的合并数据大于此值,则不会合并两个范围。从本质上讲,将此值设置为 0 将禁用范围合并。

一致性

  • FileSystem.open(p) 提供的数据流 FSDIS 的所有本地和远程读取器都应在打开时获得对 FS.Files[p] 数据的访问权限。
  • 如果在读取过程中更改了基础数据,则这些更改可能或可能不可见。
  • 可见的此类更改可能是部分可见的。

在时间 t0

FSDIS0 = FS'read(p) = (0, data0[])

在时间 t1

FS' = FS' where FS'.Files[p] = data1

从时间 t >= t1 开始,FSDIS0 的值是未定义的。

它可能保持不变

FSDIS0.data == data0

forall l in len(FSDIS0.data):
  FSDIS0.read() == data0[l]

它可能会获取新数据

FSDIS0.data == data1

forall l in len(FSDIS0.data):
  FSDIS0.read() == data1[l]

它可能不一致,以至于读取偏移量会返回来自任一数据集的数据

forall l in len(FSDIS0.data):
  (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))

也就是说,读取的每个值可能来自原始文件或更新后的文件。

在时间 t2 > t1 重复读取同一偏移量时,它也可能不一致

r2 = FSDIS0.read(l)

而在时间 t3 > t2

r3 = FSDIS0.read(l)

可能是 r3 != r2。(也就是说,某些数据可能被缓存或复制,而在后续读取中,返回了文件内容的不同版本)。

类似地,如果路径 p 中的数据被删除,此更改在 FSDIS0 上执行的读取操作期间可能可见,也可能不可见。