您现在的位置: 爱51代码网 >> 范文 >> 文章正文
hadoop核心逻辑shuffle代码分析-map端

10.      if (spillper > (float)1.0 || spillper <= (float)0.0) { 
11.        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + 
12.            "\": " + spillper); 
13.      } 
14.      if ((sortmb & 0x7FF) != sortmb) { 
15.        throw new IOException( 
16.            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); 
17.      } 
18.      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", 
19.            QuickSort.class, IndexedSorter.class), job);</SPAN> 
partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);partition:mapper的数据需要分配到reduce端的个数,由用户的job指定,默认为1.
spillper:内存buf使用到此比例就会触发spill,将内存中的数据flush成一个文件。默认为0.8
sortmb:内存buf的大小,默认100MB
indexCacheMemoryLimit:内存index的大小。默认为1024*1024
sorter:对mapper输出的key的排序,默认是快排
 
内存buffer比较复杂,贴一张图介绍一下这块内存buf的结构:

当一对keyvalue写入时首先会从wrap buf的右侧开始往左写,同时,会把一条keyvalue的meta信息(partition,keystart,valuestart)写入到最左边的index区域。当wrap buf大小达到spill的触发比例后会block写入,挖出一部分数据开始spill,直到spill完成后才能继续写,不过写入位置不会置零,而是类似循环buf那样,在spill掉数据后可以重复利用内存中的buf区域。
 
这里单独讲一下partition:
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">@Override 
02.    public void write(K key, V value) throws IOException, InterruptedException { 
03.      collector.collect(key, value, 
04.                        partitioner.getPartition(key, value, partitions)); 
05.    }</SPAN> 
@Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }在keyvalue对写入MapOutputBuffer时会调用partitioner.getPartition方法计算partition即应该分配到哪个reducer,这里的partition只是在内存的buf的index区写入一条记录而已,和下一个部分的partition不一样哦。看下默认的partitioner:HashPartition

[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">/** Use {@link Object#hashCode()} to partition. */ 
02.  public int getPartition(K key, V value, 
03.                          int numReduceTasks) { 
04.    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
05.  }</SPAN> 
/** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }HashPartition只是把key hash后按reduceTask的个数取模,因此一般来说,不同的key分配到哪个reducer是随即的!所以,reducer内的所有数据是有序的,但reducer之间的数据却是乱序的!要想数据整体排序,要不只设一个reducer,要不使用TotalOrderPartitioner!

 
4.Partition Sort Store
在第四步中,partition是和sort一起做的,负责Spill的线程在拿到一段内存buf后会调用QuickSort的sort方法进行内存中的快排。
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);</SPAN> 
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);排序的算法是先按keyvalue记录的partition排序后按key的compare方法:
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">public int compare(final int mi, final int mj) { 
02.      final int kvi = offsetFor(mi % maxRec); 
03.      final int kvj = offsetFor(mj % maxRec); 
04.      final int kvip = kvmeta.get(kvi + PARTITION); 
05.      final int kvjp = kvmeta.get(kvj + PARTITION); 
06.      // sort by partition  
07.      if (kvip != kvjp) { 
08.        return kvip - kvjp; 
09.      } 
10.      // sort by key  
11.      return comparator.compare(kvbuffer, 
12.          kvmeta.get(kvi + KEYSTART), 
13.          kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), 
14.          kvbuffer, 
15.          kvmeta.get(kvj + KEYSTART), 
16.          kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); 
17.    }</SPAN> 
public int compare(final int mi, final int mj) {
      final int kvi = offsetFor(mi % maxRec);
      final int kvj = offsetFor(mj % maxRec);
      final int kvip = kvmeta.get(kvi + PARTITION);
      final int kvjp = kvmeta.get(kvj + PARTITION);
      // sort by partition
      if (kvip != kvjp) {
        return kvip - kvjp;
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvmeta.get(kvi + KEYSTART),
          kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
          kvbuffer,
          kvmeta.get(kvj + KEYSTART),
          kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
    }因此,mapper输出的keyvalue首先是按partition聚合。而我们如果指定key的compare方法会在这里生效并进行排序。最后,一次spill的输出文件类似下图。
 
在对内存中的buf排序后开始写文件。
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">for (int i = 0; i < partitions; ++i) { 
02.          IFile.Writer<K, V> writer = null; 
03.          try { 
04.            long segmentStart = out.getPos(); 
05.            writer = new Writer<K, V>(job, out, keyClass, valClass, codec, 
06.                                      spilledRecordsCounter); 
07.            if (combinerRunner == null) { 
08.              // spill directly  
09.              DataInputBuffer key = new DataInputBuffer(); 
10.              while (spindex < mend && 
11.                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { 
12.                final int kvoff = offsetFor(spindex % maxRec); 
13.                key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART), 
14.                          (kvmeta.get(kvoff + VALSTART) - 
15.                           kvmeta.get(kvoff + KEYSTART))); 
16.                getVBytesForOffset(kvoff, value); 
17.                writer.append(key, value); 
18.                ++spindex; 
19.              } 
20.            } else { 
21.              int spstart = spindex; 
22.              while (spindex < mend && 
23.                  kvmeta.get(offsetFor(spindex % maxRec) 

上一页  [1] [2] [3] 下一页

  • 上一篇文章:

  • 下一篇文章: 没有了
  • 最新文章 热点文章 相关文章
    sharepoint 2010 获取用户信息Us
    设计包含max函数的队列
    随机从数组中取出指定的不重复的
    mysql主从同步延迟方案解决的学习
    青岛科学六年级下册教材分析
    生日旅行总结
    中小板生日快乐随感
    送生日快乐桑葚乳酪小蛋糕
    写给女儿的生日快乐
    总分公司财务核算
    mysql主从同步延迟方案解决的学习
    生日旅行总结
    中小板生日快乐随感
    送生日快乐桑葚乳酪小蛋糕
    写给女儿的生日快乐
    总分公司财务核算
    恢复使用繁体字可行性研究报告
    保险受益人制度相关问题的探讨
    初中生地理读图能力培养的研究
    搞笑生日祝福
    The layout of PID & PORT i
    The layout of PID & PORT i
    The layout of PID & PORT i
    MapReduce错误任务失败处理 
    Oracle恢复内部原理(介质恢
    在 Oracle 中如何确定远程 s
    为什么RHEL 6上没有ASMLIB?
    sharepoint 2010 获取用户信
    设计包含max函数的队列
    随机从数组中取出指定的不重
     



    设为首页 | 加入收藏 | 网站地图 | 友情链接 |