hadoop核心逻辑shuffle代码分析-map端 阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下。
不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本shuffle的逻辑,整理成本文,为以后回顾所用。
首先用一张图展示下map的流程:毕业论文
在上图中,我们假设此次mapreduce有多个mapper和2个reducer,p0 p1分别代表该数据应该分配到哪个reducer端。我将mapper的过程大致分为5个过程。 1.prepare Input。 Mapreduce程序都需要指定输入文件,输入的格式有很多种,最常见的是保存在hdfs上的文本文件。在用户提交job到jobtrack(ResourceManager)前的job就会根据用户的输入文件计算出需要多少mapper,多少reducer,mapper的输入InputSplit有多大,block块名称等。mapper在prepare input阶段只需要根据inputFormat类型创建对应的RecordReader打开对应的inputSplit分片即可。如果job配置了combiner还需初始化combiner。代码见MapTask类run方法 2.mapper process 这里的mapper指用户使用或自己继承的mapper类,这也是所有初学mapreduce的同学首先看到的类。 [java] view plaincopyprint? 01.<SPAN style="FONT-SIZE: 18px"> /** 02. * Called once for each key/value pair in the input split. Most applications 03. * should override this, but the default is the identity function. 04. */ 05. @SuppressWarnings("unchecked") 06. protected void map(KEYIN key, VALUEIN value, 07. Context context) throws IOException, InterruptedException { 08. context.write((KEYOUT) key, (VALUEOUT) value); 09. } 10.</SPAN> /** * Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } 可以看到mapper默认的map方法就是取出key,value并放到context对象中。context对象包装了一个内存中的buf,下面会介绍。 [java] view plaincopyprint? 01.<SPAN style="FONT-SIZE: 18px">public void run(Context context) throws IOException, InterruptedException { 02. setup(context); 03. while (context.nextKeyValue()) { 04. map(context.getCurrentKey(), context.getCurrentValue(), context); 05. } 06. cleanup(context); 07. }</SPAN> public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }run方法就是mapper实际运行的过程:不停的从context的inputSplit对象中取出keyvalue对,通过map方法处理再保存到context包装的内存buf中。 3.buffer in memery key value在写入context中后实际是写入MapOutputBuffer类中。在第一个阶段的初始化过程中,MapOutputBuffer类会根据配置文件初始化内存buffer,我们来看下都有哪些参数: [java] view plaincopyprint? 01.<SPAN style="FONT-SIZE: 18px">partitions = job.getNumReduceTasks(); 02. rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); 03. 04. //sanity checks 05. final float spillper = 06. job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); 07. final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); 08. indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, 09. INDEX_CACHE_MEMORY_LIMIT_DEFAULT); [1] [2] [3] 下一页
|