您现在的位置: 爱51代码网 >> 范文 >> 文章正文
hadoop核心逻辑shuffle代码分析-map端
24.                            + PARTITION) == i) { 
25.                ++spindex; 
26.              } 
27.              // Note: we would like to avoid the combiner if we've fewer  
28.              // than some threshold of records for a partition  
29.              if (spstart != spindex) { 
30.                combineCollector.setWriter(writer); 
31.                RawKeyValueIterator kvIter = 
32.                  new MRResultIterator(spstart, spindex); 
33.                combinerRunner.combine(kvIter, combineCollector); 
34.              } 
35.            }</SPAN> 
for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
                          (kvmeta.get(kvoff + VALSTART) -
                           kvmeta.get(kvoff + KEYSTART)));
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec)
                            + PARTITION) == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }如果job没有定义combiner则直接写文件,如果有combiner则在这里进行combine。
在生成spill文件后还会将此次spillRecord的记录写在一个index文件中。

[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">Path indexFilename = 
02.              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions 
03.                  * MAP_OUTPUT_INDEX_RECORD_LENGTH); 
04.          spillRec.writeToFile(indexFilename, job);</SPAN> 
Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">rec.startOffset = segmentStart; 
02.            rec.rawLength = writer.getRawLength(); 
03.            rec.partLength = writer.getCompressedLength(); 
04.            spillRec.putIndex(rec, i);</SPAN> 
rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);
5.merge
当mapper执行完毕后,就进入merge阶段。首先看下相关的配置参数:
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);</SPAN> 
int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);mergeFactor:同时merge的文件数。
 
merge阶段的目的是将多个spill生成的中间文件合并为一个输出文件,这里的合并不同于combiner,无论有没有配置combiner这里的merge都会执行。merge阶段的输出是一个数据文件MapFinalOutputFile和一个index文件。看下相关代码:
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">RawKeyValueIterator kvIter = Merger.merge(job, rfs, 
02.                         keyClass, valClass, codec, 
03.                         segmentList, mergeFactor, 
04.                         new Path(mapId.toString()), 
05.                         job.getOutputKeyComparator(), reporter, sortSegments, 
06.                         null, spilledRecordsCounter, sortPhase.phase()); 
07. 
08.          //write merged output to disk  
09.          long segmentStart = finalOut.getPos(); 
10.          Writer<K, V> writer = 
11.              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, 
12.                               spilledRecordsCounter); 
13.          if (combinerRunner == null || numSpills < minSpillsForCombine) { 
14.            Merger.writeFile(kvIter, writer, reporter, job); 
15.          } else { 
16.            combineCollector.setWriter(writer); 
17.            combinerRunner.combine(kvIter, combineCollector); 
18.          }</SPAN> 
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, mergeFactor,
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter, sortSegments,
                         null, spilledRecordsCounter, sortPhase.phase());

          //write merged output to disk
          long segmentStart = finalOut.getPos();
          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }说下merge的算法。每个spill生成的文件中keyvalue都是有序的,但不同的文件却是乱序的,类似多个有序文件的多路归并算法。Merger分别取出需要merge的spillfile的最小的keyvalue,放入一个内存堆中,每次从堆中取出一个最小的值,并把此值保存到merge的输出文件中。这里和hbase中scan的算法非常相似,在分布式系统中多路归并排序真是当红小生啊!
这里merge时不同的partition的key是不会比较的,只有相同的partition的keyvalue才会进行排序和合并。最后的输出文件类似下图。
 
如果用户定义了combiner,在merge的过程中也会进行combine,因为虽然第四步中combine过但那只是部分输入的combine,在merge时仍然需要combine。这里有人问了,既然这里有combiner,为啥在spill输出时还要combine纳,我认为是因为每次combine都会大大减少输出文件的大小,spill时就combine能减少一定的IO操作。
 
在merge完后会把不同partition的信息保存进一个index文件以便之后reducer来拉自己部分的数据。
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">// record offsets  
02.          rec.startOffset = segmentStart; 
03.          rec.rawLength = writer.getRawLength(); 
04.          rec.partLength = writer.getCompressedLength(); 
05.          spillRec.putIndex(rec, parts);</SPAN>  毕业论文 
// record offsets
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength();
          rec.partLength = writer.getCompressedLength();
          spillRec.putIndex(rec, parts);
最后,我们再对mapper过程中的要点总结一下:
1.对map输出<key,value>的分区(partition)是在写入内存buf前就做好的了,方法是对key的hash。我们可以通过继承Partitioner类自己实现分区,将自己想要的数据分到同一个reducer中。
2.写入内存buf速度是非常快的,但spill过程会block写入。因此,对内存buf相关参数的调优是mapreduce调优的重点之一。
3.对数据的排序是基于MapOutKey排序的,因此,我们可以重载对应的方法实现customize的排序顺序
4.combine在spill和merge中都是进行。多次的combine会减少mapreduce中的IO操作,如果使用得当会很好的提高性能。但需要注意的是要深刻理解combine的意义,比如平均值就不适合用combine。

上一页  [1] [2] [3] 

  • 上一篇文章:

  • 下一篇文章: 没有了
  • 最新文章 热点文章 相关文章
    sharepoint 2010 获取用户信息Us
    设计包含max函数的队列
    随机从数组中取出指定的不重复的
    mysql主从同步延迟方案解决的学习
    青岛科学六年级下册教材分析
    生日旅行总结
    中小板生日快乐随感
    送生日快乐桑葚乳酪小蛋糕
    写给女儿的生日快乐
    总分公司财务核算
    mysql主从同步延迟方案解决的学习
    生日旅行总结
    中小板生日快乐随感
    送生日快乐桑葚乳酪小蛋糕
    写给女儿的生日快乐
    总分公司财务核算
    恢复使用繁体字可行性研究报告
    保险受益人制度相关问题的探讨
    初中生地理读图能力培养的研究
    搞笑生日祝福
    The layout of PID & PORT i
    The layout of PID & PORT i
    The layout of PID & PORT i
    MapReduce错误任务失败处理 
    Oracle恢复内部原理(介质恢
    在 Oracle 中如何确定远程 s
    为什么RHEL 6上没有ASMLIB?
    sharepoint 2010 获取用户信
    设计包含max函数的队列
    随机从数组中取出指定的不重
     



    设为首页 | 加入收藏 | 网站地图 | 友情链接 |