Shuffle是mapreduce编程模型的关键步骤,同样也是在程序运行中主要需要进行调优的阶段,因此更好的理解shuffle原理,才能够在程序中找到需要优化的关键点。
在理解Spark shuffle前先理解标准的mapreduce编程模型中使用的shuffle原理,也就是Hadoop中的MapReduce计算框架。
在mapreduce中默认每一个输入到reducer中的数据都根据key已经排好序了,系统将map端的输出进行排好序,reducer端拉取到数据这一过程就是shuffle。
map函数输出的结果,并不是简单的写入磁盘,针对每一个map任务都有一个环形缓冲区用于存储结果,在缓冲区被填满时溢写磁盘,在这之前要根据数据最终要传入的reducer把数据划分成对应的partition,在每个partition中再根据key进行排序。如果此时我们有指定combine函数,那么他们将会把拍好序的结果执行combine操作,之后再写入磁盘,否则将直接写入磁盘。
此时来到reducer端,map的输出数据在map任务的tasktracker的本地磁盘,那么将根据tasktracker分区文件运行reducer任务,这里需要注意的是,每个reducer任务 是在map任务运行完成后才开始运行的,也就是说每一个分区文件都已经完全生成好了(这也是mapreduce效率较低的其中原因之一),reducer任务启动同时开始复制线程进行查找map输出文件的并进行复制(如何查找到map输出文件的位置?:每个map输出完成后都会向application master会报信息,这其中包括了输出文件的位置。)。复制好map输出结果后,reducer开始合并阶段,将每一个map结果进行合并,同时会保持map已经做好的顺序进行排序。
在spark中的shuffle处理过程同样是基于迭代的方式,早期版本中spark没有采用hadoop在reducer获取前全部排序的方式。而是提供了基于hash的shuffle操作(HashShuffleManager)。
不过这种方式存在一个较大的缺点就是在ShuffleMapTask数据生成时会产生大量的小文件,以及占用较大的内存缓存开销。为了解决这个问题,spark在版本演化的过程中几次优化了Shuffle机制。
早期版本中spark的每个ShuffleMapTask会根据ResultTask也就是reducetask的数量创建bucket,bucket数量就是ShuffleMapTask*ResultTask的数量,在当前的机制中bucket是一个抽象概念,每个bucket对应一个生成文件。不过这种方式存在较大的问题。
假设当前存在四个ShuffleMapTask任务,但是只提供了两个Executor的CPU core,如上图所示,那么在任务运行中假设先执行了1和3,此时根据ResultTask生成了六个分区数据文件,而写个文件中的数据并不是有序的,执行完成后,在执行2和4 task,此时又生成了六个分区数据文件,至此整个任务一共产生了ShuffleMapTask*ResultTask个分区数据文件。
根据普通的hash shuffle,可以看到其产生大量小文件的缺点,于是提供了优化后的hash shuffle,在新的策略中提出了consolidate
机制,减少了一部分小文件的产生,不过这种策略还是不够完美。
consolidate机制,简单来说就是复用buffer缓冲区,合并一些key相同的数据到一个分区(注意此时合并后文件依然无序的),然后对同一个分区生成一份文件,这样就能够缓解普通hash shuffle出现的大量小文件的问题。
具体如何优化的呢,假设当前存在六个ShuffleMapTask,提供了两个cpu core,同时存在三个ResultTask(三个分区)。
根据spark任务的分时执行,可以假设第一个和第四个任务先执行,这样就先生成了六个bucket,每个cpu core对应三个,也就是说会生成六个文件。
此时在执行2和5任务,不过这次不会再生成新的bucket也就是说不会有新的文件生成,他们会复用内存中已经存在的bucket,继续向其对应的分区进行追加。同样3和6任务也是同样向其追加,当整个ShuffleMapTask都执行完成后,可以看到整体的文件数量变少了很多,新的文件数量可以根据core*ResultTask计算。
不过这种策略也不够完美,当ResultTask分区过多时,必有有1000个ResultTask那么也会出现较多的小文件,所以也就引入了新的shuffle机制。
当前spark版本中已经完全抛弃掉了HashShuffleManager
为了缓解hash shuffle产生大量文件,以及文件句柄浪费的大量内存。spark在1.1版本中借鉴Hadoop Shuffle中的排序机制,并且对生成的结果使用了合并、索引机制。
具体是如何操作的呢?在hash shuffle中每一个bucket使用hashmap存储,他是无序的,而在当前机制中map中使用key进行分区,map的value使用Array来进行存储每一个数据,当array的数据量达到一定量时就会被写入磁盘,而在溢写磁盘之前先对该部分数据进行排序。 而排序后的文件会按照默认的数据量分批写入磁盘(5w),每次分批写入一个磁盘都会产生一个临时文件,最后当这个任务完成后会将所有的文件merge,合并为一个文件,同时对每一个临时文件的起始位置信息,再创建一个索引文件,这样就能够在拉取时找到指定的数据段位置。
这样一来ShuffleMapTask端产生的文件数量将变得更少,也就是有多少个cpu core就会有多少个文件生成。
优点:
该策略与普通的sort shuffle机制只是略去了ShuffleMapTask端的排序过程,其他过程没有变化,最后都会合并成一个文件。
该策略的运行条件:
在shuffleMapTask数量小于默认值200时,启用bypass模式的sortShuffle(原因是数据量本身比较少,没必要进行sort全排序,因为数据量少本身查询速度就快,正好省了sort的那部分性能开销。)
前边侧重介绍ShuffleMapTask端结果生成的过程,也就是所谓的map端写数据过程。在ResultTask端进行读取时整体流程是相同的,都是ResultTask端去ShuffleMapTask端拉取对应的数据到本地,然后开始执行任务。 这一过程中针对不同的shuffle写机制具体实现有不同的疑问:
至此所有关于Spark Shuffle的原理分析完成,其中只需要重点关注SortShuffle这一种机制即可,因为在Spark已经完全移除了HashShuffle,并且将Tungsten-Sort合并入SortShuffle,我们需要清楚Shuffle机制的作用是什么,使用SortShuffle能够带来什么好处,以及Shuffle的读写过程中具体的流程如何,这样方便在后续进行Shuffle优化时能够准确优化对应的细节。
以下节选自 [Spark Shuffle的技术演进] 作者:LeonLu 链接:https://www.jianshu.com/p/4c5c2e535da5
在具体的实现上,Shuffle经历了Hash、Sort、Tungsten-Sort三阶段:
在Shuffle Write过程按照Hash的方式重组Partition的数据,不进行排序。每个map端的任务为每个reduce端的Task生成一个文件,通常会产生大量的文件(即对应为M*R个中间文件,其中M表示map端的Task个数,R表示reduce端的Task个数),伴随大量的随机磁盘IO操作与大量的内存开销。 Shuffle Read过程如果有combiner操作,那么它会把拉到的数据保存在一个Spark封装的哈希表(AppendOnlyMap)中进行合并。 在代码结构上:
org.apache.spark.storage.ShuffleBlockManager
负责Shuffle Write
org.apache.spark.BlockStoreShuffleFetcher
负责Shuffle Read
org.apache.spark.Aggregator
负责combine,依赖于AppendOnlyMap
通过文件合并,中间文件的生成方式修改为每个执行单位(一个Executor中的执行单位等于Core的个数除以每个Task所需的Core数)为每个reduce端的任务生成一个文件。最终可以将文件个数从MR修改为EC/T*R,其中,E表示Executor的个数,C表示每个Executor中可用Core的个数,T表示Task所分配的Core的个数。
是否采用Consolidate机制,需要配置spark.shuffle.consolidateFiles
参数
在combine的时候,可以将数据spill到磁盘,然后通过堆排序merge
在Sort Based Shuffle的Shuffle Write阶段,map端的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,reduce端的Task可以通过该索引文件获取相关的数据。 在代码结构上:
从以前的ShuffleBlockManager中分离出ShuffleManager来专门管理Shuffle Writer和Shuffle Reader。两种Shuffle方式分别对应
org.apache.spark.shuffle.hash.HashShuffleManager
和org.apache.spark.shuffle.sort.SortShuffleManager
,
可通过spark.shuffle.manager参数配置。两种Shuffle方式有各自的ShuffleWriter:
org.apache.spark.shuffle.hash.HashShuffleReader
。
org.apache.spark.util.collection.ExternalSorter
实现排序功能。可通过对spark.shuffle.spill参数配置,决定是否可以在排序时将临时数据Spill到磁盘。将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型,这些技术决定了使用Tungsten-Sort要符合一些严格的限制,比如Shuffle dependency不能带有aggregation、输出不能排序等。由于堆外内存的管理基于JDK Sun Unsafe API,故Tungsten-Sort Based Shuffle也被称为Unsafe Shuffle。 在代码层面:
新增org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
新增org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter
(用java实现)
ShuffleReader复用HashShuffleReader
由SortShuffleManager自动判断选择最佳Shuffle方式,如果检测到满足Tungsten-sort条件会自动采用Tungsten-sort Based Shuffle,否则采用Sort Based Shuffle。 在代码方面:
UnsafeShuffleManager合并到SortShuffleManager HashShuffleReader 重命名为BlockStoreShuffleReader,Sort Based Shuffle和Hash Based Shuffle仍共用ShuffleReader。 Spark 2.0 Hash Based Shuffle退出历史舞台
从此Spark只有Sort Based Shuffle。