MapTask和ReduceTask运⾏机制、MapReduce的shuffle过程⼀、MapTask运⾏机制详解以及Map任务的并⾏度
整个Map阶段流程⼤体如上图所⽰。简单概述:inputFile通过split被逻辑切分为多个split⽂件,通过Record按⾏读取内容给map(⽤户⾃⼰实现的)进⾏处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进⾏分区(默认使⽤hash分区),然后写⼊buffer,每个map task都有⼀个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以⼀个临时⽂件的⽅式存放到磁盘,当整个map task结束后再对磁盘中这个map task产⽣的所有临时⽂件做合并,⽣成最终的正式输出⽂件,然后等待reduce task来拉数据。
详细步骤:
1、⾸先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits⽅法对输⼊⽬录中⽂件进⾏逻辑切⽚规划得到splits,有多少个split就对应启动多少个MapTask。默认情况下split与block的对应关系默认是⼀对⼀。
2、将输⼊⽂件切分为splits之后,由RecordReader对象(默认LineRecordReader)进⾏读取,以\n作为分隔符,读取⼀⾏数据,返回<key,value>。Key表⽰每⾏⾸字符偏移值,value表⽰这⼀⾏⽂本内容。
3、读取split返回<key,value>,进⼊⽤户⾃⼰继承的Mapper类中,执⾏⽤户重写的map函数。RecordReader读取⼀⾏⽤户重写的map 调⽤⼀次,并输出⼀个<key,value>。
4、Map输出的数据会写⼊内存,内存中这⽚区域叫做环形缓冲区,缓冲区的作⽤是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写⼊缓冲区。当然写⼊之前,key与value值都会被序列化成字节数组。
环形缓冲区其实是⼀个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value 的起始位置以及value的长度。环形结构是⼀个抽象概念。
缓冲区是有⼤⼩限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在⼀定条件下将缓冲区中的数据临时写⼊磁盘,然后重新利⽤这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中⽂可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻⽌map的结果输出,所以整个缓冲区有个溢写的⽐例spill.percent。这个⽐例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执⾏溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。李连杰的爷爷是谁
5、合并溢写⽂件:每次溢写会在磁盘上⽣成⼀个临时⽂件(写之前判断是否有combiner),如果map
的输出结果真的很⼤,有多次这样的溢写发⽣,磁盘上相应的就会有多个临时⽂件存在。当整个数据处理结束之后开始对磁盘中的临时⽂件进⾏merge合并,因为最终的⽂件只有⼀个,写⼊磁盘,并且为这个⽂件提供了⼀个索引⽂件,以记录每个reduce对应数据的偏移量。
⾄此map整个阶段结束。
mapTask的⼀些基础设置配置(l当中社会):
读取配置文件失败设置⼀:设置环型缓冲区的内存值⼤⼩(默认设置如下)
朴嘉熙朴有天mapreduce.task.io.sort.mb:100
设置⼆:设置溢写百分⽐(默认设置如下)
mapreduce.map.sort.spill.percent:0.80
设置三:设置溢写数据⽬录(默认设置)
mapreduce.cluster.local.dir:${p.dir}/mapred/local
设置四:设置⼀次最多合并多少个溢写⽂件(默认设置如下)
mapreduce.task.io.sort.factor:10
⼆、 ReduceTask ⼯作机制以及reduceTask的并⾏度
Reduce⼤致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含⼀个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge 到磁盘和将磁盘中的数据进⾏merge。待数据copy完成之后,copy阶段就完成了,开始进⾏sort阶段,sort阶段主要是执⾏finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调⽤⽤户定义的reduce函数进⾏处理。
详细步骤:
1、Copy阶段,简单地拉取数据。Reduce进程启动⼀些数据copy线程(Fetcher),通过HTTP⽅式请求maptask获取属于⾃⼰的⽂件。
2、Merge阶段。这⾥的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放⼊内存缓冲区中,这⾥的缓冲区⼤⼩要⽐map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第⼀种形式不启⽤。当内存中的数据量到达⼀定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启⽤的,然后在磁盘中⽣成了众多的溢写⽂件。第⼆种merge⽅式⼀直在运⾏,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge⽅式⽣成最终的⽂件。
3、合并排序。把分散的数据合并成⼀个⼤的数据后,还会再对合并后的数据排序。
4、对排序后的键值对调⽤reduce⽅法,键相等的键值对调⽤⼀次reduce⽅法,每次调⽤会产⽣零个或者多个键值对,最后把这些输出的键值对写⼊到HDFS⽂件中。
5、map逻辑完之后,将map的每条结果通过context.write进⾏collect数据收集。在collect中,会先对其进⾏分区处理,默认使⽤HashPartitioner。
MapReduce提供Partitioner接⼝,它的作⽤就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模⽅式只是为了平均reduce的处理能⼒,如果⽤户⾃⼰对Partitioner 有需求,可以订制并设置到job上。
6、当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的⾏为,这⾥的排序也是对序列化的字节做的排序。
如果job设置过Combiner,那么现在就是使⽤Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使⽤。刀剑封魔录上古传说攻略
哪些场景才能使⽤Combiner呢?从这⾥分析,Combiner的输出是Reducer的输⼊,Combiner绝不能改变最终的计算结果。Combiner 只应该⽤于那种Reduce的输⼊key/value与输出key/value类型完全⼀致,且不影响最终结果的场景。⽐如累加,最⼤值等(求平均值绝不能⽤Combiner)。Combiner的使⽤⼀定得慎重,如果⽤好,它对job执⾏效率有帮助,反之会影响reduce的最终结果。
MapReduce总体⼯作机制
倩女幽魂歌曲宁财神老婆程娇娥三、shuffle
核⼼机制:数据分区,排序,分组,ComBine,合并等过程)。
shuffle是Mapreduce的核⼼,它分布在Mapreduce的map阶段和reduce阶段。⼀般把从Map产⽣输出开始到Reduce取得数据作为输⼊之前的过程称作shuffle。
维度⼀、流程维度(从Map输出到Reduce输⼊)
维度⼆,内存维度(从Map输出到Reduce输⼊)
发布评论