原文地址

摘要

MapReduce是一个用于处理和生成大型数据集的编程模型及其相关实现。

用户需要指定一个处理键/值对的映射函数,以生成一组中间键/值对,同时还需要指定一个减少函数,该函数合并与同一中间键相关联的所有中间值。正如本文所示,许多真实世界的任务可以在这个模型中表达。

采用这种函数式风格编写的程序会自动并行化,并在大规模的商用机器集群上执行。

运行时系统负责处理输入数据的分区细节,调度程序的执行跨多台机器,处理机器故障,并管理所需的机器间通信。这使得没有并行和分布式系统经验的程序员能够轻松利用大型分布式系统的资源。

我们的MapReduce实现在一个大规模的商用机器集群上运行,且具有很高的可扩展性:典型的MapReduce计算能够在数千台机器上处理数千兆字节的数据。

程序员发现该系统易于使用:已经实现了数百个MapReduce程序,并且每天在Google的集群上执行上千个MapReduce作业。

1 引言

在过去的五年中,谷歌的作者和许多其他人实施了数百个特殊用途的计算,处理大量原始数据,如爬取的文档、网络请求日志等,以计算各种派生数据,如倒排索引、Web文档图结构的各种表示、每个主机爬取的页面数量的摘要、给定日期最常见查询集等。大多数这样的计算在概念上是直截了当的。然而,输入数据通常很大,为了在合理的时间内完成计算,必须将计算分布到数百或数千台机器上。如何并行化计算、分发数据以及处理故障的问题导致原始简单计算变得复杂,需要大量处理这些问题的复杂代码。

作为对这种复杂性的应对,我们设计了一个新的抽象,允许我们表达我们试图执行的简单计算,同时隐藏库中并行化、容错、数据分发和负载均衡的混乱细节。我们的抽象受到了Lisp和许多其他函数式语言中存在的映射和减少原语的启发。我们意识到,我们的大多数计算涉及对输入中的每个逻辑“记录”应用映射操作,以计算一组中间键/值对,然后对所有共享相同键的值应用减少操作,以适当地组合派生数据。我们使用具有用户指定的映射和减少操作的函数模型,可以轻松并行化大规模计算,并将重新执行作为容错的主要机制。

这项工作的主要贡献是提供了一个简单而强大的界面,实现了大规模计算的自动并行化和分布,以及一个在大规模商用PC集群上实现此接口并在性能上表现出色的实现。第2节描述了基本的编程模型并给出了几个例子。第3节描述了一个针对我们基于集群的计算环境定制的MapReduce接口的实现。第4节描述了我们发现有用的编程模型的几个改进。第5节对我们的实现在各种任务中的性能进行了测量。第6节探讨了在谷歌内部使用MapReduce的情况,包括我们在将其作为重写我们的生产索引系统的基础时的经验。第7节讨论了相关和未来的工作。

2 编程模型

计算接受一组输入键/值对,并生成一组输出键/值对。MapReduce库的用户通过两个函数表达计算:Map和Reduce。

用户编写的Map函数接受一个输入键/值对,并生成一组中间键/值对。MapReduce库将所有与相同中间键 I 关联的中间值组合在一起,并将它们传递给Reduce函数。

用户同样编写的Reduce函数接受一个中间键 I 和该键对应的一组值。它将这些值合并在一起,形成可能更小的值集。通常,每次调用Reduce只产生零个或一个输出值。

中间值通过迭代器传递给用户的减少函数。这使我们能够处理无法在内存中容纳的值列表。

2.1 示例

考虑一个问题,统计大量文档中每个单词的出现次数。用户将编写类似于以下伪代码的代码:

map(String key, String value):
  // key: 文档名称
  // value: 文档内容
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: 一个单词
  // values: 计数列表
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

在这个简单的例子中,map函数发出每个单词及其相关的出现次数计数(仅为 ‘1’)。reduce函数将为特定单词发出的所有计数相加在一起。

此外,用户编写代码填充一个mapreduce规范对象,其中包含输入和输出文件的名称以及可选的调优参数。

然后,用户调用MapReduce函数,将规范对象传递给它。用户的代码与MapReduce库(用C++实现)链接在一起。附录A包含此示例的完整程序文本。

2.2 类型

尽管先前的伪代码是基于字符串输入和输出编写的,但从概念上讲,用户提供的map和reduce函数具有关联的类型:

map(k1,v1) → list(k2,v2)
reduce(k2, list(v2)) → list(v2)

即,输入键和值来自不同的域,而输出键和值来自不同的域。此外,中间键和值与输出键和值来自相同的域。

我们的C++实现将字符串传递给用户定义的函数,并由用户代码负责在字符串和适当类型之间进行转换。

2.3 更多示例

以下是几个简单而有趣的程序示例,可以轻松地表达为MapReduce计算。

分布式Grep:map函数如果匹配了提供的模式,则发出一行。reduce函数是一个标识函数,只是将提供的中间数据复制到输出。

URL访问频率计数:map函数处理web页面请求的日志并输出 hURL, 1i。reduce函数将相同URL的所有值相加,并发出 hURL, 总计数i 对。

反向Web链接图:map函数对于在名为source的页面中找到的指向目标URL的每个链接,输出 htarget, sourcei 对。reduce函数连接与给定目标URL相关联的所有源URL的列表,并发出 htarget, list(source)i 对。

每个主机的词向量:一个词向量对文档或一组文档中出现的最重要的单词进行总结,表示为 hword, frequencyi 对的列表。map函数对每个输入文档(其中主机名从文档的URL中提取)发出 hhostname, term vectori 对。reduce函数接收给定主机的所有单文档词向量。它将这些词向量相加,丢弃不常见的术语,然后发出最终的 hhostname, term vectori 对。

倒排索引:map函数解析每个文档,并发出一系列 hword, document IDi 对。reduce函数接受给定单词的所有对,对相应的文档ID进行排序,并发出一个 hword, list(document ID)i 对。所有输出对的集合形成了一个简单的倒排索引。可以轻松扩展此计算以跟踪单词位置。

分布式排序:map函数从每个记录中提取键,并发出一个 hkey, recordi 对。reduce函数不改变地发出所有对。此计算依赖于第4.1节描述的分区设施和第4.2节描述的排序属性。

3 实现

MapReduce 接口有许多不同的实现方式,正确的选择取决于环境。

例如,一个实现可能适用于小型共享内存机器,另一个适用于大型NUMA多处理器,还有一个适用于更大规模的网络机器集合。

本节描述了一种针对谷歌广泛使用的计算环境的实现:由多台商用PC组成的大型集群,通过交换式以太网[4]连接在一起。

在我们的环境中:

(1)机器通常是运行Linux的双处理器x86处理器,每台机器配备2-4 GB的内存。

(2)使用商用网络硬件 - 通常在机器级别上是100兆位/秒或1千兆位/秒,但总体叉带宽要低得多。

(3)一个集群由数百或数千台机器组成,因此机器故障是常见的。

(4)存储由连接到个别机器的廉价IDE硬盘提供。我们使用内部开发的分布式文件系统[8]来管理这些硬盘上存储的数据。该文件系统使用复制来在不可靠的硬件上提供可用性和可靠性。

(5)用户将作业提交到调度系统。每个作业由一组任务组成,并由调度程序映射到集群中的一组可用机器。

3.1 执行概览

Map调用通过自动将输入数据分割为一组M个片段,从而在多台机器上分布。可以通过不同的机器并行处理输入片段。

通过使用分区函数(例如,hash(key) mod R),将中间键空间分割成R块,从而分布Reduce调用。用户指定分区数(R)和分区函数。

图1显示了在我们的实现中MapReduce操作的整体流程。当用户程序调用MapReduce函数时,会发生以下动作序列(图1中的数字标签对应下面的列表中的数字):

  • 图1

执行概览

  1. 用户程序中的MapReduce库首先将输入文件分割为通常为16兆字节至64兆字节(由用户通过可选参数进行控制)的M个片段。然后,在机器集群上启动许多程序的副本。
  2. 程序的一个副本是特殊的 - 主节点。其余的是由主节点分配工作的工作节点。有M个map任务和R个reduce任务要分配。主节点选择空闲的工作节点,并为每个节点分配一个map任务或一个reduce任务。
  3. 被分配了map任务的工作节点读取相应输入分片的内容。它从输入数据中解析出键/值对,并将每对传递给用户定义的Map函数。Map函数生成的中间键/值对在内存中缓冲。
  4. 定期,缓冲的键/值对被写入本地磁盘,由分区函数分为R个区域。这些缓冲对在本地磁盘上的位置被传递回主节点,负责将这些位置转发给reduce工作节点。
  5. 当主节点通知reduce工作节点这些位置时,它使用远程过程调用从map工作节点的本地磁盘读取缓冲数据。当reduce工作节点读取所有中间数据后,它按中间键对数据进行排序,以便将相同键的所有出现组合在一起。排序是必要的,因为通常许多不同的键映射到相同的reduce任务。如果中间数据量过大而无法容纳在内存中,则使用外部排序。
  6. reduce工作节点遍历排序后的中间数据,并对每个唯一的中间键进行处理,将键和相应的中间值集传递给用户的Reduce函数。Reduce函数的输出附加到此reduce分区的最终输出文件。
  7. 当所有map任务和reduce任务完成时,主节点唤醒用户程序。此时,用户程序中的MapReduce调用返回到用户代码。

成功完成后,mapreduce执行的输出可在R个输出文件中获得(每个reduce任务一个,文件名由用户指定)。

通常,用户不需要将这些R个输出文件合并为一个文件 - 他们通常将这些文件作为另一个MapReduce调用的输入,或者从能够处理被分割为多个文件的输入的另一个分布式应用程序中使用它们。

3.2 主节点数据结构

主节点保持多个数据结构。对于每个map任务和reduce任务,它存储状态(空闲、进行中或已完成)以及工作节点的标识(对于非空闲任务)。

主节点是中间文件区域位置从map任务传播到reduce任务的媒介。因此,对于每个完成的map任务,主节点存储由map任务生成的R个中间文件区域的位置和大小。

随着map任务的完成,对这些位置和大小信息的更新被接收。这些信息被逐步推送到具有进行中reduce任务的工作节点。

3.3 容错性

由于MapReduce库旨在使用数百或数千台机器处理大量数据,因此该库必须能够优雅地容忍机器故障。

工作节点故障

主节点定期对每个工作节点进行ping。如果在一定时间内未收到工作节点的响应,主节点将工作节点标记为失败。由工作节点完成的任何map任务都将被重置回其初始的空闲状态,因此可重新安排在其他工作节点上执行。类似地,由于故障的工作节点上进行中的任何map任务或reduce任务也将被重置为空闲状态,并可重新安排。

已完成的map任务在故障时需要重新执行,因为它们的输出存储在故障机器的本地磁盘上,因此无法访问。已完成的reduce任务不需要重新执行,因为它们的输出存储在全局文件系统中。

当一个map任务首先由工作节点A执行,然后由于A失败而稍后由工作节点B执行时,所有执行reduce任务的工作节点都会收到有关重新执行的通知。任何尚未从工作节点A读取数据的reduce任务将从工作节点B读取数据。MapReduce对大规模工作节点故障具有弹性。例如,在一次MapReduce操作中,运行集群的网络维护导致每次有80台机器在几分钟内无法访问。MapReduce主节点简单地重新执行了不可访问的工作节点机器执行的工作,并继续取得进展,最终完成MapReduce操作。

主节点故障

主节点周期性地写入上述主节点数据结构的周期性检查点是很容易的。如果主节点任务失败,可以从最后一个检查点状态启动新的副本。然而,鉴于只有一个主节点,其失败的可能性很小;因此,我们的当前实现如果主节点失败,则中止MapReduce计算。客户端可以检查此条件并在需要时重试MapReduce操作。

在出现故障的情况下的语义

当用户提供的map和reduce运算符是其输入值的确定性函数时,我们的分布式实现产生与整个程序的非故障顺序执行所产生的相同的输出。

我们依靠map和reduce任务输出的原子提交来实现此属性。每个正在进行的任务将其输出写入私有临时文件。reduce任务生成一个这样的文件,而map任务生成R个这样的文件(每个reduce任务一个)。当map任务完成时,工作节点向主节点发送消息,并在消息中包含R个临时文件的名称。如果主节点接收到已完成的map任务的完成消息,它将忽略该消息。否则,它会记录R个文件的名称在主节点的数据结构中。当reduce任务完成时,reduce工作节点原子地将其临时输出文件重命名为最终输出文件。如果同一reduce任务在多台机器上执行,将为同一最终输出文件执行多次重命名调用。我们依靠底层文件系统提供的原子重命名操作来保证最终的文件系统状态仅包含由reduce任务的一次执行产生的数据。

我们的大多数map和reduce运算符是确定性的,而且我们的语义在这种情况下等同于非故障顺序执行,这使得程序员很容易推断出他们程序的行为。当map和/或reduce运算符是不确定性的时,我们提供了更弱但仍然合理的语义。在存在不确定性运算符的情况下,特定reduce任务R1的输出等同于通过顺序执行不确定性程序产生的R1的输出。然而,对于不同的reduce任务R2,其输出可能对应于

通过不同的顺序执行不确定性程序产生的R2的输出。

考虑map任务M和reduce任务R1和R2。让e(Ri)是执行Ri的提交(存在确切的这样的执行)。由于e(R1)可能已读取由M的一个执行产生的输出,而e(R2)可能已读取由M的另一个执行产生的输出,因此产生了更弱的语义。

3.4 本地性

在我们的计算环境中,网络带宽是相对稀缺的资源。我们通过利用输入数据(由GFS[8]管理)存储在构成我们集群的机器的本地磁盘上的事实来节省网络带宽。

GFS将每个文件划分为64 MB块,并在不同的机器上存储每个块的多个副本(通常为3个副本)。

MapReduce主节点考虑输入文件的位置信息,并尝试在包含相应输入数据副本的机器上安排map任务。

如果失败,它将尝试在靠近该任务输入数据副本的地方安排map任务(例如,在与包含数据的机器相同网络交换机上的工作节点)。

在集群的大多数工作节点上运行大型MapReduce操作时,大多数输入数据都是在本地读取,不占用网络带宽。

3.5 任务粒度

我们将map阶段细分为M个部分,将reduce阶段细分为R个部分,如上所述。

理想情况下,M和R应远大于工作节点的数量。让每个工作节点执行多个不同的任务可以改善动态负载平衡,并在工作节点失败时加速恢复:它已完成的许多map任务可以分布在所有其他工作节点上。

在我们的实现中,有关M和R可以有多大的实际限制,因为主节点必须做出O(M + R)的调度决策,并且像上面描述的那样在内存中保留O(M * R)的状态。

(然而,内存使用的常数因子很小:状态的O(M * R)部分包含每个map任务/ reduce任务对约一字节的数据。)

此外,R通常受用户的约束,因为每个reduce任务的输出最终都会进入单独的输出文件。在实践中,我们倾向于选择M,使得每个单独的任务大致为16 MB至64 MB的输入数据(以使上述的本地性优化最为有效),并使R成为我们希望使用的工作节点数量的小倍数。

我们通常使用M = 200,000和R = 5,000,使用2,000个工作节点执行MapReduce计算。

3.6 备份任务

导致MapReduce操作总耗时延长的常见原因之一是“拖慢者”:一台机器在计算的最后几个map或reduce任务中花费异常长的时间。拖慢者可能出现出于各种原因。例如,具有坏磁盘的机器可能会遇到频繁的可纠正错误,从而使其读取性能从30 MB/s减慢到1 MB/s。集群调度系统可能已经在该机器上安排了其他任务,导致由于CPU、内存、本地磁盘或网络带宽的竞争而使其执行MapReduce代码变慢。我们最近遇到的一个问题是机器初始化代码中的一个错误,导致处理器缓存被禁用:受影响机器上的计算速度减慢了一百倍以上。

我们有一个通用的机制来缓解拖慢者的问题。当MapReduce操作接近完成时,主节点会调度剩余进行中任务的备份执行。只要主执行或备份执行完成,任务就被标记为已完成。我们已经调整了这个机制,以便它通常增加操作使用的计算资源不超过几个百分点。

我们发现这显著缩短了完成大型MapReduce操作的时间。

例如,第5.3节中描述的排序程序在禁用备份任务机制时需要44%的额外时间来完成。

4. 优化 Refinements

尽管仅通过编写Map和Reduce函数提供的基本功能对大多数需求已经足够,但我们发现一些扩展很有用。本节将对这些扩展进行描述。

4.1 划分函数 Partitioning Function

MapReduce的用户指定他们想要的reduce任务/输出文件数量(R)。数据通过对中间键使用划分函数划分到这些任务中。提供了一个默认的划分函数,使用哈希(例如,“hash(key) mod R”)。这通常导致相当平衡的划分。然而,在某些情况下,通过键的某些其他函数进行数据划分是有用的。

例如,有时输出键是URL,我们希望同一主机的所有条目都最终出现在同一个输出文件中。为了支持这样的情况,MapReduce库的用户可以提供一个特殊的划分函数。

例如,使用“hash(Hostname(urlkey)) mod R”作为划分函数将导致来自同一主机的所有URL最终都出现在同一个输出文件中。

4.2 顺序保证 Ordering Guarantees

我们保证在给定的分区内,中间键/值对按键递增的顺序进行处理。这个排序保证使得在每个分区生成一个排序的输出文件变得容易,当输出文件格式需要支持按键进行高效随机访问查找,或者输出的用户发现按顺序排列的数据很方便时,它就变得有用。

4.3 合并函数 Combiner Function

在某些情况下,每个map任务生成的中间键存在显著的重复,并且用户指定的Reduce函数是可交换且可结合的。2.1节的字数统计示例就是一个很好的例子。

由于单词频率往往遵循Zipf分布,每个map任务将生成数百或数千个形式为 <the, 1> 的记录。

所有这些计数将通过网络发送到单个reduce任务,然后由Reduce函数相加以生成一个数字。我们允许用户指定一个可选的合并函数,在数据发送到网络之前对其进行部分合并。

合并函数在执行map任务的每台机器上执行。通常,相同的代码用于实现合并函数和reduce函数。

合并函数和reduce函数之间唯一的区别是MapReduce库如何处理函数的输出。reduce函数的输出写入最终输出文件。合并函数的输出写入一个将发送到reduce任务的中间文件。

部分合并显著加速了某些类别的MapReduce操作。附录A包含使用合并器的示例。

4.4 输入和输出类型 Input and Output Types

MapReduce库支持以几种不同的格式读取输入数据。例如,“text”模式输入将每一行视为一个键/值对:键是文件中的偏移量,值是行的内容。

另一种常见的支持格式存储按键排序的键/值对序列。

每种输入类型实现都知道如何将自己分割成有意义的范围,以便作为单独的map任务进行处理(例如,文本模式的范围分割确保范围分割仅在行边界发生)。

用户可以通过提供一个简单的阅读器接口的实现来为新的输入类型添加支持,尽管大多数用户只使用少量预定义的输入类型之一。

阅读器不一定需要提供从文件中读取的数据。例如,可以轻松定义一个从数据库或从内存中映射的数据结构中读取记录的阅读器。

类似地,我们支持一组输出类型,以不同的格式生成数据,并且用户代码可以轻松添加对新输出类型的支持。

4.5 副作用 Side-effects

在某些情况下,MapReduce的用户发现将辅助文件作为其map和/或reduce运算符的附加输出产生是方便的。我们依赖应用程序编写者使这些副作用成为原子且幂等的。

通常,应用程序会写入一个临时文件,一旦完全生成,就会原子地将此文件重命名。

我们不支持由单个任务生成的多个输出文件的原子两阶段提交。因此,生成具有跨文件一致性要求的多个输出文件的任务应该是确定性的。在实践中,这个限制从未成为问题。

4.6 跳过错误记录 Skipping Bad Records

有时用户代码中存在的错误会导致Map或Reduce函数在某些记录上以确定性的方式崩溃。这样的错误会阻止MapReduce操作完成。通常的做法是修复错误,但有时这是不可行的;也许错误是在源代码不可用的第三方库中。此外,有时忽略一些记录是可以接受的,例如在对大型数据集进行统计分析时。我们提供了一种可选的执行模式,其中MapReduce库检测到哪些记录导致确定性崩溃,并跳过这些记录以推动进展。

每个工作进程都安装了一个信号处理程序,用于捕获分段违例和总线错误。在调用用户Map或Reduce操作之前,MapReduce库将参数的序列号存储在全局变量中。如果用户代码生成信号,则信号处理程序将发送一个包含序列号的“最后一口气”UDP数据包到MapReduce主节点。当主节点在特定记录上看到多次故障时,它会指示在发出相应的Map或Reduce任务的下一个重新执行时跳过该记录。

4.7 本地执行 Local Execution

在Map或Reduce函数中调试问题可能会很棘手,因为实际的计算发生在一个分布式系统中,通常涉及数千台机器,由主节点动态地做出工作分配决策。为了方便调试、性能分析和小规模测试,我们开发了MapReduce库的另一种实现,该实现在本地机器上顺序执行MapReduce操作的所有工作。为用户提供了控制,以便计算可以限制在特定的map任务上。用户使用特殊标志调用其程序,然后可以轻松使用任何有用的调试或测试工具(例如gdb)。

4.8 状态信息 Status Information

主节点运行一个内部HTTP服务器,并提供一组供人类阅读的状态页面。状态页面显示计算的进度,例如已完成的任务数、进行中的任务数、输入字节数、中间数据字节数、输出字节数、处理速率等。页面还包含对每个任务生成的标准错误和标准输出文件的链接。用户可以使用这些数据来预测计算需要多长时间,以及是否应该向计算添加更多资源。

这些页面还可用于确定计算是否比预期慢得多。

此外,顶级状态页面显示了哪些工作机器失败以及它们在失败时正在处理哪些map和reduce任务。在尝试诊断用户代码中的错误时,此信息非常有用。

4.9 计数器 Counters

MapReduce库提供了一个计数器功能,用于计算各种事件的发生次数。例如,用户代码可能想要计算处理的单词总数或索引的德语文档数量等。

要使用此功能,用户代码创建一个命名计数器对象,然后在Map和/或Reduce函数中适当地增加计数器。

例如:

Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
  for each word w in contents:
    if (IsCapitalized(w)):
      uppercase->Increment();
    EmitIntermediate(w, "1");

来自各个工作机器的计数器值会定期传播到主节点(在ping响应中携带)。主节点聚合成功的map和reduce任务的计数器值,并在MapReduce操作完成时将它们返回给用户代码。当前的计数器值也显示在主节点状态页面上,以便用户可以观察实时计算的进展。在聚合计数器值时,主节点消除了相同map或reduce任务的重复执行的影响,以避免重复计数。

(重复执行可能由我们使用的备份任务以及由于故障而重新执行的任务引起。)

MapReduce库还自动维护一些计数器值,例如处理的输入键/值对数量和生成的输出键/值对数量。

用户发现计数器功能对于检查MapReduce操作的行为非常有用。

例如,在某些MapReduce操作中,用户代码可能希望确保生成的输出对的数量完全等于处理的输入对的数量,或者处理的德语文档的比例在总处理文档数量的可容忍比例内。

5. 性能

在这一部分,我们对在一个庞大机器集群上运行的两个MapReduce计算进行了性能测试。其中一个计算在大约一太字节的数据中搜索特定模式。另一个计算对大约一太字节的数据进行排序。

这两个程序代表了MapReduce用户编写的真实程序的一个大的子集——其中一类程序将数据从一种表示转换为另一种表示,另一类程序从大数据集中提取少量有趣的数据。

5.1 集群配置

所有程序都在一个包含大约1800台机器的集群上执行。每台机器配备有两个2GHz英特尔Xeon处理器,启用了超线程技术,4GB内存,两个160GB IDE硬盘,以及一个千兆以太网链路。这些机器被布置在一个两层树状交换网络中,根部拥有大约100-200 Gbps的聚合带宽。

所有机器都在同一个托管设施内,因此任意两台机器之间的往返时间小于一毫秒。

在4GB内存中,大约有1-1.5GB被集群上运行的其他任务所占用。这些程序是在周末下午执行的,此时CPU、硬盘和网络大多处于空闲状态。

5.2 Grep

grep程序扫描了1010个100字节的记录,寻找一个相对罕见的三字符模式(该模式在92337个记录中出现)。

输入被分成大约64MB的片段(M = 15000),并且整个输出都放在一个文件中(R = 1)。

图2显示了计算随时间的进展情况。Y轴显示了扫描输入数据的速率。

  • 图2

F2

随着越来越多的机器被分配给MapReduce计算,速率逐渐提高,在分配了1764个工作机器时达到峰值,超过30GB/s。随着映射任务完成,速率开始下降,在计算进行约80秒时降至零。

整个计算从开始到结束大约需要150秒。其中大约一分钟用于启动开销。这种开销是由程序传播到所有工作机器以及与GFS交互以打开1000个输入文件并获取用于局部性优化的信息所致。

5.3 Sort

sort程序对1010个100字节的记录(大约1太字节的数据)进行排序。此程序是根据TeraSort基准测试[10]建模的。

排序程序包含不到50行用户代码。三行的Map函数从文本行中提取一个10字节的排序键,并将键和原始文本行作为中间键/值对发射出去。我们使用内置的Identity函数作为Reduce运算符。此函数将中间键/值对无更改地传递为输出键/值对。最终排序输出写入一组2倍复制的GFS文件(即2太字节作为程序的输出)。

与之前一样,输入数据分成64MB的片段(M = 15000)。我们将排序输出分成4000个文件(R = 4000)。分区函数使用键的前几个字节将其分隔成R个部分。

我们的分区函数对这个基准测试的键分布有内置的了解。在一般的排序程序中,我们会添加一个预处理的MapReduce操作,该操作将收集键的样本,并使用样本键的分布计算最终排序阶段的分割点。

图3(a)显示了sort程序正常执行的进展情况。左上角的图表显示了读取输入的速率。速率峰值约为13 GB/s,由于所有映射任务在200秒之内完成,速率迅速下降。请注意,与grep相比,输入速率较低。这是因为sort映射任务大约有一半的时间和I/O带宽用于将中间输出写入本地磁盘。grep的相应中间输出几乎没有大小。

中间左侧的图表显示了从映射任务到减少任务发送数据的速率。此洗牌在第一个映射任务完成后立即开始。图中的第一个小峰是约1700个减少任务的第一批(整个MapReduce分配了约1700台机器,每台机器最多同时执行一个减少任务)。大约在计算进行到300秒时,这些第一批的减少任务完成,我们开始为剩余的减少任务洗牌数据。所有的洗牌在计算进行到约600秒时完成。

左下方的图表显示了由减少任务将排序数据写入最终输出文件的速率。在第一次洗牌结束和写入期间有一个延迟,因为机器正在忙于对中间数据进行排序。写入以2-4 GB/s的速率持续一段时间。所有写入在计算进行到约850秒时完成。

包括启动开销,整个计算需要891秒。这与TeraSort基准测试[18]的当前最佳报告结果1057秒相似。

需要注意的几点:输入速率高于洗牌速率和输出速率,这是由于我们的局部性优化 - 大多数数据从本地磁盘读取,绕过了我们相对带宽受限的网络。洗牌速率高于输出速率,因为输出阶段写入排序数据的两个副本(出于可靠性和可用性原因,我们制作输出的两个副本)。我们写两个副本,因为这是底层文件系统提供的可靠性和可用性的机制。如果底层文件系统使用纠删码[14]而不是复制,写入数据的网络带宽需求将减少。

  • 图3

F3

5.4 备份任务的效果

在图3(b)中,我们展示了一个在禁用备份任务的情况下执行排序程序的过程。

执行流程与图3(a)中显示的类似,只是在几乎没有写活动发生的地方有一个非常长的尾部。在960秒后,除了5个减少任务之外,所有任务都已完成。然而,这最后的几个滞后者要到300秒后才能完成。整个计算需要1283秒,增加了44%的经过时间。

5.5 机器故障

在图3(c)中,我们展示了一个排序程序的执行,其中我们故意在计算进行了几分钟后杀死了1746个工作进程中的200个。底层的集群调度程序立即在这些机器上重新启动了新的工作进程(因为只有进程被杀死,机器仍然正常运行)。

工作进程的死亡表现为负的输入速率,因为一些先前完成的映射工作消失了(因为相应的映射工作者被杀死),需要重新执行。这些映射工作的重新执行相对较快。

整个计算在包括启动开销的933秒内完成(只是比正常执行时间增加了5%)。

6. 经验

我们于2003年2月编写了MapReduce库的第一个版本,并在2003年8月进行了重大增强,包括局部性优化、动态负载均衡等。

自那时以来,我们惊讶地发现MapReduce库在我们处理的各种问题中具有广泛的适用性。

它已在Google的许多领域广泛使用,包括:

• 大规模机器学习问题, • Google新闻和Froogle产品的聚类问题, • 提取用于生成热门查询报告的数据(例如Google Zeitgeist), • 用于新实验和产品的Web页面属性提取(例如从大量Web页面语料库中提取地理位置以进行本地搜索), • 大规模图计算。图4显示了随着时间推移,我们在主要源代码管理系统中提交的独立MapReduce程序实例数量的显着增长,从2003年初的0增加到截至2004年9月底的近900个独立实例。MapReduce之所以如此成功,是因为它使得编写简单的程序并在半小时内在一千台机器上高效运行成为可能,极大地加速了开发和原型设计周期。此外,它允许没有分布式和/或并行系统经验的程序员轻松利用大量资源。

在每个作业结束时,MapReduce库会记录有关作业使用的计算资源的统计信息。

在表1中,我们展示了Google在2004年8月运行的一些MapReduce作业的统计信息。

  • 表1&图4

T1&F4

6.1 大规模索引

迄今为止,我们对MapReduce最重要的应用之一是对生成用于Google Web搜索服务的数据结构的生产索引系统进行的完全重写。索引系统以由我们的爬虫系统检索到的大量文档为输入,这些文档存储为一组GFS文件。这些文档的原始内容超过20TB。索引过程作为五到十个MapReduce操作的序列运行。使用MapReduce(而不是以前版本的索引系统中的特定分布式传递)带来了一些好处:

• 索引代码更简单、更小、更易理解,因为处理容错、分发和并行化的代码被隐藏在MapReduce库中。例如,当使用MapReduce表达时,计算的一个阶段的代码量从约3800行C++代码减少到约700行。

• MapReduce库的性能足够好,我们可以将概念上不相关的计算分开,而不是混合在一起以避免对数据的额外传递。这使得更改索引过程变得很容易。例如,在我们的旧索引系统中花费几个月时间的一个更改在新系统中只需几天即可实现。

• 索引过程的操作变得更加容易,因为MapReduce库自动处理由机器故障、慢机器和网络故障引起的大多数问题,而无需操作员干预。此外,通过向索引群集添加新机器,可以轻松提高索引过程的性能。

7 相关工作

许多系统提供了受限的编程模型,并利用这些限制来自动并行化计算。

例如,可以使用并行前缀计算[6, 9, 13]在N个处理器上对N元素数组的所有前缀进行计算。MapReduce可以被视为对这些模型的一种简化和提炼,基于我们在大规模真实世界计算中的经验。更重要的是,我们提供了一个容错实现,可扩展到数千个处理器。相比之下,大多数并行处理系统只在较小规模上实现,并将处理机器故障的细节留给程序员。

Bulk Synchronous Programming [17] 和一些 MPI 原语 [11] 提供更高级的抽象,使程序员更容易编写并行程序。这些系统与 MapReduce 的一个关键区别是,MapReduce 利用受限的编程模型来自动并行化用户程序,并提供透明的容错性。

我们的局部性优化受到一些技术的启发,比如主动磁盘 [12, 15],其中计算被推送到靠近本地磁盘的处理单元,以减少通过 I/O 子系统或网络发送的数据量。我们在与少量磁盘直接连接的商品处理器上运行,而不是直接在磁盘控制器处理器上运行,但总体方法是相似的。

我们的备份任务机制类似于 Charlotte 系统 [3] 中采用的急切调度机制。简单急切调度的一个缺点是,如果给定任务导致重复失败,整个计算将无法完成。我们通过跳过坏记录的机制解决了这个问题的一些实例。

MapReduce 实现依赖于一个内部的集群管理系统,负责在大量共享机器上分发和运行用户任务。虽然这不是本文的重点,但集群管理系统在精神上类似于其他系统,如 Condor [16]。

MapReduce 库中的排序工具在操作上类似于 NOW-Sort [1]。源机器(map 工作者)对要排序的数据进行分区,并将其发送到 R 个 reduce 工作者中的一个。每个 reduce 工作者在本地排序其数据(如果可能的话,将其保存在内存中)。当然,NOW-Sort没有我们的库广泛适用的用户可定义的 Map 和 Reduce 函数。

River [2] 提供了一个编程模型,其中进程通过在分布式队列上传送数据进行通信。与 MapReduce 一样,River 系统试图在异构硬件或系统扰动引入的不均匀性存在的情况下提供良好的平均性能。River 通过精心安排磁盘和网络传输以实现平衡的完成时间来实现这一目标。MapReduce 采用了一种不同的方法。通过限制编程模型,MapReduce 框架能够将问题分割成大量的细粒度任务。这些任务会动态地在可用的工作者上进行调度,以便更快的工作者处理更多的任务。受限制的编程模型还允许我们在作业结束时安排对任务的冗余执行,从而大大减少在存在不均匀性(如慢或卡住的工作者)的情况下的完成时间。

BAD-FS [5] 的编程模型与 MapReduce 完全不同,与 MapReduce 不同,它旨在跨广域网络执行作业。然而,存在两个基本相似之处。 (1)两个系统都使用冗余执行来从由故障引起的数据丢失中恢复。 (2)两者都使用具有局部感知调度的方法,以减少通过拥塞的网络链路发送的数据量。

TACC [7] 是一个旨在简化构建高可用网络服务的系统。与 MapReduce 类似,它依赖于重新执行作为实现容错性的机制。

8 结论

MapReduce 编程模型在 Google 中已成功用于许多不同的目的。我们将这一成功归因于几个原因。

首先,该模型易于使用,即使对于没有并行和分布式系统经验的程序员也是如此,因为它隐藏了并行化、容错、局部优化和负载平衡的细节。

其次,大量的问题可以轻松地表达为 MapReduce 计算。

例如,MapReduce 用于生成 Google 生产 Web 搜索服务的数据,用于排序、数据挖掘、机器学习等各种系统。

第三,我们已经开发了一个可扩展到包含数千台机器的大型机群的 MapReduce 实现。该实现有效地利用这些机器资源,因此适用于 Google 遇到的许多大规模计算问题。

我们从这项工作中学到了几件事。首先,限制编程模型使并行化和分布计算、以及使这些计算容错变得容易。

其次,网络带宽是一种稀缺资源。因此,我们系统中的许多优化都针对减少通过网络发送的数据量:局部性优化使我们能够从本地磁盘读取数据,将中间数据写入本地磁盘只保存了网络带宽的一个副本。

第三,冗余执行可以用于减轻慢机器的影响,并处理机器故障和数据丢失。

致谢

Josh Levenberg 在修订和扩展用户级 MapReduce API 方面发挥了关键作用,他根据使用 MapReduce 的经验以及其他人对增强功能的建议进行了许多新功能的开发。MapReduce 从 Google 文件系统 [8] 读取输入并将输出写入该系统。我们要感谢 Mohit Aron、Howard Gobioff、Markus Gutschke、David Kramer、Shun-Tak Leung 和 Josh Redstone 在开发 GFS 时所做的工作。同时,我们还要感谢 Percy Liang 和 Olcan Sercinoglu 在开发 MapReduce 使用的集群管理系统时的贡献。Mike Burrows、Wilson Hsieh、Josh Levenberg、Sharon Perl、Rob Pike 和 Debby Wallach 对本文早期草稿提供了有益的评论。匿名的OSDI审稿人以及我们的指导教师 Eric Brewer 在论文可以改进的许多领域提供了许多有用的建议。最后,我们要感谢 Google 工程组织内所有使用 MapReduce 的用户,他们提供了有益的反馈、建议和错误报告。

参考资料

https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf