for (int i = 0; i < partitions; ++i) {
  IFile.Writer<K, V> writer = null;
  try {
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                              spilledRecordsCounter);
    if (combinerRunner == null) {
      // spill directly
      DataInputBuffer key = new DataInputBuffer();
      while (spindex < mend &&
             kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        final int kvoff = offsetFor(spindex % maxRec);
        key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
                  (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART)));
        getVBytesForOffset(kvoff, value);
        writer.append(key, value);
        ++spindex;
      }
    } else {
      int spstart = spindex;
      while (spindex < mend &&
             kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        ++spindex;
      }
      // Note: we would like to avoid the combiner if we've fewer
      // than some threshold of records for a partition
      if (spstart != spindex) {
        combineCollector.setWriter(writer);
        RawKeyValueIterator kvIter =
            new MRResultIterator(spstart, spindex);
        combinerRunner.combine(kvIter, combineCollector);
      }
    }

If the job defines no combiner, the records are written straight to the spill file; if it does, the combine happens right here. After the spill file has been generated, the entries of this SpillRecord are also written to an index file.
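To make the combine path concrete, here is a minimal sketch of a sum-style combiner; SumCombiner is a made-up name and the word-count-style types are assumptions, not anything from the code above:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A combiner is just a Reducer run on the map side. The framework may
// apply it at spill time, again at merge time, or not at all, so the
// operation must stay correct under repeated partial application.
public class SumCombiner
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    ctx.write(key, new IntWritable(sum));
  }
}

It is registered with job.setCombinerClass(SumCombiner.class). Sums and maxima qualify for this treatment; a plain average does not, because averaging partial averages gives the wrong answer.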
Path indexFilename =
    mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
        * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);

Each partition's entry in the SpillRecord was filled in right after its segment was written:

rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, i);

5. merge

Once the mapper has finished, the merge phase begins. First, the relevant configuration parameter:

int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);

mergeFactor is the number of files merged at the same time.
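As a small aside, this factor can be tuned through the ordinary configuration API. A minimal sketch, assuming the property behind JobContext.IO_SORT_FACTOR, which is io.sort.factor in Hadoop 1.x and mapreduce.task.io.sort.factor in later releases:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MergeFactorDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Raise the fan-in so up to 50 spill files are merged per pass.
    // Key name: io.sort.factor (1.x), mapreduce.task.io.sort.factor (2.x+).
    conf.setInt("mapreduce.task.io.sort.factor", 50);
    Job job = Job.getInstance(conf, "merge-factor-demo");
  }
}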
The goal of the merge phase is to combine the intermediate files produced by the individual spills into a single output file. This merging is not the combiner: the merge always runs, whether or not a combiner is configured. Its output is one data file, MapFinalOutputFile, plus one index file. The relevant code:

RawKeyValueIterator kvIter = Merger.merge(job, rfs,
    keyClass, valClass, codec,
    segmentList, mergeFactor,
    new Path(mapId.toString()),
    job.getOutputKeyComparator(), reporter, sortSegments,
    null, spilledRecordsCounter, sortPhase.phase());

// write merged output to disk
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
    new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                     spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
  Merger.writeFile(kvIter, writer, reporter, job);
} else {
  combineCollector.setWriter(writer);
  combinerRunner.combine(kvIter, combineCollector);
}

A few words on the merge algorithm. Within each spill file the key/value pairs are sorted, but the files themselves are unordered relative to one another, so this is the classic multi-way merge of several sorted files. Merger takes the smallest key/value pair from each spill file being merged and puts it into an in-memory heap; each step pops the smallest entry off the heap and appends it to the merged output file. This is very similar to how scan works in HBase; multi-way merge sort really is the star performer of distributed systems! (A toy version of this heap-based merge is sketched at the end of this section.)

During the merge, keys from different partitions are never compared; only key/value pairs within the same partition are sorted and merged, so the final output file consists of consecutive sorted segments, one per partition.

If the user defined a combiner, combining also happens during the merge: the combine in step 4 only covered part of the input, so the merge still has to combine again. One might ask why the spill bothers to combine at all if the merge combines anyway. I believe it is because every combine pass greatly shrinks the output, so combining already at spill time saves a certain amount of IO.

Once the merge is done, the per-partition layout is saved into an index file so that each reducer can later pull its own slice of the data:

// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, parts);

Finally, a recap of the key points of the mapper phase:

1. The partition of each map output <key,value> is decided before the pair is written into the in-memory buffer, by hashing the key. You can subclass Partitioner to implement your own partitioning and steer related records to the same reducer (see the sketch at the end of this section).

2. Writing into the in-memory buffer is very fast, but a spill blocks further writes. Tuning the buffer-related parameters is therefore one of the central levers in MapReduce performance tuning.

3. Sorting is based on the map output key, so you can override the corresponding comparator to get a customized sort order.

4. Combining runs during both spill and merge. The repeated combining cuts down the IO in a MapReduce job and, used well, improves performance considerably; but make sure you understand what a combiner means semantically: an average, for example, is not suited to combining.
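The heap-based multi-way merge described above is easy to picture with plain Java collections. Below is a toy sketch, independent of Hadoop's actual Merger and Segment classes; KWayMerge and Head are made-up names, and real segments stream from disk rather than living in in-memory lists:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {
  // One sorted input run plus its current head element.
  private static final class Head<T extends Comparable<T>>
      implements Comparable<Head<T>> {
    T value;
    final Iterator<T> rest;
    Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
    @Override
    public int compareTo(Head<T> o) { return value.compareTo(o.value); }
  }

  // Merge k individually sorted lists into one sorted list, the way
  // the map-side merge draws keys from the spill segments.
  static <T extends Comparable<T>> List<T> merge(List<List<T>> runs) {
    PriorityQueue<Head<T>> heap = new PriorityQueue<>();
    for (List<T> run : runs) {
      Iterator<T> it = run.iterator();
      if (it.hasNext()) heap.add(new Head<>(it.next(), it));
    }
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head<T> h = heap.poll();      // globally smallest remaining key
      out.add(h.value);
      if (h.rest.hasNext()) {       // pull the next key from the same run
        h.value = h.rest.next();
        heap.add(h);                // re-insert; the heap reorders it
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(merge(List.of(
        List.of(1, 4, 7), List.of(2, 5, 8), List.of(3, 6, 9))));
    // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}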
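And for point 1 of the recap, a minimal custom Partitioner sketch. FirstCharPartitioner is a made-up example that assumes Text keys with IntWritable values; the sign-bit masking mirrors what Hadoop's default HashPartitioner does:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes all keys sharing a first character to the same reducer, so
// they land in the same partition of the map output file.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    if (key.getLength() == 0) {
      return 0;
    }
    // Mask off the sign bit before the modulo, as HashPartitioner does.
    return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
  }
}

It is wired in with job.setPartitionerClass(FirstCharPartitioner.class); the custom sort order from point 3 is plugged in analogously via job.setSortComparatorClass(...) with a RawComparator over the map output key.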