 
 
The per-partition spill loop:

for (int i = 0; i < partitions; ++i) {
  IFile.Writer<K, V> writer = null;
  try {
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                              spilledRecordsCounter);
    if (combinerRunner == null) {
      // spill directly
      DataInputBuffer key = new DataInputBuffer();
      while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        final int kvoff = offsetFor(spindex % maxRec);
        key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
                  (kvmeta.get(kvoff + VALSTART) -
                   kvmeta.get(kvoff + KEYSTART)));
        getVBytesForOffset(kvoff, value);
        writer.append(key, value);
        ++spindex;
      }
    } else {
      int spstart = spindex;
      while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec)
                    + PARTITION) == i) {
        ++spindex;
      }
      // Note: we would like to avoid the combiner if we've fewer
      // than some threshold of records for a partition
      if (spstart != spindex) {
        combineCollector.setWriter(writer);
        RawKeyValueIterator kvIter =
          new MRResultIterator(spstart, spindex);
        combinerRunner.combine(kvIter, combineCollector);
      }
    }

If the job does not define a combiner, the records are written straight to the spill file; if it does, the combiner runs right here, partition by partition. After the spill file has been generated, the records of this spill are also written into an index file, as shown below.
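For context, combinerRunner above invokes whatever combiner class the job has registered. A minimal sketch of such a combiner, assuming a word-count style job with Text keys and IntWritable counts (the class name and job wiring are illustrative, not taken from the MapTask source):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A combiner is just a Reducer run on map-side output; this one sums
// partial counts so fewer bytes are spilled and later shuffled.
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

// Registered on the job with:
//   job.setCombinerClass(SumCombiner.class);

Because the framework may run a combiner once, many times, or not at all, it must be an associative, commutative reduction over the values, which is why a plain sum works here.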
Back in the spill code, the segment boundaries of each partition are first recorded into the spill record at the end of the per-partition loop above:

rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, i);

and the spill record is then written out as the index file:

Path indexFilename =
    mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
        * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);

5. merge

When the mapper has finished, the merge phase begins. First, the relevant configuration parameter:

int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);

mergeFactor is the number of files merged at the same time.

The goal of the merge phase is to combine the intermediate files produced by the individual spills into a single output file. This merging is not the same thing as the combiner: it runs whether or not a combiner is configured. The output of the merge phase is one data file, the MapFinalOutputFile, plus one index file. The relevant code:

RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                       keyClass, valClass, codec,
                       segmentList, mergeFactor,
                       new Path(mapId.toString()),
                       job.getOutputKeyComparator(), reporter, sortSegments,
                       null, spilledRecordsCounter, sortPhase.phase());

//write merged output to disk
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
    new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                     spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
  Merger.writeFile(kvIter, writer, reporter, job);
} else {
  combineCollector.setWriter(writer);
  combinerRunner.combine(kvIter, combineCollector);
}

A word on the merge algorithm. Within each spill file the key-value pairs are sorted, but across files they are not, so this is the classic multi-way merge of several sorted runs. The Merger takes the current smallest key-value from each spill file being merged and puts it into an in-memory heap; each step it pops the smallest element off the heap and appends it to the merge output file. This is very similar to how HBase merges sorted files during a scan; multi-way merge sort really is the star of distributed systems!
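A minimal, self-contained sketch of the same multi-way merge, using Java's PriorityQueue as the heap; the Head helper class and the integer runs are illustrative stand-ins for Hadoop's spill-file segments:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MultiWayMerge {

  // One heap entry: the current head of a sorted run plus the iterator
  // that yields the rest of that run.
  private static class Head {
    final int value;
    final Iterator<Integer> rest;
    Head(int value, Iterator<Integer> rest) {
      this.value = value;
      this.rest = rest;
    }
  }

  // Merges any number of individually sorted runs (each run standing in
  // for one spill file) into one globally sorted output.
  static List<Integer> merge(List<List<Integer>> sortedRuns) {
    PriorityQueue<Head> heap =
        new PriorityQueue<>(Comparator.comparingInt((Head h) -> h.value));
    // Seed the heap with the smallest element of every run.
    for (List<Integer> run : sortedRuns) {
      Iterator<Integer> it = run.iterator();
      if (it.hasNext()) {
        heap.add(new Head(it.next(), it));
      }
    }
    List<Integer> out = new ArrayList<>();
    // Repeatedly pop the global minimum, then refill the heap from the
    // run that minimum came from.
    while (!heap.isEmpty()) {
      Head h = heap.poll();
      out.add(h.value);
      if (h.rest.hasNext()) {
        heap.add(new Head(h.rest.next(), h.rest));
      }
    }
    return out;
  }
}

For example, merging the runs [1, 4, 7], [2, 5], [3, 6] yields [1, 2, 3, 4, 5, 6, 7]. The real Merger streams segments from disk rather than holding whole lists in memory, but the heap discipline is the same.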
Note that the merge never compares keys from different partitions; only key-value pairs within the same partition are sorted and merged, so the final output file consists of one sorted segment per partition, laid out one after another.

If the user has defined a combiner, it also runs during the merge: the combine in step 4 only ever saw a portion of the input, so combining again at merge time is still needed. One might ask: if the combiner runs here anyway, why also combine at spill time? I believe it is because every combine pass greatly shrinks the output, so combining at spill time already saves a certain amount of I/O.

After the merge, the per-partition information is saved into an index file so that each reducer can later pull its own portion of the data:

// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, parts);

Finally, a recap of the key points of the mapper phase:

1. The partition of each map output <key,value> pair is decided before the pair is written into the in-memory buffer, by hashing the key. You can subclass Partitioner to implement your own partitioning and route related records to the same reducer (a sketch follows this list).

2. Writing into the in-memory buffer is very fast, but a spill blocks further writes. Tuning the buffer-related parameters is therefore one of the main levers in MapReduce performance tuning.

3. Sorting is based on the map output key, so you can customize the sort order by overriding the corresponding comparison (see the comparator sketch below).

4. Combining happens during both spill and merge. The repeated combine passes cut down MapReduce I/O and, used well, improve performance considerably. But be sure you understand what a combiner may compute: averaging, for example, is not suitable for a combiner, because the average of partial averages is not the overall average (avg(avg(1, 2), 3) = 2.25, while avg(1, 2, 3) = 2).
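To illustrate point 1, a minimal sketch of a custom Partitioner, assuming Text keys routed by their first character (FirstCharPartitioner is a hypothetical name, not from the source):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes all keys that share the same first character to the same
// reducer, instead of hashing the whole key as the default does.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    if (key.getLength() == 0) {
      return 0;
    }
    // Mask off the sign bit before taking the modulus, as the default
    // HashPartitioner does, so the result is never negative.
    return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
  }
}

// Registered with: job.setPartitionerClass(FirstCharPartitioner.class);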
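For point 3 there are two routes: make the key type's compareTo define the desired order, or register a sort comparator on the job. A sketch of the comparator route, assuming LongWritable keys that should arrive at the reducer in descending order (DescendingLongComparator is an illustrative name):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Inverts the natural order of LongWritable keys so the map output
// (and hence the reduce input) is sorted descending.
public class DescendingLongComparator extends WritableComparator {
  public DescendingLongComparator() {
    super(LongWritable.class, true); // true: create instances for compare()
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    return -a.compareTo(b);
  }
}

// Registered with: job.setSortComparatorClass(DescendingLongComparator.class);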