您好,欢迎访问三七文档
当前位置:首页 > IT计算机/网络 > 数据挖掘与识别 > hadoop源码分析-mapreduce部分
2009-02-21Hadoop源代码分析(MapReduce概论)大家都熟悉文件系统,在对HDFS进行分析前,我们并没有花很多的时间去介绍HDFS的背景,毕竟大家对文件系统的还是有一定的理解的,而且也有很好的文档。在分析Hadoop的MapReduce部分前,我们还是先了解系统是如何工作的,然后再进入我们的分析部分。下面的图来自,是我看到的讲MapReduce最好的图。以Hadoop带的wordcount为例子(下面是启动行):hadoopjarhadoop-0.19.0-examples.jarwordcount/usr/input/usr/output用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控,并运行在独立于TaskTracker的Java虚拟机中。我们的输入和输出都是HDFS上的目录(如上图所示)。输入由InputFormat接口描述,它的实现如ASCII文件,JDBC数据库等,分别处理对于的数据源,并提供了数据的一些特征。通过InputFormat实现,可以获取InputSplit接口的实现,这个实现用于对数据进行划分(图中的splite1到splite5,就是划分以后的结果),同时从InputFormat也可以获取RecordReader接口的实现,并从输入中生成k,v对。有了k,v,就可以开始做map操作了。map操作通过context.collect(最终通过OutputCollector.collect)将结果写到context中。当Mapper的输出被收集后,它们会被Partitioner类以指定的方式区分地写出到输出文件里。我们可以为Mapper提供Combiner,在Mapper输出它的k,v时,键值对不会被马上写到输出里,他们会被收集在list里(一个key值一个list),当写入一定数量的键值对时,这部分缓冲会被Combiner中进行合并,然后再输出到Partitioner中(图中M1的黄颜色部分对应着Combiner和Partitioner)。Map的动作做完以后,进入Reduce阶段。这个阶段分3个步骤:混洗(Shuffle),排序(sort)和reduce。混洗阶段,Hadoop的MapReduce框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上(多个Mapper产生的同一个key的中间结果分布在不同的机器上,这一步结束后,他们传输都到了处理这个key的Reducer的机器上)。这个步骤中的文件传输使用了HTTP协议。排序和混洗是一块进行的,这个阶段将来自不同Mapper具有相同key值的key,value对合并到一起。Reduce阶段,上面通过Shuffle和sort后得到的key,(listofvalues)会送到Reducer.reduce方法中处理,输出的结果通过OutputFormat,输出到DFS中。2009-02-25Hadoop源代码分析(包org.apache.hadoop.mapreduce)有了前一节的分析,我们来看一下具体的接口,它们都处于包org.apache.hadoop.mapreduce中。上面的图中,类可以分为4种。右上角的是从Writeable继承的,和Counter(还有CounterGroup和Counters,也在这个包中,并没有出现在上面的图里)和ID相关的类,它们保持MapReduce过程中需要的一些计数器和标识;中间大部分是和Context相关的*Context类,它为Mapper和Reducer提供了相关的上下文;关于Map和Reduce,对应的类是Mapper,Reducer和描述他们的Job(在Hadoop中一次计算任务称之为一个job,下面的分析中,中文为“作业”,相应的task我们称为“任务”);图中其他类是配合Mapper和Reduce工作的一些辅助类。如果你熟悉HTTPServlet,那就能很轻松地理解Hadoop采用的结构,把整个Hadoop看作是容器,那么Mapper和Reduce就是容器里的组件,*Context保存了组件的一些配置信息,同时也是和容器通信的机制。和ID相关的类我们就不再讨论了。我们先看JobContext,它位于*Context继承树的最上方,为Job提供一些只读的信息,如Job的ID,名称等。下面的信息是MapReduce过程中一些较关键的定制信息:(来自):参数作用缺省值其它实现InputFormat将输入的数据集切割成小数据集InputSplits,每一个InputSplit将由一个Mapper负责处理。此外InputFormat中还提供一个RecordReader的实现,将一个InputSplit解析成key,value对提供给map函数。TextInputFormat(针对文本文件,按行将文本文件切割成InputSplits,并用LineRecordReader将InputSplit解析成key,value对,key是行在文件中的位置,value是文件中的一行)SequenceFileInputFormatOutputFormat提供一个RecordWriter的实现,负责输出最终结果TextOutputFormat(用LineRecordWriter将最终结果写成纯文件文件,每个key,value对一行,key和value之间用tab分隔)SequenceFileOutputFormatOutputKeyClass输出的最终结果中key的类型LongWritableOutputValueClass输出的最终结果中value的类型TextMapperClassMapper类,实现map函数,完成输入的key,value到中间结果的映射IdentityMapper(将输入的key,value原封不动的输出为中间结果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass实现combine函数,将中间结果中的重复key做合并null(不对中间结果中的重复key做合并)ReducerClassReducer类,实现reduce函数,对中间结果做合并,形成最终结果IdentityReducer(将中间结果直接输出为最终结果)AccumulatingReducer,LongSumReducerInputPath设定job的输入目录,job运行时会处理输入目录下的所有文件nullOutputPath设定job的输出目录,job的最终结果会写入输出目录下nullMapOutputKeyClass设定map函数输出的中间结果中key的类型如果用户没有设定的话,使用OutputKeyClassMapOutputValueClass设定map函数输出的中间结果中value的类型如果用户没有设定的话,使用OutputValuesClassOutputKeyComparator对结果中的key进行排序时的使用的比较器WritableComparablePartitionerClass对中间结果的key排序后,用此Partition函数将其划分为R份,每份由一个Reducer负责处理。HashPartitioner(使用Hash函数做partition)KeyFieldBasedPartitionerPipesPartitionerJob继承自JobContext,提供了一系列的set方法,用于设置Job的一些属性(Job更新属性,JobContext读属性),同时,Job还提供了一些对Job进行控制的方法,如下:mapProgress:map的进度(0—1.0);reduceProgress:reduce的进度(0—1.0);isComplete:作业是否已经完成;isSuccessful:作业是否成功;killJob:结束一个在运行中的作业;getTaskCompletionEvents:得到任务完成的应答(成功/失败);killTask:结束某一个任务;2009-02-25Hadoop源代码分析(包mapreduce.lib.input)接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下:类图的右上角是InputFormat,它描述了一个MapReduceJob的输入,通过InputFormat,Hadoop可以:检查MapReduce输入数据的正确性;将输入数据切分为逻辑块InputSplit,这些块会分配给Mapper;提供一个RecordReader实现,Mapper用该实现从InputSplit中读取输入的K,V对。在org.apache.hadoop.mapreduce.lib.input中,Hadoop为所有基于文件的InputFormat提供了一个虚基类FileInputFormat。下面几个参数可以用于配置FileInputFormat:mapred.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat;mapred.min.split.size:最小的划分大小;mapred.max.split.size:最大的划分大小;mapred.input.dir:输入路径,用逗号做分割。类中比较重要的方法有:protectedListFileStatuslistStatus(Configurationjob)递归获取输入数据目录中的所有文件(包括文件信息),输入的job是系统运行的配置Configuration,包含了上面我们提到的参数。publicListInputSplitgetSplits(JobContextcontext)将输入划分为InputSplit,包含两个循环,第一个循环处理所有的文件,对于每一个文件,根据输入的划分最大/最小值,循环得到文件上的划分。注意,划分不会跨越文件。FileInputFormat没有实现InputFormat的createRecordReader方法。FileInputFormat有两个子类,SequenceFileInputFormat是Hadoop定义的一种二进制形式存放的键/值文件(参考),它有自己定义的文件布局。由于它有特殊的扩展名,所以SequenceFileInputFormat重载了listStatus,同时,它实现了createRecordReader,返回一个SequenceFileRecordReader对象。TextInputFormat处理的是文本文件,createRecordReader返回的是LineRecordReader的实例。这两个类都没有重载FileInputFormat的getSplits方法,那么,在他们对于的RecordReader中,必须考虑FileInputFormat对输入的划分方式。FileInputFormat的getSplits,返回的是FileSplit。这是一个很简单的类,包含的属性(文件名
本文标题:hadoop源码分析-mapreduce部分
链接地址:https://www.777doc.com/doc-5161973 .html