您好,欢迎访问三七文档
当前位置:首页 > IT计算机/网络 > 数据挖掘与识别 > #LAMP人#分布式计算与Hadoop - 刘鹏
分布式计算与HadoopMediaV刘鹏Email:liupeng@mediav.com微博:weibo.com/bmchsLAMP人主题分享交流会群:83304912•创建于2009年•领先的互联网精准营销技术服务机构•分支:上海(总部),北京、深圳、广州、南京、厦门•员工人数:超过300人•营业额:2009年2000万,2010年2.4亿,2011年预计5亿•客户数:超过100位B2C企业找到客户,成就业绩MediaV(聚胜万合)简介董事长兼首席执行官杨炯纬复旦大学计算机MBA原好耶广告网络总裁首席技术官胡宁博士CarnegieMellonUniversity博士原GOOGLE技术总监北京研发中心总经理魏小勇复旦大学计算机硕士原微软大中华区软件安全事务部总监首席科学家刘鹏博士清华大学电子工程博士原雅虎Y!Labs高级科学家MediaVersHadoop概况•Apache开源项目–源于Lucene项目的一部分,2006.1成为子项目,现为Apache顶级项目之一–Yahoo!是最主要的源代码贡献者,其他贡献者:Powerset,Facebook等–已知为接近150家的大型组织实际使用:Yahoo!,Amazon,EBay,AOL,Google,IBM,Facebook,Twitter,Baidu,Alibaba,Tencent,…()•Hadoop核心功能–高可靠性,高效率的分布式文件系统–一个海量数据处理的编程框架•Hadoop目标–可扩展性:Petabytes(1015Bytes)级别的数据量,数千个节点–经济性:利用商品级(commodity)硬件完成海量数据存储和计–可靠性:在大规模集群上提供应用级别的可靠性分布式计算常用工具AvroHbaseS4ChuhwaPigBigTableGFSChubbyZooKeeperHiveOozieElephant-birdStormScribe7HDFS架构Map/Reduce•什么是Map/Reduce–一种高效,海量的分布式计算编程模型–海量:相比于MPI,Map处理之间的独立性使得整个系统的可靠性大为提高.–高效:用调度计算代替调度数据!–分布式操作和容错机制由系统实现,应用级编程非常简单.•计算流程非常类似于简单的Unixpipe:–Pipe:catinput|grep|sort|uniq-coutput–M/R:Input|map|shuffle&sort|reduce|output•多样的编程接口:–Javanativemap/reduce–可以操作M/R各细节–Streaming–利用标准输入输出模拟以上pipeline–Pig–只关注数据逻辑,无须考虑M/R实现Map/reduce计算流程InputdataOutputdataMapMapMapReduceReduceInputMapShuffle&SortReduceOutputCombinePartitionpublicvoidrun(Contextcontext)throwsIOException,InterruptedException{setup(context);while(context.nextKeyValue()){map(context.getCurrentKey(),context.getCurrentValue(),context);}cleanup(context);}Mapper.run()Reducer.run()publicvoidrun(Contextcontext)throwsIOException,InterruptedException{setup(context);while(context.nextKey()){reduce(context.getCurrentKey(),context.getValues();context);}cleanup(context);}用Java进行Map/Reduce编程MapMapMap11111111111111Reduce664CombinerCombinerCombiner•实现:Combiner是一个Reducer的子类•调用:mapreduce.Job类的setCombinerClass()方法:job.setCombinerClass(CombinerName.class)CombinerMapMapMapReduceReducePart-0Part-1Partitioner•实现:继承hadoop.mapreduce.PartitionerKEY,VALUE,实现抽象方法intgetPartition(KEYkey,VALUEvalue,intnumPartitions)•调用:mapreduce.Job类的setPartitionerClass()方法:job.setPartitionerClass(PartitionerName.class)HadoopStreaming•模拟Pipe方式执行Map/ReduceJob,并利用标准输入/输出调度数据:–Input|map|shuffle&sort|reduceoutput•开发者可以使用任何编程语言实现map和reduce过程,只需要从标准输入读入数据,并将处理结果打印到标准输出.•只支持文本格式数据,数据缺省配置为每行为一个Record,Key和value之间用\t分隔.•例,生成大量文本上的字典:–map:awk„{for(i=1;i=NF;i++){print$i}}‟–reduce:uniq14常用统计模型•指数族分布:–举例:Gaussian,multinomial,maximumentropy–Maximumlikelihood(ML)estimationislinkedtodatathroughsufficientstatistics.(e.g.forGaussian)•指数族混合分布:–Examples:MixtureofGaussians,hiddenMarkovmodels,probabilisticlatentsemanticanalysis(pLSA)–MLestimationcanbeiterativelyapproached.Ineachiteration,weupdatethemodelwiththecurrentstatistics.•指数族分布的贝叶斯学习:–Examples:2,iixx15mappermodelreducer(sufficient)statisticsdata•Keypoints:a.Onlygeneratecompactstatistics(withsizeproportionaltothemodelsize)inamapper,toreducetheshuffle/sortcost.b.Summarizesuchagenericflowchartbyatemplatelibrarywithc++.Map/Reduce统计学习流程16Map/reduce基本统计模型训练templateclassTModelclassCTrainReducer:publicIDataAnalyzer{protected:TModel*pModel;public:///Comsumeadatarecord\authorPengLiuvirtualboolconsume(constCRecord&record){returnpModel-consume(record);}///Trytoupdatemodelafterallinputdatafinish\authorPengLiuvirtualvoidfinish(){pModel-update();}///Producedmodel\authorPengLiuvirtualboolproduce(CRecord&record){pModel-produce(record);if(record.getField(STAT)!=NULL){record.rmvField(STAT);returntrue;}returnfalse;}};templateclassTModelclassCTrainMapper:publicCFeature,publicIDataNnalyzer{protected:TModel*pModel;public:///Comsumeadatarecord\authorPengLiuvirtualboolconsume(constCRecord&record){CFeature::consume(record);pModel-accumulate(*this,1.0f);returntrue;}///Producedstatistics(ormodifieddataincaseneeded)\authorPengLiuvirtualboolproduce(CRecord&record){staticboolfirst=true;pModel-produce(record);if(record.getField(STAT)!=NULL){record.rmvField(PARAM)returntrue;}returnfalse;};}MapperReducer17示例:Gaussian模型训练•Sufficientstatistics:•Statisticsaccumulation:voidCGaussDiag::accumulate(CFeature&x,floatocc){size_tdim=getFeaDim();assert(x.size()==dim);accumOcc(occ);for(size_td=0;ddim;d++){stats[d]+=occ*x[d];stats[dim+d]+=occ*x[d]*x[d];}}2,iixx•Modelparameter:•Modelupdate:voidCGaussDiag::update(){size_tdim=getFeaDim();for(size_td=0;ddim;d++){floatX=stats[d],floatX2=stats[dim+d];params[d]=X/occ();params[dim+d]=occ()/(X2-X*X/occ());}}2,σμPig–类SQLHadoop数据处理•Example:Users=load„users‟as(name,age);Fltrd=filterUsersbyage=18andage=25;Pages=load„pages‟as(user,url);Jnd=joinFltrdbyname,Pagesbyuser;Grpd=groupJndbyurl;Smmd=foreachGrpdgenerategroup,COUNT(Jnd)asclicks;Srtd=orderSmmdbyclicksdesc;Top5=limitSrtd5;storeTop5into„top5sites‟;•Pig解释器进行整体规划以减少总的map/reduce次数•可用UDF自定义数据格式,在只需要访问大量数据的部分字段时,可以采用列存储的Zebra(Pig子项目)NativeJavacodePig常用语句一览CategoryOperatorDescriptionLoadandstoringLOADSTOREDUMPLoadsdatafromthefilesystemintoarelationSavesarelationtothefilesystemPrintsarelationtotheconsoleFilteringFILTERDISTINCTFOR
本文标题:#LAMP人#分布式计算与Hadoop - 刘鹏
链接地址:https://www.777doc.com/doc-5090960 .html