org.apache.hadoop.fs.MultipartUploader
MultipartUploader
可以使用多个部分将文件上传到 Hadoop 支持的文件系统。多部分上传的好处在于可以并行地从多个客户端或进程上传文件,并且在调用 complete
函数之前,结果对其他客户端不可见。
当由对象存储实现时,上传的数据可能会产生存储费用,即使在文件系统中不可见之前也是如此。此 API 的用户必须勤勉,并始终尽最大努力完成或中止上传。abortUploadsUnderPath(path)
操作可以在这里提供帮助。
有效 MultipartUploader
的所有要求都被视为隐式前置条件和后置条件
单个多部分上传的操作可以在多部分上传器的不同实例、不同进程和主机中进行。因此,要求
上传部分、完成上传或中止上传所需的所有状态都必须包含在上传句柄中或可从上传句柄中检索。
该句柄必须可序列化;它必须可反序列化为执行 Hadoop 完全相同版本的不同进程。
不同的主机/进程可以按顺序或同时上传不同的部分。它们上传到文件系统中的顺序不得限制数据存储在最终文件中的顺序。
上传可以在与任何上传部分的实例不同的实例上完成。
上传的输出在上传完成之前不得在最终目标中可见。
如果单个多部分上传器实例按顺序启动或完成多个上传文件到同一目标,则这不是错误,无论存储是否支持并发上传。
多个进程可以同时上传多部分上传的部分。
如果对正在进行活动上传的目标调用 startUpload(path)
,则实现必须执行以下两个操作之一。
哪个上传成功是未定义的。用户不得期望跨文件系统、跨文件系统实例*甚至跨不同请求保持一致的行为。
如果在部分上传进行时完成或中止多部分上传,则进行中的上传(如果尚未完成)不得全部或部分包含在最终文件中。实现应在 putPart()
操作中引发错误。
用户不得期望序列化 PathHandle 版本在以下情况下兼容:* 不同的多部分上传器实现。* 同一实现的不同版本。
也就是说:所有客户端都必须使用完全相同的 Hadoop 版本。
支持多部分上传的 FileSystem/FileContext 将现有模型 (目录、文件、符号链接)
扩展为 (目录、文件、符号链接、上传)
类型为 Map[UploadHandle -> Map[PartHandle -> UploadPart]
的上传
。
状态元组的 Uploads
元素是所有活动上传的映射。
Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]`
UploadHandle 是非空字节列表。
UploadHandle: List[byte] len(UploadHandle) > 0
客户端必须将此视为不透明。此功能设计的核心是句柄在所有客户端中有效:句柄可以在主机 hostA
上序列化,在 hostB
上反序列化,并且仍然用于扩展或完成上传。
UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])
同样,PartHandle
类型也是一个非空的不透明字节列表,同样可以在主机之间封送。
PartHandle: List[byte]
隐含地,FS.Uploads
中的每个 UploadHandle
都是唯一的。同样,[PartHandle -> UploadPart]
映射中的每个 PartHandle
也必须是唯一的。
abort(FS, uploadHandle)
操作可能会无意中取消使用相同 Upload Handle 的后续操作。所有操作都返回 CompletableFuture<>
类型,必须随后对其进行评估以获取其返回值。
这意味着,当实现与快速文件系统/存储进行交互时,包括文件存在在内的所有前提条件可能被提前评估,而与远程对象存储进行交互的实现(其探测速度较慢)可能在异步阶段验证前提条件,尤其是与远程存储进行交互的前提条件。
Java CompletableFutures 无法很好地处理已检查异常。Hadoop 代码库仍在不断完善此处的异常处理细节,因为异步 API 的使用越来越多。假设任何声明必须引发 IOException
的前提条件失败都可能导致该操作在将来评估时被包装在某种形式的 RuntimeException
中;这也适用于操作期间引发的任何其他 IOException
。
close()
应用程序必须在使用上传器后调用 close()
;这样做是为了释放其他对象、更新统计信息等。
CompletableFuture<UploadHandle> startUpload(Path)
启动多部分上传,最终返回一个 UploadHandle
以用于后续操作。
if path == "/" : raise IOException if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
如果文件系统不支持对目标的并发上传,则添加以下前提条件
if path in values(FS.Uploads) raise PathExistsException, IOException
初始化操作完成后,文件系统状态将使用新的活动上传更新,并带有新的句柄,此句柄将返回给调用方。
handle' = UploadHandle where not handle' in keys(FS.Uploads) FS' = FS where FS'.Uploads(handle') == {} result = handle'
CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)
上传特定多部分上传的部分,最终返回一个不透明的部分句柄,表示指定上传的这一部分。
uploadHandle in keys(FS.Uploads) partNumber >= 1 lengthInBytes >= 0 len(inputStream) >= lengthInBytes
data' = inputStream(0..lengthInBytes) partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts) FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data' result = partHandle'
数据存储在文件系统中,等待完成。它不得在目标路径中可见。它可能在文件系统中的某个临时路径中可见;这是特定于实现的,并且不得依赖它。
CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)
完成多部分上传。
文件系统可以强制执行每个部分的最小大小,但不包括上传的最后一个部分。
如果某个部分超出此范围,则必须引发 IOException
。
uploadHandle in keys(FS.Uploads) else raise FileNotFoundException FS.Uploads(uploadHandle).path == path if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException parts.size() > 0 forall k in keys(parts): k > 0 forall k in keys(parts): not exists(k2 in keys(parts)) where (parts[k] == parts[k2])
所有键都必须大于零,并且不得对同一部分句柄有任何重复引用。这些验证可以在操作期间的任何时间执行。在失败后,无法保证对此上传使用有效路径映射的 complete()
调用将完成。调用方应在任何此类故障后调用 abort()
以确保清理。
如果对这个 uploadHandle
执行了 putPart()
操作,但其 PathHandle
句柄未包含在此请求中,则遗漏的部分不得成为结果文件的一部分。
MultipartUploader 必须清理任何此类未完成的条目。
对于支持目录(本地文件系统、HDFS 等)的后备存储,如果在完成时,目标处现在有一个目录,则必须引发 PathIsDirectoryException
或其他 IOException
。
UploadData' == ordered concatention of all data in the map of parts, ordered by key exists(FS', path') and result = PathHandle(path') FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
PathHandle
由完成操作返回,因此后续操作将能够识别出数据在此期间未发生更改。
文件中上传部分的顺序是地图中部分的自然顺序:部分 1 位于部分 2 之前,依此类推。
CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
中止多部分上传。句柄将变为无效,并且不可重复使用。
uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
上传句柄不再已知。
FS' = FS where not uploadHandle in keys(FS'.uploads)
使用同一句柄对 abort()
的后续调用将失败,除非已回收该句柄。
CompletableFuture<Integer> abortUploadsUnderPath(Path path)
对路径下的所有上传执行尽力清理。
返回解析为以下内容的 future。
-1 if unsuppported >= 0 if supported
由于这是尽力而为,因此无法实现严格的后置条件。理想的后置条件是路径下的所有上传都已中止,并且计数是中止的上传数
FS'.uploads forall upload in FS.uploads: not isDescendant(FS, path, upload.path) return len(forall upload in FS.uploads: isDescendant(FS, path, upload.path))