Hadoop:可插拔 Shuffle 和可插拔排序

简介

可插拔 Shuffle 和可插拔排序功能允许使用备用实现替换内置的 Shuffle 和排序逻辑。此功能的示例用例包括:使用 HTTP 之外的其他应用程序协议(例如 RDMA)将数据从 Map 节点混洗到 Reducer 节点;或使用支持哈希聚合和 Limit-N 查询的自定义算法替换排序逻辑。

重要提示:可插拔 Shuffle 和可插拔排序功能仍处于实验阶段且不稳定。这意味着提供的 API 可能会在 Hadoop 的未来版本中发生更改并破坏兼容性。

实现自定义 Shuffle 和自定义排序

自定义 Shuffle 实现需要在 NodeManager 中运行的 org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService 实现类和在 Reducer 任务中运行的 org.apache.hadoop.mapred.ShuffleConsumerPlugin 实现类。

Hadoop 提供的默认实现可用作参考

  • org.apache.hadoop.mapred.ShuffleHandler
  • org.apache.hadoop.mapreduce.task.reduce.Shuffle

自定义排序实现需要一个在 Mapper 任务中运行的 org.apache.hadoop.mapred.MapOutputCollector 实现类,以及(可选,取决于排序实现)一个在 Reducer 任务中运行的 org.apache.hadoop.mapred.ShuffleConsumerPlugin 实现类。

Hadoop 提供的默认实现可用作参考

  • org.apache.hadoop.mapred.MapTask$MapOutputBuffer
  • org.apache.hadoop.mapreduce.task.reduce.Shuffle

配置

除了在服务 Shuffle 的 NodeManager 中运行的辅助服务外(默认情况下为 ShuffleHandler),所有可插入组件都在作业任务中运行。这意味着它们可以按作业基础进行配置。为 Shuffle 提供服务的辅助服务必须在 NodeManager 配置中进行配置。

作业配置属性(按作业基础)

属性 默认值 说明
mapreduce.job.reduce.shuffle.consumer.plugin.class org.apache.hadoop.mapreduce.task.reduce.Shuffle 要使用的 ShuffleConsumerPlugin 实现
mapreduce.job.map.output.collector.class org.apache.hadoop.mapred.MapTask$MapOutputBuffer 要使用的 MapOutputCollector 实现

这些属性也可以在 mapred-site.xml 中设置,以更改所有作业的默认值。

收集器类配置可以指定一个由收集器实现组成的逗号分隔列表。在这种情况下,映射任务将尝试依次实例化每个实现,直到其中一个实现成功初始化。例如,如果给定的收集器实现仅与某些类型的键或值兼容,则这可能很有用。

NodeManager 配置属性,所有节点中的 yarn-site.xml

有两种方法可以通过清单文件或通过配置(旧方法)配置辅助服务。如果使用清单文件,则不会从配置中读取辅助服务配置。

如果使用清单,则必须通过在 yarn-site.xml 中将属性 yarn.nodemanager.aux-services.manifest.enabled 设置为 true 来启用该功能。可以在 yarn-site.xml 中的属性 yarn.nodemanager.aux-services.manifest 下设置文件路径,或者可以通过 PUT 调用将文件发送到每个 NM,该调用指向端点 http://nm-http-address:port/ws/v1/node/auxiliaryservices。如果在配置中设置了文件路径,则 NM 将以 yarn.nodemanager.aux-services.manifest.reload-ms 指定的间隔检查此文件是否有新修改(默认为 0;设置间隔 <= 0 表示不会自动重新加载)。

否则,设置以下属性以通过配置配置辅助服务。

属性 默认值 说明
yarn.nodemanager.aux-services ...,mapreduce_shuffle 辅助服务名称
yarn.nodemanager.aux-services.mapreduce_shuffle.class org.apache.hadoop.mapred.ShuffleHandler 要使用的辅助服务类
yarn.nodemanager.aux-services.%s.classpath 本地目录,其中包括相关的 jar 文件以及所有依赖项的 jar 文件。我们可以指定单个 jar 文件或使用 /dep/* 来加载 dep 目录下的所有 jar。
yarn.nodemanager.aux-services.%s.remote-classpath jar 文件的远程绝对或相对路径

从 HDFS 加载 jar 文件的示例

使用清单

{
  "services": [
    {
      "name": "mapreduce_shuffle",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.hadoop.mapred.ShuffleHandler"
        }
      }
    },
    {
      "name": "AuxServiceFromHDFS",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.auxtest.AuxServiceFromHDFS2"
        },
        "files": [
          {
            "src_file": "hdfs:///aux/test/aux-service-hdfs.jar",
            "type": "STATIC"
          }
        ]
      }
    }
  ]
}

或使用配置

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle,AuxServiceFromHDFS</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.remote-classpath</name>
        <value>/aux/test/aux-service-hdfs.jar</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name>
        <value>org.apache.auxtest.AuxServiceFromHDFS2</value>
    </property>
</configuration>

从本地文件系统加载 jar 文件的示例

使用清单

{
  "services": [
    {
      "name": "mapreduce_shuffle",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.hadoop.mapred.ShuffleHandler"
        }
      }
    },
    {
      "name": "AuxServiceFromHDFS",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.auxtest.AuxServiceFromHDFS2"
        },
        "files": [
          {
            "src_file": "file:///aux/test/aux-service-hdfs.jar",
            "type": "STATIC"
          }
        ]
      }
    }
  ]
}

或使用配置

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle,AuxServiceFromHDFS</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.classpath</name>
        <value>/aux/test/aux-service-hdfs.jar</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name>
        <value>org.apache.auxtest.AuxServiceFromHDFS2</value>
    </property>
</configuration>

重要提示:如果除了默认的 mapreduce_shuffle 服务之外还要设置辅助服务,则应将新的服务密钥添加到 yarn.nodemanager.aux-services 属性,例如 mapred.shufflex。然后,定义相应类的属性必须为 yarn.nodemanager.aux-services.mapreduce_shufflex.class。或者,如果使用辅助服务清单文件,则应将服务添加到服务列表中。