Productionalizing Spark Streaming
Spark Summit 2013
Ryan Weald (@rweald)

What We're Going to Cover
• What we do and why we chose Spark
• Fault tolerance for long-lived streaming jobs
• Common patterns and functional abstractions
• Testing before we "do it live"

Special focus on common patterns and their solutions

What is Sharethrough?
Advertising for the Modern Internet
Function / Form

What is Sharethrough?

Why Spark Streaming?

Why Spark Streaming
• Liked the theoretical foundation of mini-batch processing
• Scala codebase + functional API
• Young project with opportunities to contribute
• Batch model for iterative ML algorithms

Great... Now productionalize it

Fault Tolerance

Keys to Fault Tolerance
1. Receiver fault tolerance
2. Monitoring job progress

Receiver Fault Tolerance
• Use Actors with supervisors
• Use self-healing connection pools

Use Actors
    class RabbitMQStreamReceiver(uri: String, exchangeName: String, routingKey: String)
        extends Actor with Receiver with Logging {

      implicit val system = ActorSystem()

      override def preStart() = {
        // Your code to set up connections and actors
        // Include an inner class to process messages
      }

      def receive: Receive = {
        case _ => logInfo("unknown message")
      }
    }

Track All Outputs
• Low watermarks (Google MillWheel)
• Database updated_at timestamps
• Expected output file size alerting

Common Patterns & Functional Programming

Map-Aggregate-Store: Common Job Pattern

Mapping Data
    inputData.map { rawRequest =>
      val params = QueryParams.parse(rawRequest)
      (params.getOrElse("beaconType", "unknown"), 1L)
    }

Aggregation

Basic Aggregation
    // beacons is a DStream[(String, Long)]
    // example: Seq(("click", 1L), ("click", 1L))
    val sum: (Long, Long) => Long = _ + _
    beacons.reduceByKey(sum)

What happens when we want to sum multiple things?

Long Basic Aggregation
    // Example values; in the job this would be a DStream[(String, (Long, Long, Long))]
    val inputData = Seq(
      ("user_1", (1L, 1L, 1L)),
      ("user_1", (2L, 2L, 2L))
    )

    def sum(l: (Long, Long, Long), r: (Long, Long, Long)) = {
      (l._1 + r._1, l._2 + r._2, l._3 + r._3)
    }

    inputData.reduceByKey(sum)

Now Sum 4 Ints instead! (ಥಥ┻━┻

Monoids to the Rescue

WTF is a Monoid?
    trait Monoid[T] {
      def zero: T
      def plus(l: T, r: T): T
    }
* Just need to make sure plus is associative: (1 + 5) + 2 == 1 + (5 + 2). Spark's reduceByKey also expects it to be commutative, which these sums are.

Monoid Based Aggregation
    object LongMonoid extends Monoid[(Long, Long, Long)] {
      def zero = (0L, 0L, 0L)
      def plus(l: (Long, Long, Long), r: (Long, Long, Long)) = {
        (l._1 + r._1, l._2 + r._2, l._3 + r._3)
      }
    }

    inputData.reduceByKey(LongMonoid.plus(_, _))

Twitter Algebird!

Algebird Based Aggregation
    import com.twitter.algebird._

    val aggregator = implicitly[Monoid[(Long, Long, Long)]]

    inputData.reduceByKey(aggregator.plus(_, _))

How many unique users per publisher?

Too big for a memory-based naive Map

HyperLogLog FTW

HLL Aggregation
    import com.twitter.algebird._

    // 12 bits of precision, i.e. 2^12 registers per sketch
    val aggregator = new HyperLogLogMonoid(12)

    // Values must already be HLL sketches (e.g. one per user ID), not raw IDs
    inputData.reduceByKey(aggregator.plus(_, _))

Monoids == Reusable Aggregation

Common Job Pattern: Map-Aggregate-Store

Store

How do we store the results?

Storage API Requirements
• Incremental updates (preferably associative)
• Pluggable to support "big data" stores
• Allow for testing jobs

Storage API
    trait MergeableStore[K, V] {
      def get(key: K): V
      def put(kv: (K, V)): V
      /*
       * Should follow the same associative property
       * as our Monoid from earlier
       */
      def merge(kv: (K, V)): V
    }

Twitter Storehaus!

Storing Spark Results
    def saveResults(result: DStream[(String, Long)],
                    store: RedisStore[String, Long]) = {
      result.foreach { rdd =>
        // rdd.foreach runs on the workers, so store must be serializable
        rdd.foreach { element =>
          val (keys, value) = element
          store.merge((keys, value))
        }
      }
    }

Everyone can benefit

Potential API additions?
    class PairDStreamFunctions[K, V] {
      def aggregateByKey(aggregator: Monoid[V])
      def store(store: MergeableStore[K, V])
    }

Twitter Summingbird!*

Testing Your Jobs

Testing Best Practices
• Try to avoid full integration tests
• Use in-memory stores for testing
• Keep logic outside of Spark
• Use Summingbird's in-memory platform???

Ryan Weald
@rweald

Thank You
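The "Mapping Data" slide relies on a `QueryParams.parse` helper that the deck never shows, and the testing slide recommends keeping logic outside of Spark. The sketch below connects the two. `QueryParams` here is a hypothetical stand-in that decodes a raw query string with the JDK's `URLDecoder`, and `toBeaconCount` is a hypothetical name for the map function. Because the function is a plain `String => (String, Long)`, it can be checked with ordinary assertions, and the streaming job only needs to call `inputData.map(toBeaconCount)`.

```scala
import java.net.URLDecoder
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the slide's QueryParams.parse: turns a raw query
// string such as "beaconType=click&pub=42" into a key/value Map.
object QueryParams {
  def parse(raw: String): Map[String, String] =
    raw.stripPrefix("?").split("&").iterator
      .filter(_.nonEmpty)
      .map { pair =>
        val eq = pair.indexOf('=')
        if (eq < 0) decode(pair) -> ""
        else decode(pair.substring(0, eq)) -> decode(pair.substring(eq + 1))
      }
      .toMap

  private def decode(s: String): String =
    URLDecoder.decode(s, StandardCharsets.UTF_8.name)
}

// The map step as a pure function (hypothetical name), testable without Spark.
def toBeaconCount(rawRequest: String): (String, Long) = {
  val params = QueryParams.parse(rawRequest)
  (params.getOrElse("beaconType", "unknown"), 1L)
}
```

A request without a `beaconType` parameter falls into the `"unknown"` bucket, which matches the slide's `getOrElse` default.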
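The monoid slides can be tried without a cluster. The sketch below is plain Scala and assumes neither Spark nor Algebird is on the classpath. The `Monoid` trait mirrors the slide. The implicit instances are hand-written in the spirit of what Algebird supplies. `reduceByKey` is a hypothetical local stand-in that groups an in-memory `Seq`, not Spark's `DStream` method. Because the triple instance is assembled from per-field instances, going from three summed fields to four needs one more small tuple instance rather than another hand-written `sum` function.

```scala
// Minimal Monoid, mirroring the trait on the "WTF is a Monoid?" slide.
trait Monoid[T] {
  def zero: T
  def plus(l: T, r: T): T
}

object Monoid {
  // Longs under addition, with 0 as the identity element.
  implicit val longMonoid: Monoid[Long] = new Monoid[Long] {
    def zero: Long = 0L
    def plus(l: Long, r: Long): Long = l + r
  }

  // A triple of monoids is itself a monoid, combined field by field.
  implicit def tuple3Monoid[A, B, C](implicit
      ma: Monoid[A],
      mb: Monoid[B],
      mc: Monoid[C]
  ): Monoid[(A, B, C)] = new Monoid[(A, B, C)] {
    def zero: (A, B, C) = (ma.zero, mb.zero, mc.zero)
    def plus(l: (A, B, C), r: (A, B, C)): (A, B, C) =
      (ma.plus(l._1, r._1), mb.plus(l._2, r._2), mc.plus(l._3, r._3))
  }
}

// Hypothetical local stand-in for reduceByKey: groups an in-memory Seq by key
// and folds each group's values with the implicit Monoid for V.
def reduceByKey[K, V](data: Seq[(K, V)])(implicit m: Monoid[V]): Map[K, V] =
  data.groupBy(_._1).map { case (k, pairs) =>
    k -> pairs.map(_._2).foldLeft(m.zero)((acc, v) => m.plus(acc, v))
  }

// Same sample data as the "Long Basic Aggregation" slide.
val inputData = Seq(("user_1", (1L, 1L, 1L)), ("user_1", (2L, 2L, 2L)))
val totals = reduceByKey(inputData)
```

Here `totals` holds `Map("user_1" -> (3L, 3L, 3L))`. The instance for `(Long, Long, Long)` is found through implicit resolution, much as `implicitly[Monoid[(Long, Long, Long)]]` does on the Algebird slide.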
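The HyperLogLog slides use Algebird's `HyperLogLogMonoid(12)`. The sketch below is a simplified HyperLogLog written from scratch, not Algebird's implementation, and it aims only to show why a sketch can go through `reduceByKey` like any other value. `Hll`, `HllMonoid`, `create`, and `sketchUsers` are hypothetical names. It uses Scala's `MurmurHash3.stringHash` as a 32-bit hash and leaves out the large-range correction and bias handling a production library would include. The key property is that `plus` takes the element-wise max of the registers. That operation is associative and commutative, and the all-zero sketch is its identity, so per-user sketches can be merged in any order.

```scala
import scala.util.hashing.MurmurHash3

// Simplified HyperLogLog: each register keeps the largest rank
// (leading zeros + 1) seen among the hashes routed to it.
final case class Hll(registers: Vector[Int]) {
  def estimate: Double = {
    val m = registers.size.toDouble
    val alpha = 0.7213 / (1 + 1.079 / m) // approximation valid for m >= 128
    val raw = alpha * m * m / registers.map(r => math.pow(2, -r)).sum
    val zeros = registers.count(_ == 0)
    // Small-range correction (linear counting); large-range correction omitted.
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m / zeros) else raw
  }
}

final class HllMonoid(bits: Int) {
  require(bits >= 7 && bits <= 16, "bits must be in [7, 16] for this sketch")
  val zero: Hll = Hll(Vector.fill(1 << bits)(0))

  // Sketch of a single item: exactly one register is set.
  def create(item: String): Hll = {
    val h = MurmurHash3.stringHash(item)
    val index = h >>> (32 - bits) // top `bits` bits pick the register
    // Rank of the remaining 32 - bits bits, capped when they are all zero.
    val rank = math.min(Integer.numberOfLeadingZeros(h << bits) + 1, 33 - bits)
    Hll(zero.registers.updated(index, rank))
  }

  // Merge = element-wise max: associative, commutative, `zero` is the identity.
  def plus(l: Hll, r: Hll): Hll =
    Hll(l.registers.zip(r.registers).map { case (a, b) => math.max(a, b) })
}

val hll = new HllMonoid(12)

// Hypothetical helper: one sketch per user ID, folded together with plus.
def sketchUsers(ids: Range): Hll =
  ids.map(i => hll.create(s"user_$i")).foldLeft(hll.zero)(hll.plus)

val publisherA = sketchUsers(0 until 3000)    // 3000 distinct users
val publisherB = sketchUsers(2000 until 5000) // overlaps A on 1000 users
val combined = hll.plus(publisherA, publisherB)
```

With 2^12 registers, the relative error is on the order of 1.04 / sqrt(4096), about 1.6%. So `combined.estimate` should land near the true union of 5000 users while each sketch holds only 4096 small integers.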
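The testing slide suggests in-memory stores, and the "Storage API" slide defines `MergeableStore`. A minimal in-memory implementation is sketched below. `InMemoryMergeableStore` is a hypothetical class, not Storehaus's API. It takes a `zero` and an associative `plus` in place of a full monoid. The slide does not say what `put` and `merge` return, so this sketch assumes both return the value now stored.

```scala
import scala.collection.mutable

// Same shape as the MergeableStore trait on the "Storage API" slide.
trait MergeableStore[K, V] {
  def get(key: K): V
  def put(kv: (K, V)): V
  def merge(kv: (K, V)): V
}

// Hypothetical in-memory store for tests. Missing keys read as `zero`,
// and merge folds the incoming value into the stored one with `plus`.
final class InMemoryMergeableStore[K, V](zero: V, plus: (V, V) => V)
    extends MergeableStore[K, V] {
  private val data = mutable.Map.empty[K, V]

  def get(key: K): V = data.getOrElse(key, zero)

  // Overwrites the stored value (assumed to return the new value).
  def put(kv: (K, V)): V = {
    data(kv._1) = kv._2
    kv._2
  }

  // Combines with the existing value (assumed to return the merged value).
  def merge(kv: (K, V)): V = {
    val (key, value) = kv
    val merged = plus(get(key), value)
    data(key) = merged
    merged
  }
}

val store = new InMemoryMergeableStore[String, Long](0L, _ + _)
// Two micro-batches of (beaconType, count) results, merged incrementally.
Seq(("click", 2L), ("view", 5L)).foreach(store.merge)
Seq(("click", 3L)).foreach(store.merge)
```

After both batches, `store.get("click")` is 5. Because `merge` only requires an associative `plus`, the same store works with the tuple or HyperLogLog aggregations from the earlier slides.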