org.apache.hadoop.fs.FSDataInputStream
FSDataInputStream 扩展 DataInputStream
FSDataInputStream
的核心行为由 java.io.DataInputStream
定义,其扩展向系统添加了关键假设。
Seekable.seek()
偏移到字节数组内的偏移量,以后的读取从该偏移量开始。通过 FileSystem.open(p)
打开文件,如果成功,则返回
result = FSDataInputStream(0, FS.Files[p])
该流可以建模为
FSDIS = (pos, data[], isOpen)
带有访问函数
pos(FSDIS) data(FSDIS) isOpen(FSDIS)
隐式不变式:数据流的大小等于 FileSystem.getFileStatus(Path p)
返回的文件大小
forall p in dom(FS.Files[p]) : len(data(FSDIS)) == FS.getFileStatus(p).length
Closeable.close()
java.io.Closeable
的语义在 JRE 中的接口定义中定义。
操作必须是幂等的;以下序列不是错误
FSDIS.close(); FSDIS.close();
实现应能承受故障。如果内部流关闭,则应首先检查它是否为 null
。
实现不应在此操作期间引发 IOException
异常(或任何其他异常)。客户端应用程序通常会忽略这些异常,或可能意外失败。
FSDIS' = ((undefined), (undefined), False)
Seekable.getPos()
返回当前位置。流关闭时的结果未定义。
isOpen(FSDIS)
result = pos(FSDIS)
InputStream.read()
返回当前位置的数据。
read()
完成所需的时间没有限制。isOpen(FSDIS)
if ( pos < len(data) ): FSDIS' = (pos + 1, data, True) result = data[pos] else result = -1
InputStream.read(buffer[], offset, length)
从偏移量 offset
开始,将 length
字节的数据读入目标缓冲区。数据的来源是流的当前位置,如 pos
中隐式设置的那样。
isOpen(FSDIS) buffer != null else raise NullPointerException length >= 0 offset < len(buffer) length <= len(buffer) - offset pos >= 0 else raise EOFException, IOException
前置条件失败时可能引发的异常是
InvalidArgumentException ArrayIndexOutOfBoundsException RuntimeException
并非所有文件系统都检查 isOpen
状态。
if length == 0 : result = 0 else if pos > len(data): result = -1 else let l = min(length, len(data)-length) : buffer' = buffer where forall i in [0..l-1]: buffer'[o+i] = data[pos+i] FSDIS' = (pos+l, data, true) result = l
java.io
API 规定,如果要读取的数据量(即 length
),则调用必须阻塞,直到可用的数据量大于零——即,直到有数据为止。该调用不必在缓冲区已满时返回,或者确实阻塞,直到流中没有数据为止。
也就是说,l
并不是简单地定义为 min(length, len(data)-length)
,它严格地是范围 1..min(length, len(data)-length)
中的一个整数。虽然调用者可能希望尽可能多地填充缓冲区,但实现始终返回较小的数字(可能仅为 1 字节)是符合规范的。
关键的是,除非目标缓冲区大小为 0,否则调用必须阻塞,直到至少返回一个字节。因此,对于长度大于零的任何数据源,此 read()
操作的重复调用最终将读取所有数据。
Seekable.seek(s)
并非所有子类都实现 Seek 操作
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
如果支持该操作,则应打开文件
isOpen(FSDIS)
某些文件系统不会执行此检查,而是依赖于 read()
合约来拒绝对已关闭流的读取(例如 RawLocalFileSystem
)。
seek(0)
必须始终成功,因为寻址位置必须为正且小于流的长度
s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]
如果未满足此条件,某些文件系统不会引发异常。相反,当 len(data(FSDIS)) < pos(FSDIS)
时,它们会在任何 read()
操作中返回 -1。
在寻址失败后,pos(FSDIS)
的值可能会更改。例如,寻址到 EOF 之后可能会将读取位置移动到文件末尾,同时引发 EOFException
。
FSDIS' = (s, data, True)
有一个隐式不变条件:寻址到当前位置是无操作的
seek(getPos())
实现可能会识别此操作并绕过所有其他前提条件检查,保持输入流不变。
最近连接到对象存储的所有连接器都实现某种形式的“延迟寻址”:seek()
调用可能看起来会更新流,并且 getPos()
的值会更新,但直到实际读取数据之前,文件不会被打开/重新打开。延迟寻址的实现仍然必须根据文件的已知长度验证新的寻址位置。但是,此时无需刷新文件的状态(即文件是否存在,其当前长度是多少)。文件已被删除或截断这一事实可能要等到 read()
调用时才会浮出水面。
Seekable.seekToNewSource(offset)
此操作指示源从当前源的不同源检索 data[]
。仅当文件系统支持文件的多个副本并且在偏移量 offset
处有多个数据副本时,此操作才相关。
并非所有子类都实现此操作,而是引发异常或返回 False
。
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
示例:CompressionInputStream
、HttpFSFileSystem
如果支持,则文件必须处于打开状态
isOpen(FSDIS)
大多数未实现此操作的子类都会失败。
if not supported(FSDIS, Seekable.seekToNewSource(s)): result = False
示例:RawLocalFileSystem
、HttpFSFileSystem
如果支持该操作并且有数据的新位置
FSDIS' = (pos, data', true) result = True
新数据是原始数据(或其更新版本,如下面一致性部分所述),但包含 offset
处数据的块来自不同的副本。
如果没有其他副本,则不会更新 FSDIS
;响应会指示这一点
result = False
在测试方法之外,此方法的主要用途是在 {{FSInputChecker}} 类中,该类可以通过尝试在其他位置获取数据来对读取中的校验和错误做出反应。如果可以找到新的源,它将尝试重新读取并重新检查文件的该部分。
CanUnbuffer.unbuffer()
此操作指示源释放当前持有的任何系统资源,例如缓冲区、套接字、文件描述符等。任何后续 IO 操作可能都必须重新获取这些资源。在需要保持流打开但预计在不久的将来不会从流中进行任何 IO 操作的情况下,取消缓冲很有用(示例包括文件句柄缓存)。
并非所有子类都实现此操作。除了实现 CanUnbuffer
之外,子类还必须实现 StreamCapabilities
接口,并且 StreamCapabilities.hasCapability(UNBUFFER)
必须返回 true。如果子类实现了 CanUnbuffer
但未通过 StreamCapabilities
报告该功能,则对 unbuffer
的调用将不起作用。如果子类报告它确实实现了 UNBUFFER
,但未实现 CanUnbuffer
接口,则会抛出 UnsupportedOperationException
。
supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
此方法不是线程安全的。如果在 read
进行时调用 unbuffer
,则结果是未定义的。
unbuffer
可以针对已关闭的文件调用,在这种情况下,unbuffer
将不执行任何操作。
大多数未实现此操作的子类只是不执行任何操作。
如果支持该操作,unbuffer
将释放与流关联的任何和所有系统资源。这些资源的确切列表通常取决于实现,但是,通常情况下,它可能包括缓冲区、套接字、文件描述符等。
PositionedReadable
PositionedReadable
操作提供“定位读取”(“pread”)。它们提供了从数据流中的特定位置将数据读入缓冲区的能力。定位读取相当于在特定偏移量处执行 Seekable.seek
,然后执行 InputStream.read(buffer[], offset, length)
,只是有一个方法调用,而不是 seek
然后 read
,并且两个定位读取可以可选地通过 FSDataInputStream
流的单个实例并发运行。
该接口声明定位读取是线程安全的(某些实现不遵循此保证)。
任何与流操作(例如 Seekable.seek
、Seekable.getPos()
和 InputStream.read()
)并发的定位读取都必须独立运行;不得相互干扰。
并发定位读取和流操作必须是可序列化的;一个可能会阻塞另一个,因此它们会按顺序运行,但为了获得更好的吞吐量和“实时性”,它们应该并发运行。
给定两个并行的定位读取,一个在 pos1
处,长度为 len1
,进入缓冲区 dest1
,另一个在 pos2
处,长度为 len2
,进入缓冲区 dest2
,并且给定在寻找到 pos3
后运行的并发流读取,即使读取在底层流上重叠,结果缓冲区也必须按如下方式填充
// Positioned read #1 read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] = [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1] // Positioned read #2 read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] = [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1] // Stream read seek(pos3); read(dest3, ... len3) -> dest3[0..len3 - 1] = [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
请注意,实现不需要是原子的;操作的中间状态(getPos()
值的变化)可能可见。
并非所有 FSDataInputStream
实现都支持这些操作。那些不实现 Seekable.seek()
的实现不实现 PositionedReadable
接口。
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
这可以认为是显而易见的:如果流不是 Seekable
,客户端无法查找位置。这也是使用 Seekable.seek()
的基类实现的副作用。
隐式不变式:对于所有 PositionedReadable
操作,pos
的值在操作结束时保持不变
pos(FSDIS') == pos(FSDIS)
对于任何失败的操作,目标 buffer
的内容都是未定义的。在报告失败之前,实现可能会覆盖部分或全部缓冲区。
int PositionedReadable.read(position, buffer, offset, length)
尽可能多地将数据读入为其分配的缓冲区空间。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] length >= 0 offset >= 0
读取的数据量是长度或从指定位置可用的数据量中较小的一个
let available = min(length, len(data)-position) buffer'[offset..(offset+available-1)] = data[position..position+available -1] result = available
length==0
的调用隐式不会读取任何数据;实现可能会缩短操作并省略任何 IO。在这种情况下,可能会省略对流是否位于文件末尾的检查。buffer
的最终状态将是未定义的。void PositionedReadable.readFully(position, buffer, offset, length)
将恰好 length
字节的数据读入缓冲区,如果可用数据不足,则失败。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] length >= 0 offset >= 0 len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] (position + length) <= len(data) else raise [EOFException, IOException]
如果在读取操作期间发生 IO 异常,buffer
的最终状态将是未定义的。
如果输入流中没有足够的数据来满足请求,buffer
的最终状态将是未定义的。
从偏移量 offset
开始的缓冲区将填充从 position
开始的数据
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
PositionedReadable.readFully(position, buffer)
其语义与以下完全等效
readFully(position, buffer, 0, len(buffer))
也就是说,缓冲区完全填充自位置 position
开始的输入源的内容
default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
异步读取一系列范围的完整数据。默认实现遍历这些范围,尝试基于 minSeekForVectorReads
和 maxReadSizeForVectorReads
的值合并这些范围,然后同步读取每个合并的范围,但目的是子类可以实现高效的实现。支持在直接和堆字节缓冲区中读取。此外,建议客户端使用 WeakReferencedElasticByteBufferPool
分配缓冲区,以便即使直接缓冲区在不再被引用时也会被垃圾回收。
readVectored()
之后,getPos()
返回的位置是未定义的。
如果在 readVectored()
操作进行期间更改了文件,则输出是未定义的。某些范围可能包含旧数据,某些可能包含新数据,而某些可能同时包含两者。
在 readVectored()
操作进行期间,正常的读取 API 调用可能会被阻塞。
注意:不要使用直接缓冲区从 ChecksumFileSystem 中读取,因为这可能导致 HADOOP-18296 中解释的内存碎片。
对于每个请求的范围
range.getOffset >= 0 else raise IllegalArgumentException range.getLength >= 0 else raise EOFException
对于每个请求的范围
range.getData() returns CompletableFuture<ByteBuffer> which will have data from range.getOffset to range.getLength.
minSeekForVectorReads()
最小的合理寻址。如果第一个范围的末尾和下一个范围的开始之间的差值大于此值,则不会将两个范围合并在一起。
maxReadSizeForVectorReads()
合并范围后一次可以读取的最大字节数。如果要读取的合并数据大于此值,则不会合并两个范围。从本质上讲,将此值设置为 0 将禁用范围合并。
FileSystem.open(p)
提供的数据流 FSDIS 的所有本地和远程读取器都应在打开时获得对 FS.Files[p]
数据的访问权限。在时间 t0
FSDIS0 = FS'read(p) = (0, data0[])
在时间 t1
FS' = FS' where FS'.Files[p] = data1
从时间 t >= t1
开始,FSDIS0
的值是未定义的。
它可能保持不变
FSDIS0.data == data0 forall l in len(FSDIS0.data): FSDIS0.read() == data0[l]
它可能会获取新数据
FSDIS0.data == data1 forall l in len(FSDIS0.data): FSDIS0.read() == data1[l]
它可能不一致,以至于读取偏移量会返回来自任一数据集的数据
forall l in len(FSDIS0.data): (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))
也就是说,读取的每个值可能来自原始文件或更新后的文件。
在时间 t2 > t1
重复读取同一偏移量时,它也可能不一致
r2 = FSDIS0.read(l)
而在时间 t3 > t2
r3 = FSDIS0.read(l)
可能是 r3 != r2
。(也就是说,某些数据可能被缓存或复制,而在后续读取中,返回了文件内容的不同版本)。
类似地,如果路径 p
中的数据被删除,此更改在 FSDIS0
上执行的读取操作期间可能可见,也可能不可见。