MapReduce 教程

目的

本文档全面描述了 Hadoop MapReduce 框架的所有面向用户的功能,并作为教程。

先决条件

确保已安装、配置并运行 Hadoop。更多详细信息

概述

Hadoop MapReduce 是一个软件框架,用于轻松编写应用程序,这些应用程序可以在大型商品硬件集群(数千个节点)中以可靠、容错的方式并行处理海量数据(多 TB 数据集)。

MapReduce 作业通常将输入数据集拆分为独立块,这些块由映射任务以完全并行的方式进行处理。该框架对映射的输出进行排序,然后输入到归约任务中。通常,作业的输入和输出都存储在文件系统中。该框架负责调度任务、监视任务并重新执行失败的任务。

通常,计算节点和存储节点是相同的,即 MapReduce 框架和 Hadoop 分布式文件系统(请参阅HDFS 架构指南)在同一组节点上运行。此配置允许框架在已存在数据所在节点上有效地调度任务,从而在整个集群中实现非常高的聚合带宽。

MapReduce 框架包含一个主 ResourceManager、每个集群节点一个工作 NodeManager 以及每个应用程序一个 MRAppMaster(请参阅YARN 架构指南)。

应用程序至少指定输入/输出位置,并通过实现适当的接口和/或抽象类提供映射归约函数。这些以及其他作业参数构成了作业配置

然后,Hadoop 作业客户端将作业(jar/可执行文件等)和配置提交给 ResourceManager,然后 ResourceManager 负责将软件/配置分发给工作程序、调度任务并监视任务,向作业客户端提供状态和诊断信息。

尽管 Hadoop 框架是用 Java™ 实现的,但 MapReduce 应用程序不必用 Java 编写。

  • Hadoop Streaming 是一个实用程序,允许用户使用任何可执行文件(例如 shell 实用程序)作为映射器和/或归约器来创建和运行作业。

  • Hadoop Pipes 是一个SWIG 兼容的 C++ API,用于实现 MapReduce 应用程序(不基于 JNI™)。

输入和输出

MapReduce 框架专门在 <key, value> 对上运行,即框架将作业的输入视为一组 <key, value> 对,并生成一组 <key, value> 对作为作业的输出,可以想象这些对具有不同的类型。

keyvalue 类必须可由框架序列化,因此需要实现 Writable 接口。此外,key 类必须实现 WritableComparable 接口,以方便框架进行排序。

MapReduce 作业的输入和输出类型

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

示例:WordCount v1.0

在我们深入了解细节之前,让我们浏览一个示例 MapReduce 应用程序,以便了解它们的工作原理。

WordCount 是一个简单的应用程序,它统计给定输入集中每个单词出现的次数。

它适用于本地独立、伪分布式或完全分布式 Hadoop 安装(单节点设置)。

源代码

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

用法

假设环境变量已设置为以下内容

export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

编译 WordCount.java 并创建一个 jar

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class

假设

  • /user/joe/wordcount/input - HDFS 中的输入目录
  • /user/joe/wordcount/output - HDFS 中的输出目录

示例文本文件作为输入

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

运行应用程序

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

输出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

应用程序可以使用选项 -files 指定一个逗号分隔的路径列表,这些路径将出现在任务的当前工作目录中。-libjars 选项允许应用程序将 jar 添加到 map 和 reduce 的类路径中。-archives 选项允许它们将逗号分隔的存档列表作为参数传递。这些存档已解档,并且在任务的当前工作目录中创建了一个名为存档的链接。有关命令行选项的更多详细信息,请访问 命令指南

使用 -libjars-files-archives 运行 wordcount 示例

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

在此处,myarchive.zip 将被放置并解压缩到名为“myarchive.zip”的目录中。

用户可以使用 # 为通过 -files-archives 选项传递的文件和存档指定不同的符号名称。

例如,

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

在此处,任务可以使用符号名称 dict1 和 dict2 分别访问文件 dir1/dict.txt 和 dir2/dict.txt。存档 mytar.tgz 将被放置并解压缩到名为“tgzdir”的目录中。

应用程序可以通过使用选项 -Dmapreduce.map.env、-Dmapreduce.reduce.env 和 -Dyarn.app.mapreduce.am.env 分别在命令行上指定它们,为映射器、还原器和应用程序主任务指定环境变量。

例如,以下内容为映射器和还原器设置环境变量 FOO_VAR=bar 和 LIST_VAR=a,b,c,

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -Dmapreduce.map.env.FOO_VAR=bar -Dmapreduce.map.env.LIST_VAR=a,b,c -Dmapreduce.reduce.env.FOO_VAR=bar -Dmapreduce.reduce.env.LIST_VAR=a,b,c input output

演练

WordCount 应用程序非常简单。

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

Mapper 实现通过 map 方法一次处理一行,由指定的 TextInputFormat 提供。然后,它通过 StringTokenizer 将行拆分为由空格分隔的标记,并发出键值对 < <word>, 1>

对于给定的示例输入,第一个映射发出

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二个映射发出

< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

我们将在本教程稍后了解为给定作业生成的地图数,以及如何以细粒度的方式控制它们。

    job.setCombinerClass(IntSumReducer.class);

WordCount 还指定了一个 combiner。因此,每个映射的输出在按 key 排序后,将通过本地合并器(根据作业配置与 Reducer 相同)进行本地聚合。

第一个映射的输出

< Bye, 1>
< Hello, 1>
< World, 2>

第二个映射的输出

< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
public void reduce(Text key, Iterable<IntWritable> values,
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

Reducer 实现通过 reduce 方法只对值求和,这些值是每个键(在本例中即为单词)的出现次数。

因此,作业的输出是

< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

main 方法在 Job 中指定作业的各个方面,例如输入/输出路径(通过命令行传递)、键/值类型、输入/输出格式等。然后,它调用 job.waitForCompletion 以提交作业并监控其进度。

我们将在本教程稍后了解有关 JobInputFormatOutputFormat 和其他接口和类的更多信息。

MapReduce - 用户界面

本节提供了 MapReduce 框架每个用户界面方面的合理详细信息。这应有助于用户以细粒度的方式实现、配置和调整其作业。但是,请注意,每个类/接口的 javadoc 仍然是最全面的可用文档;这仅是一个教程。

让我们首先了解 MapperReducer 接口。应用程序通常实现它们以提供 mapreduce 方法。

然后,我们将讨论其他核心接口,包括 JobPartitionerInputFormatOutputFormat 等。

最后,我们将通过讨论框架的一些有用功能(例如 DistributedCacheIsolationRunner 等)来结束。

有效负载

应用程序通常实现 MapperReducer 接口以提供 mapreduce 方法。这些方法构成了作业的核心。

映射器

Mapper 将输入键/值对映射到一组中间键/值对。

映射是将输入记录转换为中间记录的各个任务。转换后的中间记录不必与输入记录属于同一种类型。给定的输入对可以映射到零个或多个输出对。

Hadoop MapReduce 框架为作业的 InputFormat 生成的每个 InputSplit 生成一个映射任务。

总体而言,映射器实现通过 Job.setMapperClass(Class) 方法传递给作业。然后,框架为该任务的 InputSplit 中的每个键/值对调用 map(WritableComparable, Writable, Context)。然后,应用程序可以覆盖 cleanup(Context) 方法来执行任何必需的清理。

输出对不必与输入对属于同一种类型。给定的输入对可以映射到零个或多个输出对。输出对通过调用 context.write(WritableComparable, Writable) 来收集。

应用程序可以使用 Counter 来报告其统计信息。

与给定输出键关联的所有中间值随后由框架分组,并传递给 Reducer 以确定最终输出。用户可以通过 Job.setGroupingComparatorClass(Class) 指定 Comparator 来控制分组。

Mapper 输出按每个 Reducer 排序,然后分区。分区的总数与作业的归约任务数相同。用户可以通过实现自定义 Partitioner 来控制哪些键(以及记录)进入哪个 Reducer

用户可以通过 Job.setCombinerClass(Class) 选择性地指定一个 combiner,以对中间输出执行本地聚合,这有助于减少从 Mapper 传输到 Reducer 的数据量。

中间排序输出始终存储为简单的 (key-len, key, value-len, value) 格式。应用程序可以通过 Configuration 控制是否以及如何压缩中间输出,以及要使用的 CompressionCodec

有多少个映射?

映射的数量通常由输入的总大小(即输入文件的块的总数)决定。

每个节点的映射的并行度合适级别似乎是 10-100 个映射,尽管对于非常轻量级的映射任务,它已被设置为 300 个映射。任务设置需要一段时间,因此最好让映射至少执行一分钟。

因此,如果您预计有 10TB 的输入数据,并且块大小为 128MB,您将最终获得 82,000 个映射,除非使用 Configuration.set(MRJobConfig.NUM_MAPS, int)(它仅向框架提供提示)将其设置为更高的值。

Reducer

Reducer 将一组共享一个键的中间值减少为一组较小的值。

作业的 reduce 数量由用户通过 Job.setNumReduceTasks(int) 设置。

总体而言,Reducer 实现通过 Job.setReducerClass(Class) 方法传递给作业的 Job,并且可以覆盖它以初始化它们自己。然后,框架为分组输入中的每个 <key, (list of values)> 对调用 reduce(WritableComparable, Iterable<Writable>, Context) 方法。然后,应用程序可以覆盖 cleanup(Context) 方法以执行任何所需的清理。

Reducer 有 3 个主要阶段:shuffle、sort 和 reduce。

Shuffle

Reducer 的输入是映射器的排序输出。在此阶段,框架通过 HTTP 获取所有映射器的输出的相关分区。

Sort

框架在此阶段按键对 Reducer 输入进行分组(因为不同的映射器可能输出相同的键)。

Shuffle 和排序阶段同时发生;在获取映射输出时,它们被合并。

辅助排序

如果对中间键进行分组的等价规则需要与在归约前对键进行分组的规则不同,则可以通过 Job.setSortComparatorClass(Class) 指定一个 Comparator。由于 Job.setGroupingComparatorClass(Class) 可用于控制中间键的组合方式,因此可以将它们结合使用来模拟“对值进行辅助排序”。

归约

在此阶段,reduce(WritableComparable, Iterable<Writable>, Context) 方法针对分组输入中的每个 <key, (list of values)> 对调用。

归约任务的输出通常通过 Context.write(WritableComparable, Writable) 写入 FileSystem

应用程序可以使用 Counter 来报告其统计信息。

Reducer 的输出未排序

需要多少个归约?

归约的正确数量似乎是 0.951.75 乘以(<节点数> * <每个节点的最大容器数>)。

使用 0.95,所有归约都可以立即启动,并在映射完成时开始传输映射输出。使用 1.75,速度较快的节点将完成其第一轮归约,并启动第二波归约,从而更好地执行负载平衡。

增加归约的数量会增加框架开销,但会增加负载平衡并降低故障成本。

上述缩放因子略小于整数,以便在框架中为推测任务和失败任务保留一些归约槽。

归约器 NONE

如果不需要归约,可以将归约任务的数量设置为

在这种情况下,映射任务的输出将直接进入 FileSystem,进入由 FileOutputFormat.setOutputPath(Job, Path) 设置的输出路径。在将映射输出写入 FileSystem 之前,框架不会对其进行排序。

分区器

Partitioner 对键空间进行分区。

分区器控制中间映射输出的键的分区。键(或键的子集)用于派生分区,通常通过哈希函数。分区总数与作业的归约任务数相同。因此,这控制了中间键(以及记录)将被发送到哪个归约任务(m 个)进行归约。

HashPartitioner 是默认的 Partitioner

计数器

Counter 是 MapReduce 应用程序报告其统计信息的工具。

MapperReducer 实现可以使用 Counter 报告统计信息。

Hadoop MapReduce 附带一个 ,其中包含通常有用的映射器、归约器和分区器。

作业配置

Job 表示 MapReduce 作业配置。

Job 是用户向 Hadoop 框架描述要执行的 MapReduce 作业的主要接口。框架会尝试按照 Job 描述的忠实执行作业,但

Job 通常用于指定 Mapper、组合器(如果有)、PartitionerReducerInputFormatOutputFormat 实现。FileInputFormat 指示输入文件集合 (FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path)) 和 (FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String)),以及应该写入输出文件的位置 (FileOutputFormat.setOutputPath(Path))。

可选地,Job 用于指定作业的其他高级方面,例如要使用的 Comparator、要放入 DistributedCache 中的文件、是否压缩中间和/或作业输出(以及如何压缩)、作业任务是否可以以推测方式执行 (setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean))、每个任务的最大尝试次数 (setMaxMapAttempts(int)/ setMaxReduceAttempts(int)) 等。

当然,用户可以使用 Configuration.set(String, String)/ Configuration.get(String) 来设置/获取应用程序所需的任意参数。但是,对于大量(只读)数据,请使用 DistributedCache

任务执行和环境

MRAppMaster 在单独的 jvm 中将 Mapper/Reducer任务作为子进程执行。

子任务继承父 MRAppMaster 的环境。用户可以通过 mapreduce.{map|reduce}.java.optsJob 中的配置参数向子 jvm 指定其他选项,例如通过 -Djava.library.path=<> 等指定运行时链接器搜索共享库的非标准路径。如果 mapreduce.{map|reduce}.java.opts 参数包含符号 @taskid@,则用 MapReduce 任务的 taskid 值对其进行插值。

下面是一个带有多个参数和替换的示例,显示 jvm GC 日志记录和无密码 JVM JMX 代理的启动,以便它可以连接到 jconsole 等来监视子内存、线程和获取线程转储。它还将 map 和 reduce 子 jvm 的最大堆大小分别设置为 512MB 和 1024MB。它还向子 jvm 的 java.library.path 添加了一个附加路径。

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
  -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@[email protected]
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
  -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@[email protected]
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

内存管理

用户/管理员还可以使用 mapreduce.{map|reduce}.memory.mb 指定启动的子任务的最大虚拟内存,以及它递归启动的任何子进程。请注意,此处设置的值是每个进程的限制。mapreduce.{map|reduce}.memory.mb 的值应以兆字节 (MB) 为单位指定。此外,该值必须大于或等于传递给 JavaVM 的 -Xmx,否则 VM 可能无法启动。

注意:mapreduce.{map|reduce}.java.opts 仅用于配置 MRAppMaster 启动的子任务。守护程序的内存选项配置记录在 配置 Hadoop 守护程序的环境 中。

还可以配置框架某些部分可用的内存。在映射和归约任务中,性能可能会受到影响,具体取决于影响操作并发性和数据命中磁盘频率的参数调整。监视作业的文件系统计数器(尤其是与映射字节计数和归约字节计数相关的计数器)对于调整这些参数非常有价值。

映射参数

从映射中发出的记录将被序列化到缓冲区中,元数据将被存储到会计缓冲区中。如以下选项中所述,当序列化缓冲区或元数据超过阈值时,缓冲区的内容将被排序并写入磁盘,同时映射继续输出记录。如果在溢出进行期间任一缓冲区完全填满,映射线程将被阻塞。当映射完成时,任何剩余记录都将被写入磁盘,所有磁盘段都将合并到一个文件中。最大程度地减少磁盘溢出次数可以减少映射时间,但更大的缓冲区也会减少映射器可用的内存。

名称 类型 说明
mapreduce.task.io.sort.mb int 从映射中发出的记录的序列化和会计缓冲区的累积大小(以兆字节为单位)。
mapreduce.map.sort.spill.percent float 序列化缓冲区的软限制。一旦达到,一个线程将开始在后台将内容溢出到磁盘。

其他说明

  • 如果在溢出进行时任一溢出阈值被超过,收集将持续到溢出完成。例如,如果 mapreduce.map.sort.spill.percent 设置为 0.33,并且在溢出运行时缓冲区的剩余部分被填满,则下一个溢出将包括所有收集的记录,或缓冲区的 0.66,并且不会生成其他溢出。换句话说,阈值定义的是触发器,而不是阻塞。

  • 大于序列化缓冲区的记录将首先触发溢出,然后溢出到一个单独的文件。此记录是否首先通过合并器是未定义的。

混洗/归约参数

如前所述,每个归约通过 HTTP 将分区程序分配给它的输出提取到内存中,并定期将这些输出合并到磁盘中。如果已启用映射输出的中间压缩,则每个输出都会解压到内存中。以下选项会影响归约之前这些合并到磁盘的频率,以及在归约期间分配给映射输出的内存。

名称 类型 说明
mapreduce.task.io.soft.factor int 指定同时要合并的磁盘段数。它限制了合并期间打开的文件和压缩编解码器的数量。如果文件数超过此限制,则合并将分多批进行。虽然此限制也适用于映射,但大多数作业都应配置为不太可能达到此限制。
mapreduce.reduce.merge.inmem.thresholds int 在合并到磁盘之前提取到内存中的已排序映射输出的数量。与前一个注释中的溢出阈值类似,这不是定义分区单位,而是触发器。在实践中,这通常设置得非常高 (1000) 或禁用 (0),因为在内存中合并段通常比从磁盘合并更便宜(请参阅此表后面的注释)。此阈值仅影响混洗期间内存中合并的频率。
mapreduce.reduce.shuffle.merge.percent float 在启动内存中合并之前提取的映射输出的内存阈值,表示为分配给在内存中存储映射输出的内存的百分比。由于无法容纳在内存中的映射输出可能会停滞,因此将此值设置得较高可能会降低提取和合并之间的并行性。相反,对于输入可以完全容纳在内存中的归约,高达 1.0 的值是有效的。此参数仅影响混洗期间内存中合并的频率。
mapreduce.reduce.shuffle.input.buffer.percent float 可分配用于在 shuffle 期间存储映射输出的内存百分比(相对于通常在 mapreduce.reduce.java.opts 中指定的最大堆大小)。虽然应为框架预留一些内存,但通常将此值设置得足够高以存储大量且众多的映射输出是有利的。
mapreduce.reduce.input.buffer.percent float 在 reduce 期间,映射输出可能保留的内存百分比(相对于最大堆大小)。当 reduce 开始时,映射输出将合并到磁盘,直到剩余输出低于此定义的资源限制。默认情况下,所有映射输出都在 reduce 开始之前合并到磁盘,以最大化 reduce 可用的内存。对于内存密集度较低的 reduce,应增加此值以避免访问磁盘。

其他说明

  • 如果映射输出大于分配给复制映射输出的内存的 25%,则它将直接写入磁盘,而无需首先通过内存进行暂存。

  • 使用合并器运行时,关于高合并阈值和大型缓冲区的推理可能不成立。对于在获取所有映射输出之前启动的合并,合并器在溢出到磁盘时运行。在某些情况下,可以通过投入资源合并映射输出(使磁盘溢出变小并并行溢出和获取)来获得更好的 reduce 时间,而不是激进地增加缓冲区大小。

  • 在将内存中的映射输出合并到磁盘以开始 reduce 时,如果由于有要溢出的段和磁盘上至少有 mapreduce.task.io.sort.factor 个段而需要中间合并,则内存中的映射输出将成为中间合并的一部分。

配置的参数

以下属性在每个任务执行的作业配置中本地化

名称 类型 说明
mapreduce.job.id 字符串 作业 ID
mapreduce.job.jar 字符串 作业目录中的 job.jar 位置
mapreduce.job.local.dir 字符串 作业特定的共享暂存空间
mapreduce.task.id 字符串 任务 ID
mapreduce.task.attempt.id 字符串 任务尝试 ID
mapreduce.task.is.map 布尔值 这是映射任务吗
mapreduce.task.partition int 作业中任务的 ID
mapreduce.map.input.file 字符串 映射读取的文件名
mapreduce.map.input.start 长整型 映射输入拆分的开始偏移量
mapreduce.map.input.length 长整型 映射输入拆分中的字节数
mapreduce.task.output.dir 字符串 任务的临时输出目录

注意:在流式作业执行期间,“mapreduce”参数的名称会发生转换。句点 ( . ) 会变成下划线 ( _ )。例如,mapreduce.job.id 会变成 mapreduce_job_id,而 mapreduce.job.jar 会变成 mapreduce_job_jar。要在流式作业的映射器/还原器中获取值,请使用带下划线的参数名称。

任务日志

NodeManager 会读取任务的标准输出 (stdout) 和错误 (stderr) 流以及 syslog,并将其记录到 ${HADOOP_LOG_DIR}/userlogs 中。

分发库

还可使用 DistributedCache 为映射和/或还原任务分发 jar 和本机库。子 jvm 始终会将其当前工作目录添加到 java.library.pathLD_LIBRARY_PATH 中。因此,可以通过 System.loadLibrarySystem.load 加载缓存的库。有关如何通过分布式缓存加载共享库的更多详细信息,请参阅 本机库 中的说明。

作业提交和监控

Job 是用户作业与 ResourceManager 交互的主要接口。

Job 提供了提交作业、跟踪其进度、访问组件任务的报告和日志、获取 MapReduce 集群的状态信息等功能。

作业提交过程涉及

  1. 检查作业的输入和输出规范。

  2. 计算作业的 InputSplit 值。

  3. 如有必要,设置作业的 DistributedCache 的必要会计信息。

  4. 将作业的 jar 和配置复制到 FileSystem 上的 MapReduce 系统目录中。

  5. 将作业提交到 ResourceManager,并根据需要监控其状态。

作业历史记录文件也会记录到用户指定的目录 mapreduce.jobhistory.intermediate-done-dirmapreduce.jobhistory.done-dir,默认为作业输出目录。

用户可以使用以下命令在指定目录中查看历史日志摘要:$ mapred job -history output.jhist 此命令将打印作业详细信息、失败和已终止提示详细信息。可以使用以下命令查看有关作业的更多详细信息,例如成功任务和为每个任务进行的任务尝试:$ mapred job -history all output.jhist

通常,用户使用 Job 来创建应用程序、描述作业的各个方面、提交作业以及监视其进度。

作业控制

用户可能需要链接 MapReduce 作业来完成无法通过单个 MapReduce 作业完成的复杂任务。这相当容易,因为作业的输出通常会转到分布式文件系统,而输出反过来又可以用作下一个作业的输入。

但是,这也意味着确保作业完成(成功/失败)的责任完全在于客户端。在这种情况下,各种作业控制选项是

作业输入

InputFormat 描述了 MapReduce 作业的输入规范。

MapReduce 框架依赖于作业的 InputFormat

  1. 验证作业的输入规范。

  2. 将输入文件拆分为逻辑 InputSplit 实例,然后将每个实例分配给单个 Mapper

  3. 提供 RecordReader 实现,用于从逻辑 InputSplit 中收集输入记录,以便 Mapper 进行处理。

基于文件的 InputFormat 实现(通常是 FileInputFormat 的子类)的默认行为是根据输入文件的总大小(以字节为单位)将输入拆分为 逻辑 InputSplit 实例。但是,输入文件的 FileSystem 块大小被视为输入拆分的上限。可以通过 mapreduce.input.fileinputformat.split.minsize 设置拆分大小的下限。

显然,基于输入大小的逻辑拆分对于许多应用程序来说是不够的,因为必须遵守记录边界。在这种情况下,应用程序应该实现一个 RecordReader,它负责遵守记录边界,并向各个任务提供逻辑 InputSplit 的面向记录的视图。

TextInputFormat 是默认的 InputFormat

如果 TextInputFormat 是给定作业的 InputFormat,则框架会检测带有 .gz 扩展名的输入文件,并使用适当的 CompressionCodec 自动解压缩它们。但是,必须注意,带有上述扩展名的压缩文件不能被 拆分,并且每个压缩文件都由单个映射器整体处理。

InputSplit

InputSplit 表示由单个 Mapper 处理的数据。

通常,InputSplit 呈现输入的字节导向视图,而 RecordReader 负责处理和呈现面向记录的视图。

FileSplit 是默认的 InputSplit。它将 mapreduce.map.input.file 设置为逻辑拆分输入文件的路径。

RecordReader

RecordReaderInputSplit 读取 <key, value> 对。

通常,RecordReader 会转换由 InputSplit 提供的输入的字节导向视图,并向 Mapper 实现呈现面向记录的视图以进行处理。因此,RecordReader 承担处理记录边界的责任,并向任务呈现键和值。

作业输出

OutputFormat 描述 MapReduce 作业的输出规范。

MapReduce 框架依赖于作业的 OutputFormat

  1. 验证作业的输出规范;例如,检查输出目录是否已存在。

  2. 提供用于写入作业输出文件的 RecordWriter 实现。输出文件存储在 FileSystem 中。

TextOutputFormat 是默认的 OutputFormat

OutputCommitter

OutputCommitter 描述 MapReduce 作业的任务输出提交。

MapReduce 框架依赖于作业的 OutputCommitter

  1. 在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录。作业设置由一个单独的任务在作业处于 PREP 状态且在初始化任务后完成。一旦设置任务完成,作业将移至 RUNNING 状态。

  2. 作业完成后清理作业。例如,作业完成后删除临时输出目录。作业清理在作业结束时由单独的任务完成。清理任务完成后,作业被声明为成功/失败/已终止。

  3. 设置任务临时输出。任务设置在任务初始化期间作为同一任务的一部分完成。

  4. 检查任务是否需要提交。如果任务不需要提交,则避免提交过程。

  5. 提交任务输出。任务完成后,任务将根据需要提交其输出。

  6. 放弃任务提交。如果任务已失败/已终止,则将清理输出。如果任务无法清理(在异常块中),则将启动具有相同尝试 ID 的单独任务来执行清理。

FileOutputCommitter 是默认的 OutputCommitter。作业设置/清理任务占用映射或归约容器,无论哪个在 NodeManager 上可用。作业清理任务、任务清理任务和作业设置任务具有最高优先级,并按此顺序排列。

任务副作用文件

在某些应用程序中,组件任务需要创建和/或写入副作用文件,这些文件不同于实际作业输出文件。

在这种情况下,同一 MapperReducer 的两个实例可能同时运行(例如,推测性任务),并尝试在 FileSystem 上打开和/或写入同一文件(路径),因此可能会出现问题。因此,应用程序编写者必须为每个任务尝试(使用 attemptid,例如 attempt_200709221812_0001_m_000000_0)选择唯一名称,而不仅仅是每个任务。

为了避免这些问题,当 OutputCommitterFileOutputCommitter 时,MapReduce 框架为每个任务尝试在 FileSystem 上维护一个特殊的 ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} 子目录,可通过 ${mapreduce.task.output.dir} 访问,其中存储任务尝试的输出。在任务尝试成功完成后,${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}(仅)中的文件被提升${mapreduce.output.fileoutputformat.outputdir}。当然,框架会丢弃失败的任务尝试的子目录。此过程对应用程序完全透明。

应用程序编写人员可以通过在执行任务期间在 ${mapreduce.task.output.dir} 中创建任何所需的辅助文件来利用此功能,通过 FileOutputFormat.getWorkOutputPath(Conext),框架会以类似的方式为成功的任务尝试提升这些文件,从而无需为每个任务尝试选择唯一路径。

注意:在执行特定任务尝试期间 ${mapreduce.task.output.dir} 的值实际上是 ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid},此值由 MapReduce 框架设置。因此,只需在 MapReduce 任务返回的路径中创建任何辅助文件,即可利用此功能 FileOutputFormat.getWorkOutputPath(Conext)

对于 reducer=NONE 的作业映射(即 0 个 reduce),整个讨论都成立,因为在这种情况下,映射的输出直接进入 HDFS。

RecordWriter

RecordWriter 将输出 <key, value> 对写入输出文件。

RecordWriter 实现将作业输出写入 FileSystem

其他有用功能

将作业提交到队列

用户将作业提交到队列。队列作为作业集合,允许系统提供特定功能。例如,队列使用 ACL 来控制可以向其提交作业的用户。队列预计主要由 Hadoop 调度程序使用。

Hadoop 配置为使用一个名为“default”的单一强制队列。队列名称在 Hadoop 站点配置的 mapreduce.job.queuename 属性中定义。一些作业调度程序(例如 容量调度程序)支持多个队列。

作业通过 mapreduce.job.queuename 属性或通过 Configuration.set(MRJobConfig.QUEUE_NAME, String) API 定义需要提交到的队列。设置队列名称是可选的。如果提交作业时没有关联的队列名称,则将其提交到“default”队列。

计数器

Counters 表示全局计数器,由 MapReduce 框架或应用程序定义。每个 Counter 可以是任何 Enum 类型。特定 Enum 的计数器被归入 Counters.Group 类型的组中。

应用程序可以定义任意的 CountersEnum 类型),并通过 map 和/或 reduce 方法中的 Counters.incrCounter(Enum, long) 或 Counters.incrCounter(String, String, long) 更新它们。这些计数器随后由框架全局聚合。

DistributedCache

DistributedCache 有效地分发应用程序特定的、大型的、只读文件。

DistributedCache 是由 MapReduce 框架提供的用于缓存应用程序所需文件(文本、存档、jar 等)的工具。

应用程序通过 Job 中的 url(hdfs://) 指定要缓存的文件。DistributedCache 假设通过 hdfs:// url 指定的文件已经存在于 FileSystem 中。

在该节点上执行任何作业任务之前,框架会将必要的文件复制到工作节点。其效率源于以下事实:每个作业只复制一次文件,并且能够缓存已解压缩的存档。

DistributedCache 跟踪缓存文件的修改时间戳。显然,在作业执行期间,应用程序或外部不应修改缓存文件。

DistributedCache 可用于分发简单的、只读的数据/文本文件和更复杂的文件类型,例如存档和 jar。存档(zip、tar、tgz 和 tar.gz 文件)在工作节点处解压缩。文件具有执行权限设置。

可以通过设置属性 mapreduce.job.cache.{files |archives} 来分发文件/存档。如果必须分发多个文件/存档,则可以将它们作为逗号分隔的路径添加。还可以通过 API Job.addCacheFile(URI)/ Job.addCacheArchive(URI)Job.setCacheFiles(URI[])/ Job.setCacheArchives(URI[]) 设置属性,其中 URI 的形式为 hdfs://host:port/absolute-path#link-name。在流处理中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。

DistributedCache 还可以用作地图和/或缩减任务中使用的基本软件分发机制。它可用于分发 jar 和本机库。 Job.addArchiveToClassPath(Path)Job.addFileToClassPath(Path) api 可用于缓存文件/jar,并将它们添加到子 jvm 的类路径中。通过设置配置属性 mapreduce.job.classpath.{files |archives} 也可以执行相同的操作。类似地,符号链接到任务工作目录的缓存文件可用于分发本机库并加载它们。

私有和公有 DistributedCache 文件

DistributedCache 文件可以是私有的或公有的,这决定了它们如何在工作节点上共享。

  • “私有”DistributedCache 文件被缓存到需要这些文件的用户私有的本地目录中。这些文件仅由特定用户的全部任务和作业共享,并且无法被工作节点上其他用户的作业访问。DistributedCache 文件通过其在文件系统(通常是 HDFS)中上传文件时的权限而变为私有。如果文件没有可世界读取的访问权限,或者通向该文件的目录路径没有可世界执行的查找访问权限,则该文件将变为私有。

  • “公有”DistributedCache 文件被缓存到全局目录中,并且文件访问被设置为对所有用户公开可见。这些文件可以由工作节点上所有用户的任务和作业共享。DistributedCache 文件通过其在文件系统(通常是 HDFS)中上传文件时的权限而变为公有。如果文件具有可世界读取的访问权限,并且通向该文件的目录路径具有可世界执行的查找访问权限,则该文件将变为公有。换句话说,如果用户打算向所有用户公开提供文件,则必须将文件权限设置为可世界读取,并且通向该文件的路径上的目录权限必须设置为可世界执行。

分析

分析是一个实用工具,可为部分映射和缩减获取内置 java 分析器的代表性(2 或 3)样本。

用户可以通过设置配置属性mapreduce.task.profile来指定系统是否应收集作业中某些任务的分析器信息。可以使用 api Configuration.set(MRJobConfig.TASK_PROFILE, boolean) 设置值。如果将值设置为true,则启用任务分析。分析器信息存储在用户日志目录中。默认情况下,未为作业启用分析。

一旦用户配置需要分析,她/他可以使用配置属性mapreduce.task.profile.{maps|reduces}来设置要分析的 MapReduce 任务范围。可以使用 api Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String) 设置值。默认情况下,指定范围为0-2

用户还可以通过设置配置属性mapreduce.task.profile.params来指定分析器配置参数。可以使用 api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String) 指定值。如果字符串包含%s,则在任务运行时,它将被替换为分析输出文件的文件名。这些参数通过命令行传递给任务子 JVM。分析参数的默认值为-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s

调试

MapReduce 框架提供了一个运行用户提供的脚本以进行调试的工具。当 MapReduce 任务失败时,用户可以运行调试脚本,例如处理任务日志。该脚本可以访问任务的 stdout 和 stderr 输出、syslog 和 jobconf。调试脚本的 stdout 和 stderr 的输出显示在控制台诊断中,也作为作业 UI 的一部分显示。

在以下部分中,我们将讨论如何使用作业提交调试脚本。脚本文件需要分发并提交到框架。

如何分发脚本文件

用户需要使用 DistributedCache分发符号链接到脚本文件。

如何提交脚本

提交调试脚本的一种快速方法是分别为属性 mapreduce.map.debug.scriptmapreduce.reduce.debug.script 设置值,以调试映射和归约任务。还可以使用 API Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String)Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String) 设置这些属性。在流模式中,可以使用命令行选项 -mapdebug-reducedebug 提交调试脚本,以分别调试映射和归约任务。

脚本的参数是任务的 stdout、stderr、syslog 和 jobconf 文件。在 MapReduce 任务失败的节点上运行的调试命令为

$script $stdout $stderr $syslog $jobconf

管道程序将 c++ 程序名称作为命令的第五个参数。因此,对于管道程序,命令为

$script $stdout $stderr $syslog $jobconf $program

默认行为

对于管道,将运行默认脚本以在 gdb 下处理核心转储,打印堆栈跟踪并提供有关正在运行线程的信息。

数据压缩

Hadoop MapReduce 为应用程序编写者提供了指定中间映射输出和作业输出(即归约输出)的压缩功能。它还捆绑了 CompressionCodec 实现,用于 zlib 压缩算法。还支持 gzipbzip2snappylz4 文件格式。

出于性能(zlib)和 Java 库不可用等原因,Hadoop 还提供了上述压缩编解码器的本机实现。有关其用法和可用性的更多详细信息,请参阅 此处

中间输出

应用程序可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api 控制中间映射输出的压缩,并可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api 使用要使用的 CompressionCodec

作业输出

应用程序可以通过 FileOutputFormat.setCompressOutput(Job, boolean) api 控制作业输出的压缩,并且可以通过 FileOutputFormat.setOutputCompressorClass(Job, Class) api 指定要使用的 CompressionCodec

如果作业输出要存储在 SequenceFileOutputFormat 中,则可以通过 SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api 指定所需的 SequenceFile.CompressionType(即 RECORD / BLOCK - 默认为 RECORD)。

跳过错误记录

Hadoop 提供了一个选项,在处理映射输入时可以跳过某些错误输入记录。应用程序可以通过 SkipBadRecords 类控制此功能。

当映射任务在某些输入上确定性崩溃时,可以使用此功能。这通常是由于映射函数中的错误造成的。通常,用户必须修复这些错误。但是,有时这是不可能的。错误可能出现在第三方库中,例如无法获得其源代码的库。在这种情况下,即使经过多次尝试,任务也永远无法成功完成,并且作业将失败。使用此功能,只会丢失围绕错误记录的小部分数据,这对于某些应用程序(例如对非常大的数据执行统计分析的应用程序)可能是可以接受的。

默认情况下,此功能处于禁用状态。要启用它,请参阅 SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)

启用此功能后,在一定数量的映射失败后,框架将进入“跳过模式”。有关更多详细信息,请参阅 SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。在“跳过模式”中,映射任务将维护正在处理的记录范围。为此,框架依赖于已处理记录计数器。请参阅 SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDSSkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。此计数器使框架能够知道已成功处理了多少条记录,因此,知道哪个记录范围导致任务崩溃。在进一步尝试中,将跳过此范围的记录。

跳过的记录数取决于应用程序增加已处理记录计数器的频率。建议在处理完每条记录后增加此计数器。在通常批量处理其处理的某些应用程序中,这可能无法实现。在这种情况下,框架可能会跳过错误记录周围的其他记录。用户可以通过 SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) 控制跳过的记录数。框架尝试使用类似二分搜索的方法缩小跳过记录的范围。跳过的范围被分成两半,只执行一半。在后续失败中,框架会找出哪一半包含错误记录。任务将被重新执行,直到达到可接受的跳过值或用尽所有任务尝试。要增加任务尝试次数,请使用 Job.setMaxMapAttempts(int)Job.setMaxReduceAttempts(int)

跳过的记录将以序列文件格式写入 HDFS,以便以后分析。可以通过 SkipBadRecords.setSkipOutputPath(JobConf, Path) 更改位置。

示例:WordCount v2.0

这是一个更完整的 WordCount,它使用了到目前为止我们讨论的 MapReduce 框架提供的许多功能。

这需要 HDFS 启动并运行,特别是对于 DistributedCache 相关功能。因此,它仅适用于 伪分布式完全分布式 Hadoop 安装。

源代码

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

示例运行

示例文本文件作为输入

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

运行应用程序

$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

输出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

请注意,输入与我们查看的第一个版本不同,以及它们如何影响输出。

现在,让我们通过 DistributedCache 插入一个列出要忽略的单词模式的模式文件。

$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

再次运行它,这次使用更多选项

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

正如预期的那样,输出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

再次运行它,这次关闭大小写敏感性

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

果然,输出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

亮点

第二个版本的 WordCount 通过使用 MapReduce 框架提供的一些功能改进了前一个版本

  • 演示了应用程序如何在 Mapper(和 Reducer)实现的 setup 方法中访问配置参数。

  • 演示了如何使用 DistributedCache 分发作业所需的只读数据。在这里,它允许用户指定在计数时要跳过的单词模式。

  • 演示了 GenericOptionsParser 处理通用 Hadoop 命令行选项的实用性。

  • 演示了应用程序如何使用 Counters 以及如何设置传递给 map(和 reduce)方法的特定于应用程序的状态信息。

Java 和 JNI 是 Oracle America, Inc. 在美国和其他国家/地区的商标或注册商标。