弹性分布式数据集(上篇)

- 2 mins

摘要

我们提供了弹性分布式数据集(RDDs),一个抽象的分布式内存,用来帮助程序员能在大规模集群中以一种容错的方式进行内存计算。 RDDs被提出的动机是,当前的计算框架对迭代式算法与交互式数据挖掘这两种类型的应用程序,处理起来效率低效。 在这些种场景下,如果能将数据保存在内存中,将会使的应用程序的性能提高一个数量级。 为了高效地完成容错,RDDs提供了一种受限制的共享内存的方式,这种方式是基于粗粒度的转换共享状态而非细粒度的更新共享状态。 然而,我们分析表明RDDs可以表达出很多种类的计算,包括目前专门从事迭代任务的编程计算模型,比如 Pregel,当然也可以表达出目前模型表达不出的程序。 我们通过Spark系统已经实现了RDDs,并且通过各种各样的用户应用和测试来评估了这个系统。

1 介绍

分布式集群计算框架像MapReduce和Dryad等已经广泛应用于大数据集分析中。这些系统可以让用户不用担心分布式工作环境以及容错,而是使用一系列的高层次的操作来达到并行计算。 虽然当前的框架提供了大量的对访问集群的计算资源的抽象,但是它们还是缺少了对利用分布式内存的抽象扩充。 这样使的它们在处理需要在多个计算之间复用中间结果的应用的时候会非常的不高效。 数据的复用在迭代机器学习和图计算领域(比如 PageRank,K-means 以及线性回归等算法)是很常见的。 数据复用在交互式数据挖掘也是很常见的,一个用户会经常对一个相同的数据子集进行多次不同的特定查询。 然而,目前的大部分的框架对计算之间的数据复用的处理方式就是将中间数据写到一个外部稳定可靠的系统中,比如分布式文件系统,这样会由于数据的复制备份,磁盘的 I/O 以及数据的序列化而致应用任务执行很费时间。 认识到这个问题后,研究者们已经为一些需要中间数据复用的应用开发出了一些特殊的框架。比如Pregel 在做迭代式图计算的时候会将中间结果放在内存中。 HaLoop也提供了迭代式MapReduce接口。然而,这些框架仅仅支持一些特殊的计算模式(比如循环一系列的 MapReduce 步骤),并且它们是隐式的为些计算模式提供数据共享。 它们没有提供更加抽象一般的数据复用,比如可以让用户加载几个数据集到内存中然后对这些数据集进行专门的查询。 在这篇论文中,我们提出了一个全新的抽象概念,叫做弹性分布式数据集(RDDs),它可以高效的处理广泛的应用中涉及到的数据用的场景。 RDDs是一个可以容错且并行的数据结构,它可以让用户显式的将中间结果数据集持久在内存中、控制数据集的分区来达到数据存放处理最优以及可以使用丰富的操作指令。

在设计RDDs的时候,主要的挑战是定义一个可以提供高效容错的编程接口,已经存在的分布式内存抽象系统比如 分布式共享内存、键值对存储、数据库 以及 Piccolo,都是提供了基于粒度的更新可变状态(比如表中的cells)的接口。 基于这种接口下,保证容错的方式无非是将数据复备份到多台机器或者记录机器上的更新的日志。 这两种方式在数据密集性的工作任务中都是非常的昂贵的操作的,因为需要通过集群网络传输在机器节点间复制大量的数据,宽带传输数据的速度远远比 RAM 内存慢,而这两种方式会占用大量的存储空间。

与这些系统相反,RDDs提供了基于粗粒度转换(比如 map,filter 以及 join)的接口,可以对多的数据条目应用相同的操作。 这样就可以通过记录来某个数据集的一系列转换(就是这个数据集 lineage)而不是实际真实的数据来达到提供高效的容错机制。 如果一个RDD的分区数据丢失掉了,RDD有足够的信息知道它是从哪 RDDs 转换计算而来的,那么重新计算这个 RDD 所依赖的那个 RDD 对应的区就行了。 因此可以很快且不用通过复制备份方式来恢复丢失的数据。

虽然基于粗粒度的转换一开始看起来似乎受到限制,但是 RDDs 非常适合很多并行计算的应用,因为这些应用都是在大量的数据元素上应用相同的操作方法。 事实上,我们分析表明RDDs 不仅可以高效的表达出目前含括的MapReduce,DryadLINQ,SQL,Pregel 以及 HaLoop 等系统提出的分布式编程模型,而且还能表达它们表达不了的新的应用的计算模型,比如交互型数据挖掘。 我们相信,RDDs 解决那些新的框架提出来计算需求的能力将会成为是 RDD 抽象强大的最有力证据。 我们在Spark 系统中实现了 RDDs,这个系统已经在 UC Berkeley 以及好几个公司中应用于研究和生产应中。 Spark 和 DryadLINQ 类似使用scala编程语言提供了很便利的语言集成接口。 另外,Spark可以通过scala 的解释器来对大数据集进行交互式的查询。 我们相信 spark 是首个允许使用多用途编程语言来进行分布式内存中交互式数据挖掘的系统。

我们通过为基准测试以及用户应用的测试两个方面来评估了 RDDs 和 spark。 我们分析显示,Spark 在迭代应用中可以比 hadoop 快上 20 倍以上、使的现实中的数据分析报表的速度提升了 40 倍以及使的交互式的扫1TB数据集的延迟在 5-7 秒。 更重要的是,为了说明 RDDs 的普遍性,我们基于spark 用相对较小的程序(每个代码只有 200 行代码)实现了 Pregel 和 HaLoop 的编程模型,包括它们使用的数据分布优化。、 本篇论文以 RDDs(§2)和 Spark(§3)的概述开始。然后在(§4)中讨论 了RDDs内部的表达、在(§5)讨论了我们的实现以及在(§6)中讨论了实验结果。 最后,我们讨论了 RDDs 是怎么样来表达现在已存在的几个系统的编程模型(§7)、调查相关工作(§8)以及总结。

2 弹性分布式数据集 (RDDs)

这部分主要讲述 RDDs的概要,首先定义RDDs(§2.1)以及介绍 RDDs 在 spark 中的编程接口 (§2.2),然后我们对 RDDs 和细粒度共享内存抽象进行的对比(§2.3)。最后(§2.4)我们讨论了 RDD 模型的局限性。

2.1 RDD 抽象

形式上,一个 RDD是一个只读,分区的数据集。我们可以通过确定的操作创建一个RDDs,通过稳定的存储系统或其他的 RDDs。 为了区别开 RDDs 的其他操作,我们称这些操作为 转换,比如 map,filter 以及 join 等都是转换操作。 RDDs 并不要始终被具体化,一个 RDD 有足够的信息知道自己是从哪个数据集计算而来的(就是所谓的依赖血统),这是一个非常强大的属性:本质上,一个程序你不能引用一个无法从失败中重新构建的RDD。 最终,用户可以控制 RDDs 的两个方面:数据存储和分区。对于需要复用的 RDD,用户可以明确的选择一个数据存储策略(比如内存存储)。他们也可以基于一个元素的 key 来为 RDD 所有的元素在机器节点间进行数据分区, 这样非常利于数据分布优化,比如给两个数据集进行相同的 hash 分区,然后进行 join,可以提高 join 的性能。

2.2 Spark编程接口

Spark 和 DryadLINQ 和 FlumeJava 相似通过集成编程语言 api 来暴露 RDDs操作,这样的话,每一个数据集就代表一个对象,我们可以调用这个对象中的方法来操作这个对象。

开发人员可以通过对稳定存储的数据进行转换操作(比如 map 和 filter 等)来得到一个或者多个 RDDs。 他们可以对这些 RDDs进行动作操作,这些操作可以是得到应用的结果值或者将结果数据写入到存储系统中,动作包括:count(表示返回这个数据集的元素的个数)、collect(表示返回数据集的所有元素)以及 save(表示将输出结果写入到存储系统中)。 像DryadLINQ ,spark 在定义 RDDs的时候并不会真正的计算,而是要等到对这个 RDDs 触发了 动作 操作才会真正的触发计算,这个称之为 RDDs 的 懒加载 特性,所以我们可以先对 转换 进行组装一系列的 管道,然后再计算。

另外,编程者可以通过调用 RDDs 的 persist 方法来缓存后续需要复用的 RDDs。Spark 默认是将缓存数据放在内存中,但是如果内存不足的话则会写入到磁盘中。用户可以通过 persist 的参数来调整缓存策略,比如只将数据存储在磁盘中或者复制备份数据到多台机器。 最后,用户可以为每一个 RDDs的缓存设置优先级,以达到哪个在内存中的 RDDs 应该首先写道磁盘中。

2.2.1 例子:控制台日志数据挖掘

假设一个 web 服务正发生了大量的错误,然后运维人员想从存储在 hdfs 中的几 TB 的日志中找出错误的原因。运维人员可以通过 spark 将日志中的错误信息加载到分布式的内存中,然后对这些内存中的数据进行查询。她首先需要写下面的 scala 代码:

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

第一行表示从一个HDFS文件(许多行的文件数据集)上定义了一个RDD,第二行表示基于前面定义的 RDD 进行过滤数据。第三行将过滤后的 RDD 结果存储在内存中,以达到多个对这个共享 RDD 的查询。需要注意的事,filter 的参数是 scala 语法中的闭包。 到目前为止,集群上还没有真正的触发计算。然而,用户可以对RDD进行行动操作,比如对错误信息的计数:

errors.count()

用户也可以继续对RDD进行转换操作,然后计算其结果,比如:

// 计算所有包含MySQL的信息数量
errors.filter(_.contains("MySQL")).count()
// 返回错误信息中含有 "HDFS" 字样的信息中的时间字段的值(假设每行数据的字段是以 tab 来切分的,时间字段是第 3 个字段)
errors.filter(_.contains("HDFS"))
.map(_.split(\t)(3))
.collect()

在对errors第一次做 行动 操作的后,spark会将 errors 的所有分区的数据存储在内存中,这样后面对 errors 的计算速度会有很大的提升。需要注意的是,像 lines 这种基础数据的 RDD 是不会存储在内存中的。因为包含错误信息的数据可能只是整个日志数据的一小部分,所以将包含错误数据的日志放在内存中是比较合理的。 最后,为了说明我们的模型是如何达到容错的,我们在图一种展示了第三个查询的血缘关系图(lineage graph)。在这个查询种,我们以对 lines 进行过滤后的 errors 开始,然后在对 errors 进行了 filter 和 map 操作,最后做了 行动 操作即 collect。Spark 会最后两个 转换 组成一个 管道,然后将这个 管道 分解成一系列的 任务,最后将这些 任务 调度到含有 errors 缓存数据的机器上进行执行。此外,如果 errors 的一个分区的数据丢失了,spark 会对 lines 的相对应的分区应用 filter 函数来重新创建 errors 这个分区的数据。

Aspect RDDs Distr. Shared Mem.
粗粒度或者细粒度 细粒度
粗粒度 细粒度
数据一致性 不重要的(不可变的) 取决于app 或者运行环境
容错 利用lineage达到细粒度且低延迟的容错 需要应用checkpoints(就是需要写磁盘)并且需要程序回滚
计算慢的任务 可以利用备份的任务来解决 很难做到
计算数据的位置 自动的机遇数据本地性 取决于app(runtime是以透明为目标的)
内存不足时的行为 和已经存在的数据流处理系统一样,写磁盘 非常糟糕的性能(内存的交换?)

表一:RDDs 和 Distributed shared memory 对比

2.3 RDD 模型的优势

为了理解作为分布式内存抽象的RDDs的好处,我们在表一中用 RDDs 和分布式共享内存系统(DSM)进行了对比。在所有的 DSM 系统中,应用从一个全局的地址空间中的任意位置中读写数据。 需要注意的是,依据这个定义,我们所说的 DSM 系统不仅包含了传统的共享内存系统,还包含了对共享状态的细粒度写操作的其他系统(比如 Piccolo),以及分布式数据库。DSM是一个很普遍的抽象,但是这个普遍性使得它在商用集群中实现高效且容错的系统比较困难。

RDDs和DSM最主要的区别是,RDDs只能通过粗粒度的转换被创建(或者被写),然而 DSM 允许对每一个内存位置进行读写,这样使都RDDs在 应用中大量写数据受到了限制,但是可以使的容错变的更加高效。特别是,RDDs不需要发生非常耗时的checkpoint操作,因为它可以根据lineage进行恢复数据。而且,只有丢掉了数据的分区才会需要重新计算,并不需要回滚整个程序,并且这些重新计算的任务是在多台机器上并行运算的。

最后,RDDs比 DSM多提供了两个好处。第一,在对RDDs进行大量写操作的过程中,我们可以根据数据的本地性来调度任务以提高性能。第二,如果在scan-base的操作中,且这个时候内存不足以存储这个 RDDs,那么RDDs可以慢慢的从内存中清理掉。在内存中存储不下的分区数据会被写到磁盘中,且提供了和现有并行数据处理系统相同的性能保证。

2.4 不适合使用RDDs的应用程序

经过上面的讨论介绍,我们知道 RDDs非常适合将相同操作应用在整个数据集的所有的元素上的批处理应用。在这些场景下,RDDs可以利用血缘关系图来高效的记住每一个转换的步骤,并且不需要记录大量的数据就可以恢复丢失的分区数据。RDDs不太适合用于需要异步且细粒度的更新共享状态的应用,比如一个 web 应用或者数据递增的 web 爬虫应用的存储系统。对于这些应用,使用传统的纪录更新日志以及对数据进行checkpoint会更加高效。比如使用数据库、RAMCloud、Percolator 以及 Piccolo。我们的目标是给批量分析提供一个高效的编程模型,对于这些异步的应用需要其他的特殊系统来实现。

原文地址: nsdi_spark.pdf

rss facebook twitter github youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora