MongoDB + Spark: A Complete Big Data Solution

ABOUT ME
TJ / 唐建法
Senior Solutions Architect, MongoDB
Co-founder, MongoDB Chinese Community

AGENDA
- Introduction to Spark
- Spark and MongoDB
- Case study
- Demo

SPARK AND THE HADOOP ECOSYSTEM
[Diagram: the ecosystem stack — Hive and Pig run over Hadoop MapReduce; Spark SQL, DataFrames, and Spark Streaming run over Spark; either stack runs on YARN, Mesos, or Spark Standalone, with HDFS as the storage layer.]

HDFS AND MONGODB
In common:
- Scale out horizontally to TB-PB data volumes
- Low cost on commodity x86 hardware
- Automatic multi-copy data replication
- Support for unstructured data

Differences:

                          HDFS                           MongoDB
    Storage granularity   coarse-grained                 fine-grained, structured
    Indexes               none                           secondary indexes
    Access pattern        write once, read many          mixed reads and writes
    Workload              non-interactive, offline;      interactive, real-time online;
                          minute-level SLA               millisecond-level SLA

A typical shared workload — log events stored as documents (schematic, not valid JSON):

    { ts: 2016-07-31 23:50:50, host: xyz, error: 404,  body: { } … }
    { ts: 2016-07-31 23:49:23, host: def, error: 019,  body: { } … }
    { ts: 2016-07-31 23:49:22, host: xyz, error: null, body: { } … }
    . . .
    { ts: 2016-07-01 02:04:12, host: abc, error: 500,  body: { } … }
    . . .

HOW SPARK PROCESSES DATA
[Diagram: data is parallelized out of storage (HDFS or MongoDB) into RDD partitions, flows through transformations such as map, filter, union, and intersect, and is materialized by actions into results.]

CASE STUDY: AIRLINE FARE ENGINE
[Diagram: fares are keyed by origin/destination pair (NYC-LAX, BOS-NYC, PVG-SZX, ...), stay duration, and date — 365 days x 1000+ routes to pre-compute.]
[Diagram: sales channels consuming the fares — B2T, B2C, B2M, call center, mobile.]
[Diagram: deployment — Spark masters driven via spark-submit; Fare API and Inventory API implementations backed by a SeatInventory store and a FareCache.]
[Diagram: batch job flow — START → submit batch job → load reference data → broadcast variables → split jobs → parallel compute → collect results → END. Broadcast variables: flight schedule, base prices, price rules.]

The driver program, sketched in Java (cleaned up from the slide's pseudocode; MyDependencyManager and calcPrice are the slide's placeholders for the dependency loader and the pricing function):

    import java.util.Map;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.broadcast.Broadcast;
    import org.bson.Document;
    import com.mongodb.spark.MongoSpark;
    import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

    // Load dependencies: base prices, pricing rules and some reference data
    Map<String, Object> dependencies = MyDependencyManager.loadDependencies();
    // Broadcast the dependencies to every worker
    Broadcast<Map<String, Object>> deps = javaSparkContext.broadcast(dependencies);
    // Create the job RDD from MongoDB, pre-filtered by an aggregation pipeline
    JavaMongoRDD<Document> cabinsRDD = MongoSpark.load(javaSparkContext).withPipeline(pipeline);
    // For each (cabin, date, airport pair), calculate the price
    JavaRDD<Document> priced = cabinsRDD.map(cabin -> calcPrice(cabin, deps.value()));
    // Write the results back to MongoDB
    MongoSpark.save(priced);

RESULTS
[Charts: legacy vs. Spark + MongoDB — throughput (axis 0 to 3,500) and latency (axis 0 to 350).]

DEMO
Download Spark 1.6.0, unpack it, and start spark-shell with the MongoDB connector (the download URL is elided in the original):

    # curl -OL <spark-1.6.0-bin-hadoop2.6.tgz download URL>
    # mkdir -p ~/spark
    # tar -xvf spark-1.6.0-bin-hadoop2.6.tgz -C ~/spark --strip-components=1
    # cd ~/spark
    # ./bin/spark-shell \
        --conf spark.mongodb.input.uri=mongodb://127.0.0.1/flights.av \
        --conf spark.mongodb.output.uri=mongodb://127.0.0.1/flights.output \
        --packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0

Read the first ten documents:

    import com.mongodb.spark._
    import org.bson.Document

    MongoSpark.load(sc).take(10).foreach(println)

Sum the seats per flight:

    MongoSpark.load(sc)
      .map(doc => (doc.getString("flight"), doc.getLong("seats")))
      .reduceByKey((x, y) => x + y)
      .take(10)
      .foreach(println)

Push a $match stage down into MongoDB with an aggregation pipeline:

    import org.bson.Document

    MongoSpark.load(sc)
      .withPipeline(Seq(Document.parse("{ $match: { orig: 'KMG' } }")))
      .map(doc => (doc.getString("flight"), doc.getLong("seats")))
      .reduceByKey((x, y) => x + y)
      .take(10)
      .foreach(println)

TUNING
- Total data size / chunk size = number of chunks = RDD partitions = Spark tasks
- Budget 1-2 CPU cores per Spark task
- Spark-to-MongoDB I/O parallelism follows the MongoDB chunk size (MB)
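A quick worked example of that sizing rule — the collection size and core count here are hypothetical, not from the deck:

    // Hypothetical sizing: a 256 GB collection, MongoDB's default 64 MB chunk size.
    val totalDataMB = 256L * 1024                  // 262,144 MB
    val chunkSizeMB = 64L
    val partitions  = totalDataMB / chunkSizeMB    // 4,096 chunks = RDD partitions = Spark tasks
    // At 1-2 cores per task, a 64-core cluster works through 32-64 of those tasks at a time.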
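The shell session configures spark.mongodb.output.uri, but the demo snippets above never write to it. A minimal sketch of persisting the per-flight totals to flights.output, assuming the same connector version; the output document shape is my own illustration:

    import com.mongodb.spark._
    import org.bson.Document

    val totals = MongoSpark.load(sc)
      .map(doc => (doc.getString("flight"), doc.getLong("seats")))
      .reduceByKey((x, y) => x + y)
      .map { case (flight, seats) =>
        // Hypothetical output shape: one summary document per flight
        new Document("flight", flight).append("totalSeats", seats)
      }

    MongoSpark.save(totals)  // writes to the collection named by spark.mongodb.output.uri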
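The ecosystem slide also lists Spark SQL and DataFrames; those layers can read MongoDB through the same connector. A sketch, assuming connector 1.0.0's DataFrame support and the sqlContext that Spark 1.6's spark-shell provides:

    import com.mongodb.spark._
    import com.mongodb.spark.sql._

    // Infer a schema from flights.av and query it with SQL
    val df = MongoSpark.load(sqlContext)
    df.registerTempTable("av")
    sqlContext.sql("SELECT flight, SUM(seats) AS total FROM av GROUP BY flight").show()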