您现在的位置: 爱51代码网 >> 范文 >> 文章正文
hadoop核心逻辑shuffle代码分析-map端

hadoop核心逻辑shuffle代码分析-map端
阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下。

 

不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本shuffle的逻辑,整理成本文,为以后回顾所用。

 

 首先用一张图展示下map的流程:毕业论文 

 
 
在上图中,我们假设此次mapreduce有多个mapper和2个reducer,p0 p1分别代表该数据应该分配到哪个reducer端。我将mapper的过程大致分为5个过程。
 
1.prepare Input。
Mapreduce程序都需要指定输入文件,输入的格式有很多种,最常见的是保存在hdfs上的文本文件。在用户提交job到jobtrack(ResourceManager)前的job就会根据用户的输入文件计算出需要多少mapper,多少reducer,mapper的输入InputSplit有多大,block块名称等。mapper在prepare input阶段只需要根据inputFormat类型创建对应的RecordReader打开对应的inputSplit分片即可。如果job配置了combiner还需初始化combiner。代码见MapTask类run方法
 
2.mapper process
这里的mapper指用户使用或自己继承的mapper类,这也是所有初学mapreduce的同学首先看到的类。
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">  /**
02.   * Called once for each key/value pair in the input split. Most applications
03.   * should override this, but the default is the identity function.
04.   */ 
05.  @SuppressWarnings("unchecked") 
06.  protected void map(KEYIN key, VALUEIN value,  
07.                     Context context) throws IOException, InterruptedException { 
08.    context.write((KEYOUT) key, (VALUEOUT) value); 
09.  } 
10.</SPAN> 
  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
可以看到mapper默认的map方法就是取出key,value并放到context对象中。context对象包装了一个内存中的buf,下面会介绍。
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">public void run(Context context) throws IOException, InterruptedException { 
02.    setup(context); 
03.    while (context.nextKeyValue()) { 
04.      map(context.getCurrentKey(), context.getCurrentValue(), context); 
05.    } 
06.    cleanup(context); 
07.  }</SPAN> 
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }run方法就是mapper实际运行的过程:不停的从context的inputSplit对象中取出keyvalue对,通过map方法处理再保存到context包装的内存buf中。
 
3.buffer in memery
key value在写入context中后实际是写入MapOutputBuffer类中。在第一个阶段的初始化过程中,MapOutputBuffer类会根据配置文件初始化内存buffer,我们来看下都有哪些参数:
[java] view plaincopyprint?
01.<SPAN style="FONT-SIZE: 18px">partitions = job.getNumReduceTasks(); 
02.      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); 
03. 
04.      //sanity checks  
05.      final float spillper = 
06.        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); 
07.      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); 
08.      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, 
09.                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT); 

[1] [2] [3] 下一页

  • 上一篇文章:

  • 下一篇文章: 没有了
  • 最新文章 热点文章 相关文章
    sharepoint 2010 获取用户信息Us
    设计包含max函数的队列
    随机从数组中取出指定的不重复的
    mysql主从同步延迟方案解决的学习
    青岛科学六年级下册教材分析
    生日旅行总结
    中小板生日快乐随感
    送生日快乐桑葚乳酪小蛋糕
    写给女儿的生日快乐
    总分公司财务核算
    mysql主从同步延迟方案解决的学习
    生日旅行总结
    中小板生日快乐随感
    送生日快乐桑葚乳酪小蛋糕
    写给女儿的生日快乐
    总分公司财务核算
    恢复使用繁体字可行性研究报告
    保险受益人制度相关问题的探讨
    初中生地理读图能力培养的研究
    搞笑生日祝福
    The layout of PID & PORT i
    The layout of PID & PORT i
    The layout of PID & PORT i
    MapReduce错误任务失败处理 
    Oracle恢复内部原理(介质恢
    在 Oracle 中如何确定远程 s
    为什么RHEL 6上没有ASMLIB?
    sharepoint 2010 获取用户信
    设计包含max函数的队列
    随机从数组中取出指定的不重
     



    设为首页 | 加入收藏 | 网站地图 | 友情链接 |