— spark — 1 min read
Spark编程模型就是RDD(Resilient distributed dataset)弹性分布式数据集,他是MapReduce编程模型的拓展和延伸,但是他解决了MapReduce的痛点,也就是计算任务的结果如何进行高效的共享。
Spark使用内存计算的模式以及类似与Mapreduce的模型,使得大数据的并行计算得到了提升。而Spark基于RDD模型也同样实现了多类模型计算,如迭代计算、交互式sql、流式数据计算等。
RDD是 Spark 中的最基础的抽象,代表一个不可变、可分区、可以被并行计算的元素集合,被计算的 RDD全部的缓存在内存中。
关于RDD的详细信息,先从Spark源码中的注释开始说起。在源码中提到了五点属性:
1/**2 * Internally, each RDD is characterized by five main properties:3 *4 * - A list of partitions5 * - A function for computing each split6 * - A list of dependencies on other RDDs7 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)8 * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for9 * an HDFS file)10 */
这也是 Spark 实现并行计算的基础,通过对 RDD 的 partition 使一个 RDD 的数据集合分布在整个 spark 任务集群中,通过调度器进行任务的调度。
一个RDD包含了多个分区,每个分区包含了RDD的部分数据,一个分区也叫做一个partition,这个partition可以在创建RDD时指定,也可以使用默认值。在spark的任务执行阶段每个partition会被当做一个Task处理,关于Task是什么,后续详解。
spark中关于rdd的计算是针对每一个partition的,每个rdd都会有一个compute函数进行计算,compute函数内部使用迭代器方式。
这其中包括了宽依赖和窄依赖,正是因为依赖关系,形成了 RDD的 lineage,这也是 RDD 容错的基础
[key,value]
结构,可以选择指定针对 key 的分区策略也就是指定分区器,在 Spark 中提供了几种不同的分区器(e.g. HashPartitioner)。
主要是为了提高数据本地性,减少数据块的网络传输。
总的来说RDD
是spark
内部,只读的支持分区及容错(通过lineage),可以并行计算的基于内存数据结构。
RDD 诞生于 SparkContext,之前介绍了 SparkContext 的环境初始化工作(e.g 初始化DAGScheduler、TaskScheduler等),这只是他两个主要工作中的一个,另一个就是负责创建 RDD。 在 SparkContext 的伴生类中,实现了所有创建 RDD 的方法:
1def parallelize[T: ClassTag](2 seq: Seq[T],3 numSlices: Int = defaultParallelism): RDD[T] = withScope {4 assertNotStopped()5 new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())6}7
8def makeRDD[T: ClassTag](9 seq: Seq[T],10 numSlices: Int = defaultParallelism): RDD[T] = withScope {11 parallelize(seq, numSlices)12} 13def textFile(14 path: String,15 minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {16 assertNotStopped()17 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],18 minPartitions).map(pair => pair._2.toString).setName(path)19}20.21.22.
进入 RDD 内部,RDD 作为一个抽象类,与其子类构成了我们熟知的模板方法模式,也就是说RDD 抽象类内部定义了基础方法 交由其子类实现,模板方法在抽象类内部实现,同时还包含了一些 hook 方法.这么做既保证了他的基础操作不会被随意更改,同时在面对新的RDD 结构添加进来时更方便拓展.
根据已经存在的RDD转换生成一个新的RDD,转换算子都是lazy操作,并不会马上执行,只有在遇到action算子时才会被真正的执行。
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable[V],Iterable[W]))类型的RDD |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。 |
repartition(numPartitions) | 重新给 RDD 分区 |
repartitionAndSortWithinPartitions(partitioner) | 重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中,action算子会触发任务的执行。
动作 | 含义 |
---|---|
reduce(func) | reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
接下来我们以简单的transformation算子map
操作为例详细分析:
1val context = new SparkContext(conf)2val listRdd = Array(1, 2, 3, 5, 8, 6)3context.parallelize(listRdd).map(_ * 2).foreach(println(_))
首先在context创建后,Spark 使用parallelize根据本地的集合创建了一个平行化的集合 RDD ,叫做 ParallelCollectionRDD
同样他要是继承自 RDD。
接下来对创建好的 RDD 调用抽象类中的模板方法map()
, 这样就构建出一个 RDD 子类 MapPartitionsRDD
.
1def map[U: ClassTag](f: T => U): RDD[U] = withScope {2 val cleanF = sc.clean(f)3 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))4}
MapPartitionsRDD
内部,可以看到他覆写了 RDD 中的几个基础方法,同样在其他的 RDD 的子类中也都是同样的方式进行了定制化.1override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None2override def getPartitions: Array[Partition] = firstParent[T].partitions3
4// 这里主要介绍compute()这个方法,这是在执行计算阶段最重要的方法.是所有 Executor 执行的基础5
6override def compute(split: Partition, context: TaskContext): Iterator[U] =7 //用于将(taskContext、partition index、iterator)的元组映射到输出迭代器的函数。8 f(context, split.index, firstParent[T].iterator(split, context))9
10override def clearDependencies() {11}12@transient protected lazy override val isBarrier_ : Boolean =13override protected def getOutputDeterministicLevel = {14}
走到这里Transform 阶段已经完成,当然在实际使用过程中,肯定还包含了,其他复杂度转换操作,例如
flatmap
,filter
,distinct
等操作.不过当前任务并没有提交执行,在RDD 源码注释中可以看到这句话launch a job to return a value to the user program
,也就是说在 Spark 中,所有的执行任务都是在调用了 Action 操作后才会开始执行的.以例子中的.foreach(println(_))
为例.
collect
,subtract
等一些方法.foreach()
将我们传递进去的函数,应用在当前这个 RDD 中的所有元素上,并将这个操作作为一个 job 进行提交给DAGScheduler
,DAGScheduler
将起划分为一个一个的stage(这其中需要记住stage划分算法),再将每一个stage转化为TaskSet,交由TaskScheduler
处理,之后由TaskScheduler
的后台进程SchedulerBackend
将一个个的task(task分配算法)提交给 Spark 分配的 Executor 进行执行任务!1def foreach(f: T => Unit): Unit = withScope {2 val cleanF = sc.clean(f)3 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))4}
Spark正是以RDD为数据模型,完成了整个分布式的内存计算。对比来看Spark相较于Hadoop的优点一部分原因取决于RDD的使用。