可插拔 Shuffle 和可插拔排序功能允许使用备用实现替换内置的 Shuffle 和排序逻辑。此功能的示例用例包括:使用 HTTP 之外的其他应用程序协议(例如 RDMA)将数据从 Map 节点混洗到 Reducer 节点;或使用支持哈希聚合和 Limit-N 查询的自定义算法替换排序逻辑。
重要提示:可插拔 Shuffle 和可插拔排序功能仍处于实验阶段且不稳定。这意味着提供的 API 可能会在 Hadoop 的未来版本中发生更改并破坏兼容性。
自定义 Shuffle 实现需要在 NodeManager 中运行的 org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService 实现类和在 Reducer 任务中运行的 org.apache.hadoop.mapred.ShuffleConsumerPlugin 实现类。
Hadoop 提供的默认实现可用作参考
org.apache.hadoop.mapred.ShuffleHandlerorg.apache.hadoop.mapreduce.task.reduce.Shuffle自定义排序实现需要一个在 Mapper 任务中运行的 org.apache.hadoop.mapred.MapOutputCollector 实现类,以及(可选,取决于排序实现)一个在 Reducer 任务中运行的 org.apache.hadoop.mapred.ShuffleConsumerPlugin 实现类。
Hadoop 提供的默认实现可用作参考
org.apache.hadoop.mapred.MapTask$MapOutputBufferorg.apache.hadoop.mapreduce.task.reduce.Shuffle除了在服务 Shuffle 的 NodeManager 中运行的辅助服务外(默认情况下为 ShuffleHandler),所有可插入组件都在作业任务中运行。这意味着它们可以按作业基础进行配置。为 Shuffle 提供服务的辅助服务必须在 NodeManager 配置中进行配置。
| 属性 | 默认值 | 说明 |
|---|---|---|
mapreduce.job.reduce.shuffle.consumer.plugin.class |
org.apache.hadoop.mapreduce.task.reduce.Shuffle |
要使用的 ShuffleConsumerPlugin 实现 |
mapreduce.job.map.output.collector.class |
org.apache.hadoop.mapred.MapTask$MapOutputBuffer |
要使用的 MapOutputCollector 实现 |
这些属性也可以在 mapred-site.xml 中设置,以更改所有作业的默认值。
收集器类配置可以指定一个由收集器实现组成的逗号分隔列表。在这种情况下,映射任务将尝试依次实例化每个实现,直到其中一个实现成功初始化。例如,如果给定的收集器实现仅与某些类型的键或值兼容,则这可能很有用。
yarn-site.xml有两种方法可以通过清单文件或通过配置(旧方法)配置辅助服务。如果使用清单文件,则不会从配置中读取辅助服务配置。
如果使用清单,则必须通过在 yarn-site.xml 中将属性 yarn.nodemanager.aux-services.manifest.enabled 设置为 true 来启用该功能。可以在 yarn-site.xml 中的属性 yarn.nodemanager.aux-services.manifest 下设置文件路径,或者可以通过 PUT 调用将文件发送到每个 NM,该调用指向端点 http://nm-http-address:port/ws/v1/node/auxiliaryservices。如果在配置中设置了文件路径,则 NM 将以 yarn.nodemanager.aux-services.manifest.reload-ms 指定的间隔检查此文件是否有新修改(默认为 0;设置间隔 <= 0 表示不会自动重新加载)。
否则,设置以下属性以通过配置配置辅助服务。
| 属性 | 默认值 | 说明 |
|---|---|---|
yarn.nodemanager.aux-services |
...,mapreduce_shuffle |
辅助服务名称 |
yarn.nodemanager.aux-services.mapreduce_shuffle.class |
org.apache.hadoop.mapred.ShuffleHandler |
要使用的辅助服务类 |
yarn.nodemanager.aux-services.%s.classpath |
无 | 本地目录,其中包括相关的 jar 文件以及所有依赖项的 jar 文件。我们可以指定单个 jar 文件或使用 /dep/* 来加载 dep 目录下的所有 jar。 |
yarn.nodemanager.aux-services.%s.remote-classpath |
无 | jar 文件的远程绝对或相对路径 |
使用清单
{
"services": [
{
"name": "mapreduce_shuffle",
"version": "1",
"configuration": {
"properties": {
"class.name": "org.apache.hadoop.mapred.ShuffleHandler"
}
}
},
{
"name": "AuxServiceFromHDFS",
"version": "1",
"configuration": {
"properties": {
"class.name": "org.apache.auxtest.AuxServiceFromHDFS2"
},
"files": [
{
"src_file": "hdfs:///aux/test/aux-service-hdfs.jar",
"type": "STATIC"
}
]
}
}
]
}
或使用配置
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,AuxServiceFromHDFS</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.remote-classpath</name>
<value>/aux/test/aux-service-hdfs.jar</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name>
<value>org.apache.auxtest.AuxServiceFromHDFS2</value>
</property>
</configuration>
使用清单
{
"services": [
{
"name": "mapreduce_shuffle",
"version": "1",
"configuration": {
"properties": {
"class.name": "org.apache.hadoop.mapred.ShuffleHandler"
}
}
},
{
"name": "AuxServiceFromHDFS",
"version": "1",
"configuration": {
"properties": {
"class.name": "org.apache.auxtest.AuxServiceFromHDFS2"
},
"files": [
{
"src_file": "file:///aux/test/aux-service-hdfs.jar",
"type": "STATIC"
}
]
}
}
]
}
或使用配置
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,AuxServiceFromHDFS</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.classpath</name>
<value>/aux/test/aux-service-hdfs.jar</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name>
<value>org.apache.auxtest.AuxServiceFromHDFS2</value>
</property>
</configuration>
重要提示:如果除了默认的 mapreduce_shuffle 服务之外还要设置辅助服务,则应将新的服务密钥添加到 yarn.nodemanager.aux-services 属性,例如 mapred.shufflex。然后,定义相应类的属性必须为 yarn.nodemanager.aux-services.mapreduce_shufflex.class。或者,如果使用辅助服务清单文件,则应将服务添加到服务列表中。