Hadoop和spark为何要对key进行排序

本篇内容介绍了“Hadoop和spark为何要对key进行排序”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

创新互联专业为企业提供山阳网站建设、山阳做网站、山阳网站设计、山阳网站制作等企业网站建设、网页设计与制作、山阳企业网站模板建站服务,十余年山阳做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

1.思考

只要对hadoopmapreduce的原理清楚的都熟知下面的整个流程运行原理,其中涉及到至少三次排序,分别是溢写快速排序,溢写归并排序,reduce拉取归并排序,而且排序是默认的,即天然排序的,那么为什么要这么做的,设计原因是什么。先给个结论,为了整体更稳定,输出满足多数需求,前者体现在不是采用hashShuffle而是sortShuffle ,后者体现在预计算,要知道排序后的数据,在后续数据使用时的会方便很多,比如体现索引的地方,如reduce拉取数据时候。

Hadoop和spark为何要对key进行排序

2.MapReduce原理分析

在分析设计原因之前,先理解一下整个过程,在map阶段,根据预先定义的partition规则进行分区,map首先将输出写到缓存中,当缓存内容达到阈值时,将结果spill到硬盘,每一次spill都会在硬盘产生一个spill文件,因此一个map task可能会产生多个spill文件,其中在每次spill的时候会对key进行排序。接下来进入shuffle阶段,当map写出最后一个输出,需要在map端进行一次merge操作,按照partitionpartition内的key进行归并排序(合并+排序),此时每个partition内按照key值整体有序。然后开始第二次merge,这次是在reduce端,在此期间数据在内存和磁盘上都有,其实这个阶段的merge并不是严格意义上的排序,也是跟前面类似的合并+排序,只是将多个整体有序的文件merge成一个大的文件,最终完成排序工作。分析完整个过程后,是不是觉得如果自己实现MapReduce框架的话,考虑用HashMap 输出map内容即可。

2.1 MapTask运行机制详解

整个流程图如下:

Hadoop和spark为何要对key进行排序

详细步骤:

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输⼊入⽬目录中文件进行逻辑切⽚片规划得到splits,有多少个split就对应启动多少个MapTasksplitblock的对应关系默认是⼀对⼀。

  2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取⼀行数据,返回Key表示每⾏行行⾸首字符偏移值,value表示这⼀行文本内容。

  3. 读取split返回,进⼊入⽤用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取⼀行这里调用一次。

  4. map逻辑完之后,将map的每条结果通过context.write进⾏行行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitionerMapReduce提供Partitioner接口,它的作用就是根据keyvaluereduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

  5. 接下来,会将数据写入内存,内存中这⽚片区域叫做环形缓冲区,缓冲区的作用是批量量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写⼊入缓冲区。当然写⼊入之前,keyvalue值都会被序列列化成字节数组

    • 环形缓冲区其实是一个数组,数组中存放着keyvalue的序列化数据和keyvalue的元数据信息,包括partitionkey的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

    • 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写⼊入磁盘,然后重新利利⽤用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不不应该阻⽌map的结果输出,所以整个缓冲区有个溢写的⽐比例例spill.percent。这个⽐比例例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程Maptask的输出结果还可以往剩下的20MB内存中写,互不不影响、

  6. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的⾏行行为!

    • 如果job设置过Combiner,那么现在就是使⽤用Combiner的时候了了。将有相同keykey/value对的value加起来,减少溢写到磁盘的数据量量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

    • 那哪些场景才能使⽤用Combiner呢?从这⾥里里分析,Combiner的输出是Reducer的输⼊,Combiner绝不不能改变最终的计算结果。Combiner只应该⽤用于那种Reduce的输入key/value与输出key/value类型完全一致,且不不影响最终结果的场景。⽐比如累加,最⼤大值等。Combiner的使⽤用一定得慎重如果用的好,它对job执⾏行行效率有帮助,反之会影响reduce的最终结果

  7. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理理结束之后开始对磁盘中的临时文件进⾏行行merge合并,因为最终文件只有一个,写⼊磁盘,并且为这个文件提供了一个索文件,以记录每个reduce对应数据的偏移量量。

2.2 ReduceTask运行机制详解

Hadoop和spark为何要对key进行排序

Reduce⼤大致分为copysortreduce三个阶段,重点在前两个阶段。copy阶段包含⼀一个 eventFetcher来获取已完成的map列列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMergeronDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进⾏merge。待数据copy完成之后,copy阶段就完成了,开始进⾏行行sort阶段,sort阶段主要是执⾏finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调⽤用⽤用户定义的reduce函数进⾏处理。 详细步骤

2.2.1 Copy阶段

简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

2.2.2 Merge阶段

Merge阶段。这⾥里里的mergemap端的merge动作,只是数组中存放的是不不同mapcopy来的数值。Copy过来的数据会先放入内存缓冲区中,这⾥里里的缓冲区大小要⽐比map端的更更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第⼀一种形式不不启⽤用。当内存中的数据量量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启⽤用的,然后在磁盘中生成了了众多的溢写文件。第二种merge方式⼀一直在运⾏行行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

2.2.3 合并排序

把分散的数据合并成一个⼤大的数据后,还会再对合并后的数据排序。对排序后的键值对调⽤用reduce方法,键相等的键值对调⽤用一次reduce方法,每次调⽤用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

“Hadoop和spark为何要对key进行排序”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


本文名称:Hadoop和spark为何要对key进行排序
分享路径:http://scyanting.com/article/ispddo.html