partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

//sanity checks
final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,
    (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
    INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
  throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
  throw new IOException(
      "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
      QuickSort.class, IndexedSorter.class), job);

partitions: the number of reduce-side partitions the mapper's output is split into; set on the user's job, default 1.
spillper: once the in-memory buf reaches this fill ratio, a spill is triggered and the buffered data is flushed to a file; default 0.8.
sortmb: the size of the in-memory buf, default 100 MB. Note the (sortmb & 0x7FF) sanity check: the value must fit in 11 bits, so at most 2047 MB.
indexCacheMemoryLimit: the size of the in-memory index; default 1024*1024.
sorter: sorts the mapper's output keys; QuickSort by default.
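As an aside, these knobs are all per-job configuration. A minimal sketch of tuning them, assuming Hadoop 2.x property names (JobContext.MAP_SORT_SPILL_PERCENT is "mapreduce.map.sort.spill.percent", JobContext.IO_SORT_MB is "mapreduce.task.io.sort.mb"; the job name is just a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = new Configuration();
conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f); // spill at 90% full
conf.setInt("mapreduce.task.io.sort.mb", 200);           // 200 MB wrap buf
Job job = Job.getInstance(conf, "spill-tuning-demo");
job.setNumReduceTasks(4);                                // partitions == 4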
The in-memory buffer is fairly involved; its structure is shown in the figure here.

(figure: layout of the in-memory buf — the index/meta region at the far left, serialized key/value data filling in from the right)

When a key/value pair is written, it is written starting from the right side of the wrap buf moving leftwards, and at the same time a meta record for that pair (partition, keystart, valuestart) is written into the index region at the far left. Once the wrap buf reaches the spill trigger ratio, writes block while a chunk of the data is carved out and spilled; writing can resume only after the spill completes. The write position is not reset to zero, though: much like a circular buffer, the region vacated by a spill is reused (see the toy sketch after the partition discussion below).

A word about partition on the write path:

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

When a key/value pair is written into MapOutputBuffer, partitioner.getPartition is called to compute its partition, i.e. which reducer it should be routed to. At this point the partition is merely one field of the meta record written to the in-memory index region; it is not the same thing as the partition step in the next section.
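To make the wrap-around idea concrete, here is a toy sketch — hypothetical code, not MapOutputBuffer's actual implementation — where fixed-width meta records and raw key/value bytes share storage and the data write position wraps modulo the buffer size:

// Toy wrap buffer: kvbuffer holds serialized key/value bytes, kvmeta holds
// meta records (partition, keystart, valstart). Names are illustrative,
// borrowed from the fields discussed above.
public class WrapBufToy {
  static final int META_SIZE = 3;                // ints per meta record
  private final byte[] kvbuffer = new byte[64];  // key/value data region
  private final int[] kvmeta = new int[8 * META_SIZE]; // index region
  private int bufindex = 0;                      // next data write position
  private int numRecords = 0;

  public void collect(byte[] key, byte[] value, int partition) {
    final int keystart = bufindex;
    write(key);
    final int valstart = bufindex;
    write(value);
    // append the meta record to the index region
    final int m = (numRecords++ % (kvmeta.length / META_SIZE)) * META_SIZE;
    kvmeta[m]     = partition;                   // target reducer
    kvmeta[m + 1] = keystart;
    kvmeta[m + 2] = valstart;
  }

  private void write(byte[] bytes) {
    for (byte b : bytes) {
      kvbuffer[bufindex] = b;
      bufindex = (bufindex + 1) % kvbuffer.length; // wrap: reuse spilled space
    }
  }
}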
Let's look at the default partitioner, HashPartitioner:

/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
                        int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

HashPartitioner simply hashes the key and takes the result modulo the number of reduce tasks (the & Integer.MAX_VALUE masks off the sign bit so the modulo result cannot be negative). In general, then, which reducer a given key lands on is essentially random. The consequence: the data within each reducer is sorted, but the data across reducers is not. To get a totally ordered result, either configure a single reducer or use TotalOrderPartitioner.
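If hash-modulo routing is not what a job needs, a custom Partitioner can be plugged in. A minimal sketch against the org.apache.hadoop.mapreduce API (the class and the Text/IntWritable key/value types are assumptions for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Route keys by their first character instead of their hash, so keys with
// the same initial character always land on the same reducer.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    String s = key.toString();
    char first = s.isEmpty() ? '\0' : s.charAt(0);
    return first % numReduceTasks; // char is non-negative, no sign mask needed
  }
}

It is registered on the job with job.setPartitionerClass(FirstCharPartitioner.class).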
4. Partition Sort Store

In this fourth step, partitioning is done together with sorting: the thread responsible for spilling, once it has claimed a span of the in-memory buf, calls QuickSort's sort method to sort it in memory.

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

Records are compared first by their partition and only then by the key's compare method:

public int compare(final int mi, final int mj) {
  final int kvi = offsetFor(mi % maxRec);
  final int kvj = offsetFor(mj % maxRec);
  final int kvip = kvmeta.get(kvi + PARTITION);
  final int kvjp = kvmeta.get(kvj + PARTITION);
  // sort by partition
  if (kvip != kvjp) {
    return kvip - kvjp;
  }
  // sort by key
  return comparator.compare(kvbuffer,
      kvmeta.get(kvi + KEYSTART),
      kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
      kvbuffer,
      kvmeta.get(kvj + KEYSTART),
      kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
}

The mapper's output key/values are therefore grouped by partition first, and any key comparator we supply (e.g. via job.setSortComparatorClass) takes effect here for the within-partition ordering. For example, records (partition=1, key="b"), (partition=0, key="z"), (partition=1, key="a") end up ordered (0,"z"), (1,"a"), (1,"b"). The output file of one spill then looks like the figure here.

(figure: a spill file — records grouped by partition, with keys sorted inside each partition)

After the in-memory buf is sorted, the spill file is written:

for (int i = 0; i < partitions; ++i) {
  IFile.Writer<K, V> writer = null;
  try {
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                              spilledRecordsCounter);
    if (combinerRunner == null) {
      // spill directly
      DataInputBuffer key = new DataInputBuffer();
      while (spindex < mend &&
             kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        final int kvoff = offsetFor(spindex % maxRec);
        key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
                  (kvmeta.get(kvoff + VALSTART) -
                   kvmeta.get(kvoff + KEYSTART)));
        getVBytesForOffset(kvoff, value);
        writer.append(key, value);
        ++spindex;
      }
    } else {
      int spstart = spindex;
      while (spindex < mend &&
             kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
        ++spindex;
      }
      // run the combiner over this partition's records before writing
      // (the snippet was truncated here; this tail is reconstructed from
      // the Hadoop MapTask source)
      if (spstart != spindex) {
        combineCollector.setWriter(writer);
        RawKeyValueIterator kvIter =
            new MRResultIterator(spstart, spindex);
        combinerRunner.combine(kvIter, combineCollector);
      }
    }
    // ... the writer is then closed and this partition's offsets are
    // recorded in the spill index
  } finally {
    if (null != writer) writer.close();
  }
}
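The combinerRunner branch above only runs when the job registers a combiner. A minimal sketch of wiring one up, assuming a WordCount-style job whose reducer (a hypothetical IntSumReducer that sums the counts per key) can double as the combiner:

// With a combiner set, each spill's per-partition records are combined
// in memory before being appended to the spill file, shrinking the data
// the shuffle and merge phases must move later.
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);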