Hadoop 流式处理

Hadoop Streaming

Hadoop streaming 是 Hadoop 发行版附带的一个实用程序。该实用程序允许您使用任何可执行文件或脚本作为映射器和/或归约器来创建和运行 Map/Reduce 作业。例如

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

Streaming 如何工作

在上述示例中,映射器和归约器都是可执行文件,它们从 stdin(逐行)读取输入,并将输出发送到 stdout。该实用程序将创建一个 Map/Reduce 作业,将作业提交到适当的集群,并监控作业的进度,直到作业完成。

当为映射器指定可执行文件时,每个映射器任务将在映射器初始化时将可执行文件作为单独的进程启动。随着映射器任务的运行,它会将输入转换为行,并将行馈送到进程的 stdin。同时,映射器从进程的 stdout 收集面向行的输出,并将每行转换为一个键/值对,该键/值对作为映射器的输出收集。默认情况下,一行前缀直到第一个制表符字符key,而行的其余部分(不包括制表符字符)将是value。如果行中没有制表符字符,则整行被视为键,而值为空。但是,可以通过设置-inputformat命令选项进行自定义,如下所述。

当为归约器指定可执行文件时,每个归约器任务将在归约器初始化时将可执行文件作为单独的进程启动。随着归约器任务的运行,它会将输入键/值对转换为行,并将行馈送到进程的 stdin。同时,归约器从进程的 stdout 收集面向行的输出,并将每行转换为一个键/值对,该键/值对作为归约器的输出收集。默认情况下,一行前缀直到第一个制表符字符是键,而行的其余部分(不包括制表符字符)是值。但是,可以通过设置-outputformat命令选项进行自定义,如下所述。

这是 Map/Reduce 框架与流映射器/归约器之间的通信协议的基础。

用户可以将stream.non.zero.exit.is.failure指定为truefalse,以使退出状态为非零的流任务分别为失败成功。默认情况下,退出状态为非零的流任务被视为失败任务。

流命令选项

流支持流命令选项以及通用命令选项。通用命令行语法如下所示。

注意:务必在流选项之前放置通用选项,否则命令将失败。有关示例,请参阅使存档可供任务使用

mapred streaming [genericOptions] [streamingOptions]

Hadoop 流命令选项在此列出

参数 可选/必需 说明
-input 目录名或文件名 必需 映射器的输入位置
-output 目录名 必需 归约器的输出位置
-mapper 可执行文件或 JavaClassName 可选 映射器可执行文件。如果未指定,则使用 IdentityMapper 作为默认值
-reducer 可执行文件或 JavaClassName 可选 归约器可执行文件。如果未指定,则使用 IdentityReducer 作为默认值
-file 文件名 可选 使映射器、归约器或组合器可执行文件在计算节点上本地可用
-inputformat JavaClassName 可选 您提供的类应返回 Text 类的键/值对。如果未指定,则使用 TextInputFormat 作为默认值
-outputformat JavaClassName 可选 您提供的类应采用 Text 类的键/值对。如果未指定,则使用 TextOutputformat 作为默认值
-partitioner JavaClassName 可选 确定将键发送到哪个归约器的类
-combiner streamingCommand 或 JavaClassName 可选 用于映射输出的组合器可执行文件
-cmdenv name=value 可选 将环境变量传递给流命令
-inputreader 可选 出于向后兼容性:指定记录读取器类(而不是输入格式类)
-verbose 可选 详细输出
-lazyOutput 可选 延迟创建输出。例如,如果输出格式基于 FileOutputFormat,则仅在首次调用 Context.write 时创建输出文件
-numReduceTasks 可选 指定归约器的数量
-mapdebug 可选 当映射任务失败时调用的脚本
-reducedebug 可选 当归约任务失败时调用的脚本

指定 Java 类作为映射器/归约器

您可以提供 Java 类作为映射器和/或归约器。

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \
  -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
  -reducer /usr/bin/wc

您可以将 stream.non.zero.exit.is.failure 指定为 truefalse,以使退出状态为非零的流任务分别变为 失败成功。默认情况下,退出状态为非零的流任务被视为失败任务。

使用作业提交打包文件

您可以指定任何可执行文件作为映射器和/或归约器。可执行文件不必预先存在于集群中的计算机上;但是,如果不存在,您需要使用“-file”选项告诉框架将可执行文件作为作业提交的一部分打包。例如

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myPythonScript.py \
  -reducer /usr/bin/wc \
  -file myPythonScript.py

上述示例指定了用户定义的 Python 可执行文件作为映射器。选项“-file myPythonScript.py”导致 Python 可执行文件作为作业提交的一部分发送到集群机器。

除了可执行文件,您还可以打包映射器和/或归约器可能使用的其他辅助文件(例如字典、配置文件等)。例如

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myPythonScript.py \
  -reducer /usr/bin/wc \
  -file myPythonScript.py \
  -file myDictionary.txt

为作业指定其他插件

与普通 Map/Reduce 作业一样,您可以为流式作业指定其他插件

 -inputformat JavaClassName
 -outputformat JavaClassName
 -partitioner JavaClassName
 -combiner streamingCommand or JavaClassName

您为输入格式提供的类应返回 Text 类的键/值对。如果您未指定输入格式类,则 TextInputFormat 用作默认值。由于 TextInputFormat 返回 LongWritable 类的键,而这些键实际上不是输入数据的一部分,因此将丢弃这些键;只有值将被传输到流式映射器。

您为输出格式提供的类应获取 Text 类的键/值对。如果您未指定输出格式类,则 TextOutputFormat 用作默认值。

设置环境变量

要在流式命令中设置环境变量,请使用

 -cmdenv EXAMPLE_DIR=/home/example/dictionaries/

通用命令选项

流式处理支持 流式命令选项 以及通用命令选项。通用命令行语法如下所示。

注意:务必在流选项之前放置通用选项,否则命令将失败。有关示例,请参阅使存档可供任务使用

hadoop command [genericOptions] [streamingOptions]

您可以在流式处理中使用的 Hadoop 通用命令选项在此处列出

参数 可选/必需 说明
-conf configuration_file 可选 指定应用程序配置文件
-D property=value 可选 对给定属性使用值
-fs host:port 或 local 可选 指定名称节点
-files 可选 指定要复制到 Map/Reduce 集群的逗号分隔文件
-libjars 可选 指定要包含在类路径中的逗号分隔 jar 文件
-archives 可选 指定要在计算机器上解压缩的逗号分隔存档

使用 -D 选项指定配置变量

您可以使用“-D <property>=<value>”指定其他配置变量。

指定目录

要更改本地临时目录,请使用

 -D dfs.data.dir=/tmp

要指定其他本地临时目录,请使用

 -D mapred.local.dir=/tmp/local
 -D mapred.system.dir=/tmp/system
 -D mapred.temp.dir=/tmp/temp

注意:有关作业配置参数的更多详细信息,请参阅:mapred-default.xml

指定仅映射作业

通常,您可能只想使用映射函数处理输入数据。若要执行此操作,只需将 mapreduce.job.reduces 设置为零。Map/Reduce 框架不会创建任何归约器任务。相反,映射器任务的输出将成为作业的最终输出。

 -D mapreduce.job.reduces=0

为了向后兼容,Hadoop Streaming 还支持 “-reducer NONE” 选项,该选项等效于 “-D mapreduce.job.reduces=0”。

指定归约器数量

要指定归约器数量(例如两个),请使用

mapred streaming \
  -D mapreduce.job.reduces=2 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

自定义如何将行拆分为键/值对

如前所述,当 Map/Reduce 框架从映射器的 stdout 中读取一行时,它会将该行拆分为一个键/值对。默认情况下,该行前缀(直到第一个制表符字符)是键,该行的其余部分(不包括制表符字符)是值。

但是,您可以自定义此默认设置。您可以指定除制表符字符(默认值)之外的其他字段分隔符,并且您可以指定一行中的第 n 个字符(n >= 1)而不是第一个字符(默认值)作为键和值之间的分隔符。例如

mapred streaming \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat

在上述示例中,“-D stream.map.output.field.separator=.” 将 “.” 指定为映射输出的字段分隔符,并且一行中直到第四个 “.” 的前缀将成为键,该行的其余部分(不包括第四个 “.”)将成为值。如果一行少于四个 “.”,则整行将成为键,并且值将成为一个空文本对象(如 new Text(””)创建的对象)。

类似地,您可以使用 “-D stream.reduce.output.field.separator=SEP” 和 “-D stream.num.reduce.output.fields=NUM” 将归约输出行中的第 n 个字段分隔符指定为键和值之间的分隔符。

类似地,您可以将 “stream.map.input.field.separator” 和 “stream.reduce.input.field.separator” 指定为 Map/Reduce 输入的输入分隔符。默认情况下,分隔符是制表符字符。

使用大文件和归档文件

使用 -files 和 -archives 选项,您可以让任务使用文件和存档。参数是您已上传到 HDFS 的文件或存档的 URI。这些文件和存档在作业之间进行缓存。您可以从 fs.default.name 配置变量中检索 host 和 fs_port 值。

注意: -files 和 -archives 选项是通用选项。请确保在命令选项前放置通用选项,否则命令将失败。

让任务使用文件

-files 选项在任务的当前工作目录中创建一个符号链接,该符号链接指向文件的本地副本。

在此示例中,Hadoop 会自动在任务的当前工作目录中创建一个名为 testfile.txt 的符号链接。此符号链接指向 testfile.txt 的本地副本。

-files hdfs://host:fs_port/user/testfile.txt

用户可以使用 # 为 -files 指定不同的符号链接名称。

-files hdfs://host:fs_port/user/testfile.txt#testfile

可以像这样指定多个条目

-files hdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt

让任务使用存档

-archives 选项允许您将 jar 本地复制到任务的当前工作目录,并自动解压文件。

在此示例中,Hadoop 会自动在任务的当前工作目录中创建一个名为 testfile.jar 的符号链接。此符号链接指向存储已上传 jar 文件的解压内容的目录。

-archives hdfs://host:fs_port/user/testfile.jar

用户可以使用 # 为 -archives 指定不同的符号链接名称。

-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir

在此示例中,input.txt 文件有两行,指定了两个文件:cachedir.jar/cache.txt 和 cachedir.jar/cache2.txt 的名称。“cachedir.jar”是到存档目录的符号链接,该目录包含文件“cache.txt”和“cache2.txt”。

mapred streaming \
                -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar' \
                -D mapreduce.job.maps=1 \
                -D mapreduce.job.reduces=1 \
                -D mapreduce.job.name="Experiment" \
                -input "/user/me/samples/cachefile/input.txt" \
                -output "/user/me/samples/cachefile/out" \
                -mapper "xargs cat" \
                -reducer "cat"

$ ls test_jar/
cache.txt  cache2.txt

$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)

$ hdfs dfs -put cachedir.jar samples/cachefile

$ hdfs dfs -cat /user/me/samples/cachefile/input.txt
cachedir.jar/cache.txt
cachedir.jar/cache2.txt

$ cat test_jar/cache.txt
This is just the cache string

$ cat test_jar/cache2.txt
This is just the second cache string

$ hdfs dfs -ls /user/me/samples/cachefile/out
Found 2 items
-rw-r--r-* 1 me supergroup        0 2013-11-14 17:00 /user/me/samples/cachefile/out/_SUCCESS
-rw-r--r-* 1 me supergroup       69 2013-11-14 17:00 /user/me/samples/cachefile/out/part-00000

$ hdfs dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string
This is just the second cache string

更多使用示例

Hadoop 分区程序类

Hadoop 有一个库类 KeyFieldBasedPartitioner,对许多应用程序很有用。此类允许 Map/Reduce 框架根据某些键字段(而不是整个键)对映射输出进行分区。例如

mapred streaming \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -D map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.job.reduces=12 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

此处,-D stream.map.output.field.separator=.-D stream.num.map.output.key.fields=4 与前一个示例中的解释相同。流式传输使用这两个变量来识别映射程序的键/值对。

上述 Map/Reduce 作业的映射输出键通常有四个字段,由“.”分隔。但是,Map/Reduce 框架将使用 -D mapred.text.key.partitioner.options=-k1,2 选项按键的前两个字段对映射输出进行分区。此处,-D map.output.key.field.separator=. 指定分区的分隔符。这可确保键中前两个字段相同的键/值对将被分区到同一个还原程序中。

这实际上等效于将前两个字段指定为主键,将后两个字段指定为辅助键。主键用于分区,而主键和辅助键的组合用于排序。此处显示了一个简单的说明

映射输出(键)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

划分为 3 个归约器(前 2 个字段用作分区键)

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

为归约器对每个分区进行排序(所有 4 个字段用于排序)

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

Hadoop 比较器类

Hadoop 有一个库类 KeyFieldBasedComparator,对许多应用程序很有用。此类提供 Unix/GNU Sort 提供的部分功能。例如

mapred streaming \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -D mapreduce.map.output.key.field.separator=. \
  -D mapreduce.partition.keycomparator.options=-k2,2nr \
  -D mapreduce.job.reduces=1 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat

上述 Map/Reduce 作业的映射输出键通常有四个字段,由“.”分隔。但是,Map/Reduce 框架将使用 -D mapreduce.partition.keycomparator.options=-k2,2nr 选项按键的第二个字段对输出进行排序。此处,-n 指定排序是数值排序,-r 指定结果应反转。下面显示了一个简单的说明

映射输出(键)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

对归约器的输出进行排序(其中第二个字段用于排序)

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

Hadoop 聚合包

Hadoop 有一个名为 Aggregate 的库包。Aggregate 提供一个特殊的归约器类和一个特殊的合并器类,以及一个简单的聚合器列表,这些聚合器对一系列值执行“求和”、“最大值”、“最小值”等聚合。Aggregate 允许您定义一个映射器插件类,该类预计将为映射器的每个输入键/值对生成“可聚合项”。合并器/归约器将通过调用适当的聚合器来聚合这些可聚合项。

要使用 Aggregate,只需指定“-reducer aggregate”

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myAggregatorForKeyCount.py \
  -reducer aggregate \
  -file myAggregatorForKeyCount.py

python 程序 myAggregatorForKeyCount.py 如下所示

#!/usr/bin/python3

import sys

def generateLongCountToken(id):
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            line = line[:-1]
            fields = line.split("\t")
            print(generateLongCountToken(fields[0]))
            line = sys.stdin.readline()
    except "end of file":
        return None

if __name__ == "__main__":
     main(sys.argv)

Hadoop 字段选择类

Hadoop 有一个库类 FieldSelectionMapReduce,它实际上允许您处理文本数据,就像 unix “cut” 实用程序一样。类中定义的映射函数将每个输入键/值对视为字段列表。您可以指定字段分隔符(默认值为制表符)。您可以选择任意字段列表作为映射输出键,并选择任意字段列表作为映射输出值。同样,类中定义的归约函数将每个输入键/值对视为字段列表。您可以选择任意字段列表作为归约输出键,并选择任意字段列表作为归约输出值。例如

mapred streaming \
  -D mapreduce.map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.fieldsel.data.field.separator=. \
  -D mapreduce.fieldsel.map.output.key.value.fields.spec=6,5,1-3:0- \
  -D mapreduce.fieldsel.reduce.output.key.value.fields.spec=0-2:5- \
  -D mapreduce.map.output.key.class=org.apache.hadoop.io.Text \
  -D mapreduce.job.reduces=12 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
  -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

选项“-D mapreduce.fieldsel.map.output.key.value.fields.spec=6,5,1-3:0-”指定映射输出的键/值选择。键选择规范和值选择规范由“:”分隔。在此情况下,映射输出键将包含字段 6、5、1、2 和 3。映射输出值将包含所有字段(0- 表示字段 0 和所有后续字段)。

选项“-D mapreduce.fieldsel.reduce.output.key.value.fields.spec=0-2:5-”指定归约输出的键/值选择。在此情况下,归约输出键将包含字段 0、1、2(对应于原始字段 6、5、1)。归约输出值将包含从字段 5 开始的所有字段(对应于所有原始字段)。

常见问题

如何使用 Hadoop Streaming 运行一组任意(半)独立任务?

通常情况下,您不需要 Map Reduce 的全部功能,而只需要运行同一程序的多个实例 - 无论是在数据的不同部分上,还是在相同的数据上,但使用不同的参数。您可以使用 Hadoop Streaming 来执行此操作。

如何处理文件,每个映射一个?

举例来说,考虑在 Hadoop 集群中压缩(压缩)一组文件的问题。您可以使用 Hadoop Streaming 和自定义映射器脚本来实现此目的

  • 生成一个包含输入文件完整 HDFS 路径的文件。每个映射任务将获得一个文件名作为输入。

  • 创建一个映射器脚本,该脚本在给定文件名后,将文件获取到本地磁盘,对文件进行 gzip 压缩,然后将其放回所需的输出目录中。

我应该使用多少个归约器?

有关详细信息,请参阅 MapReduce 教程:归约器

如果我在 shell 脚本中设置一个别名,它是否会在 -mapper 之后起作用?

例如,假设我执行:别名 c1='cut -f1'。-mapper “c1” 是否起作用?

使用别名不起作用,但允许变量替换,如本示例所示

$ hdfs dfs -cat /user/me/samples/student_marks
alice   50
bruce   70
charlie 80
dan     75

$ c2='cut -f2'; mapred streaming \
  -D mapreduce.job.name='Experiment' \
  -input /user/me/samples/student_marks \
  -output /user/me/samples/student_out \
  -mapper "$c2" -reducer 'cat'

$ hdfs dfs -cat /user/me/samples/student_out/part-00000
50
70
75
80

我可以使用 UNIX 管道吗?

例如,-mapper “cut -f1 | sed s/foo/bar/g” 是否起作用?

目前,此操作不起作用,并会产生“java.io.IOException: Broken pipe”错误。这可能是一个需要调查的错误。

如果我收到“设备上没有剩余空间”错误,该怎么办?

例如,当我通过 -file 选项分发大型可执行文件(例如 3.6G)运行流作业时,会收到“设备上没有剩余空间”错误。

jar 打包发生在由配置变量 stream.tmpdir 指向的目录中。stream.tmpdir 的默认值为 /tmp。将其值设置为空间更大的目录

-D stream.tmpdir=/export/bigspace/…

如何指定多个输入目录?

您可以使用多个“-input”选项指定多个输入目录

mapred streaming \
  -input '/user/foo/dir1' -input '/user/foo/dir2' \
    (rest of the command)

如何生成 gzip 格式的输出文件?

您可以生成 gzip 文件作为生成的输出,而不是纯文本文件。将“ -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec”作为选项传递给您的流作业。

如何使用流提供我自己的输入/输出格式?

您可以通过打包自定义类并将其放入 $HADOOP_CLASSPATH 中来指定您自己的自定义类。

如何使用流解析 XML 文档?

您可以使用记录读取器 StreamXmlRecordReader 来处理 XML 文档。

mapred streaming \
  -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" \
    (rest of the command)

在 BEGIN_STRING 和 END_STRING 之间找到的任何内容都将被视为映射任务的一个记录。

StreamXmlRecordReader 理解的名称值属性是

  • (字符串)“开始” - 标记记录开始的字符,和“结束” - 标记记录结束的字符。
  • (布尔值)“慢匹配” - 切换以查找开始和结束字符,但在 CDATA 中而不是常规标记中。默认为 false。
  • (整数)“前瞻” - 使用“慢匹配”时同步 CDATA 的最大前瞻字节,应大于“maxrec”。默认为 2*“maxrec”。
  • (整数)“maxrec” - 在“慢匹配”期间每次匹配之间读取的最大记录大小。默认为 50000 字节。

如何在流应用程序中更新计数器?

流进程可以使用 stderr 发出计数器信息。应将 reporter:counter:<group>,<counter>,<amount> 发送到 stderr 以更新计数器。

如何在流应用程序中更新状态?

流进程可以使用 stderr 发出状态信息。要设置状态,应将 reporter:status:<message> 发送到 stderr。

如何获取流式作业的 mapper/reducer 中的作业变量?

请参阅 配置的参数。在流式作业执行期间,“mapred”参数的名称会发生转换。句点 ( . ) 会变为下划线 ( _ )。例如,mapreduce.job.id 会变为 mapreduce_job_id,而 mapreduce.job.jar 会变为 mapreduce_job_jar。在您的代码中,请使用带下划线的参数名称。

如果我收到“error=7, Argument list too long”怎么办

作业会将整个配置复制到环境中。如果作业正在处理大量输入文件,将作业配置添加到环境中可能会导致环境超限。环境中的作业配置副本对于运行作业并不是必需的,可以通过设置以下内容来截断它

-D stream.jobconf.truncate.limit=20000

默认情况下,值不会被截断 (-1)。零 (0) 将仅复制名称,而不复制值。对于几乎所有情况,20000 都是一个安全值,可以防止环境超限。