可插拔 Shuffle 和可插拔排序功能允许使用备用实现替换内置的 Shuffle 和排序逻辑。此功能的示例用例包括:使用 HTTP 之外的其他应用程序协议(例如 RDMA)将数据从 Map 节点混洗到 Reducer 节点;或使用支持哈希聚合和 Limit-N 查询的自定义算法替换排序逻辑。
重要提示:可插拔 Shuffle 和可插拔排序功能仍处于实验阶段且不稳定。这意味着提供的 API 可能会在 Hadoop 的未来版本中发生更改并破坏兼容性。
自定义 Shuffle 实现需要在 NodeManager 中运行的 org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService
实现类和在 Reducer 任务中运行的 org.apache.hadoop.mapred.ShuffleConsumerPlugin
实现类。
Hadoop 提供的默认实现可用作参考
org.apache.hadoop.mapred.ShuffleHandler
org.apache.hadoop.mapreduce.task.reduce.Shuffle
自定义排序实现需要一个在 Mapper 任务中运行的 org.apache.hadoop.mapred.MapOutputCollector
实现类,以及(可选,取决于排序实现)一个在 Reducer 任务中运行的 org.apache.hadoop.mapred.ShuffleConsumerPlugin
实现类。
Hadoop 提供的默认实现可用作参考
org.apache.hadoop.mapred.MapTask$MapOutputBuffer
org.apache.hadoop.mapreduce.task.reduce.Shuffle
除了在服务 Shuffle 的 NodeManager 中运行的辅助服务外(默认情况下为 ShuffleHandler
),所有可插入组件都在作业任务中运行。这意味着它们可以按作业基础进行配置。为 Shuffle 提供服务的辅助服务必须在 NodeManager 配置中进行配置。
属性 | 默认值 | 说明 |
---|---|---|
mapreduce.job.reduce.shuffle.consumer.plugin.class |
org.apache.hadoop.mapreduce.task.reduce.Shuffle |
要使用的 ShuffleConsumerPlugin 实现 |
mapreduce.job.map.output.collector.class |
org.apache.hadoop.mapred.MapTask$MapOutputBuffer |
要使用的 MapOutputCollector 实现 |
这些属性也可以在 mapred-site.xml
中设置,以更改所有作业的默认值。
收集器类配置可以指定一个由收集器实现组成的逗号分隔列表。在这种情况下,映射任务将尝试依次实例化每个实现,直到其中一个实现成功初始化。例如,如果给定的收集器实现仅与某些类型的键或值兼容,则这可能很有用。
yarn-site.xml
有两种方法可以通过清单文件或通过配置(旧方法)配置辅助服务。如果使用清单文件,则不会从配置中读取辅助服务配置。
如果使用清单,则必须通过在 yarn-site.xml 中将属性 yarn.nodemanager.aux-services.manifest.enabled
设置为 true 来启用该功能。可以在 yarn-site.xml 中的属性 yarn.nodemanager.aux-services.manifest
下设置文件路径,或者可以通过 PUT 调用将文件发送到每个 NM,该调用指向端点 http://nm-http-address:port/ws/v1/node/auxiliaryservices
。如果在配置中设置了文件路径,则 NM 将以 yarn.nodemanager.aux-services.manifest.reload-ms
指定的间隔检查此文件是否有新修改(默认为 0;设置间隔 <= 0 表示不会自动重新加载)。
否则,设置以下属性以通过配置配置辅助服务。
属性 | 默认值 | 说明 |
---|---|---|
yarn.nodemanager.aux-services |
...,mapreduce_shuffle |
辅助服务名称 |
yarn.nodemanager.aux-services.mapreduce_shuffle.class |
org.apache.hadoop.mapred.ShuffleHandler |
要使用的辅助服务类 |
yarn.nodemanager.aux-services.%s.classpath |
无 | 本地目录,其中包括相关的 jar 文件以及所有依赖项的 jar 文件。我们可以指定单个 jar 文件或使用 /dep/* 来加载 dep 目录下的所有 jar。 |
yarn.nodemanager.aux-services.%s.remote-classpath |
无 | jar 文件的远程绝对或相对路径 |
使用清单
{ "services": [ { "name": "mapreduce_shuffle", "version": "1", "configuration": { "properties": { "class.name": "org.apache.hadoop.mapred.ShuffleHandler" } } }, { "name": "AuxServiceFromHDFS", "version": "1", "configuration": { "properties": { "class.name": "org.apache.auxtest.AuxServiceFromHDFS2" }, "files": [ { "src_file": "hdfs:///aux/test/aux-service-hdfs.jar", "type": "STATIC" } ] } } ] }
或使用配置
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,AuxServiceFromHDFS</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.remote-classpath</name> <value>/aux/test/aux-service-hdfs.jar</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name> <value>org.apache.auxtest.AuxServiceFromHDFS2</value> </property> </configuration>
使用清单
{ "services": [ { "name": "mapreduce_shuffle", "version": "1", "configuration": { "properties": { "class.name": "org.apache.hadoop.mapred.ShuffleHandler" } } }, { "name": "AuxServiceFromHDFS", "version": "1", "configuration": { "properties": { "class.name": "org.apache.auxtest.AuxServiceFromHDFS2" }, "files": [ { "src_file": "file:///aux/test/aux-service-hdfs.jar", "type": "STATIC" } ] } } ] }
或使用配置
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,AuxServiceFromHDFS</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.classpath</name> <value>/aux/test/aux-service-hdfs.jar</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name> <value>org.apache.auxtest.AuxServiceFromHDFS2</value> </property> </configuration>
重要提示:如果除了默认的 mapreduce_shuffle
服务之外还要设置辅助服务,则应将新的服务密钥添加到 yarn.nodemanager.aux-services
属性,例如 mapred.shufflex
。然后,定义相应类的属性必须为 yarn.nodemanager.aux-services.mapreduce_shufflex.class
。或者,如果使用辅助服务清单文件,则应将服务添加到服务列表中。