接口 org.apache.hadoop.fs.MultipartUploader

MultipartUploader 可以使用多个部分将文件上传到 Hadoop 支持的文件系统。多部分上传的好处在于可以并行地从多个客户端或进程上传文件,并且在调用 complete 函数之前,结果对其他客户端不可见。

当由对象存储实现时,上传的数据可能会产生存储费用,即使在文件系统中不可见之前也是如此。此 API 的用户必须勤勉,并始终尽最大努力完成或中止上传。abortUploadsUnderPath(path) 操作可以在这里提供帮助。

不变式

有效 MultipartUploader 的所有要求都被视为隐式前置条件和后置条件

单个多部分上传的操作可以在多部分上传器的不同实例、不同进程和主机中进行。因此,要求

  1. 上传部分、完成上传或中止上传所需的所有状态都必须包含在上传句柄中或可从上传句柄中检索。

  2. 该句柄必须可序列化;它必须可反序列化为执行 Hadoop 完全相同版本的不同进程。

  3. 不同的主机/进程可以按顺序或同时上传不同的部分。它们上传到文件系统中的顺序不得限制数据存储在最终文件中的顺序。

  4. 上传可以在与任何上传部分的实例不同的实例上完成。

  5. 上传的输出在上传完成之前不得在最终目标中可见。

  6. 如果单个多部分上传器实例按顺序启动或完成多个上传文件到同一目标,则这不是错误,无论存储是否支持并发上传。

并发

多个进程可以同时上传多部分上传的部分。

如果对正在进行活动上传的目标调用 startUpload(path),则实现必须执行以下两个操作之一。

  • 将该调用拒绝为重复调用。
  • 允许两者继续进行,文件最终输出为两个上传中恰好一个 的输出。

哪个上传成功是未定义的。用户不得期望跨文件系统、跨文件系统实例*甚至跨不同请求保持一致的行为。

如果在部分上传进行时完成或中止多部分上传,则进行中的上传(如果尚未完成)不得全部或部分包含在最终文件中。实现应在 putPart() 操作中引发错误。

序列化兼容性

用户不得期望序列化 PathHandle 版本在以下情况下兼容:* 不同的多部分上传器实现。* 同一实现的不同版本。

也就是说:所有客户端都必须使用完全相同的 Hadoop 版本。

模型

支持多部分上传的 FileSystem/FileContext 将现有模型 (目录、文件、符号链接) 扩展为 (目录、文件、符号链接、上传) 类型为 Map[UploadHandle -> Map[PartHandle -> UploadPart]上传

状态元组的 Uploads 元素是所有活动上传的映射。

Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]`

UploadHandle 是非空字节列表。

UploadHandle: List[byte]
len(UploadHandle) > 0

客户端必须将此视为不透明。此功能设计的核心是句柄在所有客户端中有效:句柄可以在主机 hostA 上序列化,在 hostB 上反序列化,并且仍然用于扩展或完成上传。

UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])

同样,PartHandle 类型也是一个非空的不透明字节列表,同样可以在主机之间封送。

PartHandle: List[byte]

隐含地,FS.Uploads 中的每个 UploadHandle 都是唯一的。同样,[PartHandle -> UploadPart] 映射中的每个 PartHandle 也必须是唯一的。

  1. 没有要求 Part Handle 在上传之间是唯一的。
  2. 没有要求 Upload Handle 随着时间的推移是唯一的。但是,如果 Part Handle 被快速回收,则存在风险,即名义上幂等的 abort(FS, uploadHandle) 操作可能会无意中取消使用相同 Upload Handle 的后续操作。

异步 API

所有操作都返回 CompletableFuture<> 类型,必须随后对其进行评估以获取其返回值。

  1. 操作的执行可能是调用线程上的阻塞操作。
  2. 如果不是,则在单独的线程中执行,并且必须在将来评估返回时完成。
  3. 一些/所有前提条件可能在初始调用时进行评估,
  4. 所有未在当时评估的前提条件必须在执行将来时进行评估。

这意味着,当实现与快速文件系统/存储进行交互时,包括文件存在在内的所有前提条件可能被提前评估,而与远程对象存储进行交互的实现(其探测速度较慢)可能在异步阶段验证前提条件,尤其是与远程存储进行交互的前提条件。

Java CompletableFutures 无法很好地处理已检查异常。Hadoop 代码库仍在不断完善此处的异常处理细节,因为异步 API 的使用越来越多。假设任何声明必须引发 IOException 的前提条件失败都可能导致该操作在将来评估时被包装在某种形式的 RuntimeException 中;这也适用于操作期间引发的任何其他 IOException

close()

应用程序必须在使用上传器后调用 close();这样做是为了释放其他对象、更新统计信息等。

状态更改操作

CompletableFuture<UploadHandle> startUpload(Path)

启动多部分上传,最终返回一个 UploadHandle 以用于后续操作。

前提条件

if path == "/" : raise IOException

if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException

如果文件系统不支持对目标的并发上传,则添加以下前提条件

if path in values(FS.Uploads) raise PathExistsException, IOException

后置条件

初始化操作完成后,文件系统状态将使用新的活动上传更新,并带有新的句柄,此句柄将返回给调用方。

handle' = UploadHandle where not handle' in keys(FS.Uploads)
FS' = FS where FS'.Uploads(handle') == {}
result = handle'

CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)

上传特定多部分上传的部分,最终返回一个不透明的部分句柄,表示指定上传的这一部分。

前提条件

uploadHandle in keys(FS.Uploads)
partNumber >= 1
lengthInBytes >= 0
len(inputStream) >= lengthInBytes

后置条件

data' = inputStream(0..lengthInBytes)
partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
result = partHandle'

数据存储在文件系统中,等待完成。它不得在目标路径中可见。它可能在文件系统中的某个临时路径中可见;这是特定于实现的,并且不得依赖它。

CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)

完成多部分上传。

文件系统可以强制执行每个部分的最小大小,但不包括上传的最后一个部分。

如果某个部分超出此范围,则必须引发 IOException

前提条件

uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
FS.Uploads(uploadHandle).path == path
if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
parts.size() > 0
forall k in keys(parts): k > 0
forall k in keys(parts):
  not exists(k2 in keys(parts)) where (parts[k] == parts[k2])

所有键都必须大于零,并且不得对同一部分句柄有任何重复引用。这些验证可以在操作期间的任何时间执行。在失败后,无法保证对此上传使用有效路径映射的 complete() 调用将完成。调用方应在任何此类故障后调用 abort() 以确保清理。

如果对这个 uploadHandle 执行了 putPart() 操作,但其 PathHandle 句柄未包含在此请求中,则遗漏的部分不得成为结果文件的一部分。

MultipartUploader 必须清理任何此类未完成的条目。

对于支持目录(本地文件系统、HDFS 等)的后备存储,如果在完成时,目标处现在有一个目录,则必须引发 PathIsDirectoryException 或其他 IOException

后置条件

UploadData' == ordered concatention of all data in the map of parts, ordered by key
exists(FS', path') and result = PathHandle(path')
FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)

PathHandle 由完成操作返回,因此后续操作将能够识别出数据在此期间未发生更改。

文件中上传部分的顺序是地图中部分的自然顺序:部分 1 位于部分 2 之前,依此类推。

CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)

中止多部分上传。句柄将变为无效,并且不可重复使用。

前提条件

uploadHandle in keys(FS.Uploads) else raise FileNotFoundException

后置条件

上传句柄不再已知。

FS' = FS where not uploadHandle in keys(FS'.uploads)

使用同一句柄对 abort() 的后续调用将失败,除非已回收该句柄。

CompletableFuture<Integer> abortUploadsUnderPath(Path path)

对路径下的所有上传执行尽力清理。

返回解析为以下内容的 future。

-1 if unsuppported
>= 0 if supported

由于这是尽力而为,因此无法实现严格的后置条件。理想的后置条件是路径下的所有上传都已中止,并且计数是中止的上传数

FS'.uploads forall upload in FS.uploads:
    not isDescendant(FS, path, upload.path)
return len(forall upload in FS.uploads:
               isDescendant(FS, path, upload.path))