【前言】Nosql技術只掌握了MongoDB。看到一篇文章介紹如何在MongoDB上使用Spark,趕緊翻譯過來學習,提高一點核心競爭力。原文http://codeforhire.com/2014/02/18/using-spark-with-mongodb/
【正文】
在MongoDB上使用Spark
發布於 2014.02.18 作者 Sampo N
我最近開始研究Apache Spark作為數據挖掘框架。Spark建立在Apache Hadoop之上,它能夠實施除Map-Reduce外更多的操作。同樣它支持用迭代算法處理流數據。
既然Spark是基於Hadoop和HDFS,那麼它就適於任何HDFS的數據源。我們的服務器使用了MongoDB,因而我們自然選擇了mongo-hadoop 連接器,可以用它來實現從MongoDB上讀寫數據。
然而,這樣做距離我們搞清楚如何配置、使用mongo-hadoop + spark還很遠(至少對Spark入門者)。經過一番試驗,以及令人沮喪的過程,以及向spark用戶郵件列表發郵件咨詢,我最終在Java和Scala環境上獲得了成功。現在我寫出這篇教程來解救大家。
仔細閱讀以下內容,伸手黨可以看這裡:應用的樣例代碼
版本和APIsHadoop生態中充斥著各種不同的庫,他們之間可能存在的APIs沖突會讓人抓狂。主要的API變化在Hadoop 0.20。在這個版本中,老的org.apache.hadoop.mapred API變成了org.apache.hadoop.mapreduce API。API變化反過來影響了這些庫:mongo-hadoop的包com.mongodb.hadoop.mapred變成com.mongodb.hadoop,同時SparkContext包含了方法hadoopRDD和newAPIHadoopRDD。
你需要小心選擇出每個API的正確版本。這讓事情更為復雜,因為在大多數情況下兩個API的類名完全相同,只有包名不同。如果你碰到了謎一般的錯誤,再次檢查一下使用的API是一致的。
樣例使用了Hadoop 2.2.0 和新API。
庫依賴
Apache Spark依賴於多個支撐庫從Apache Commons和Hadoop 到 slf4j和Jetty。不要自己管理這些庫依賴,使用Maven,Ivy,SBT或其他版本構建工具。
樣例使用了SBT加載Akka Maven 倉庫。這個Maven倉庫包含了針對不同Hadoop版本的mongo-hadoop連接器,但是沒有2.2.0的。因此單獨添加了mongo-hadoop連接器。
在spark中使用mongo-hadoop
mongo-hadoop配置參數使用配置對象(從Hadoop包中獲得)傳遞。最重要的參數是mongo.input.uri和mongo.output.uri,這個參數提供了MongoDB主機、端口、鑒權、db和collection名字。你也可以提供其他的配置選項,例如Mong查詢語句用來限制輸出數據。
每一個Mongo Collection分別作為獨立的RDD載入,載入用的sparkcontext:
JavaPairRDD
這裡用了新的API,並且MongoInputFormat必須從com.mongodb.hadoop導入。對於舊的API,你應該使用hadoopRDD方法和com.mongodb.hadoop.mapred.MongoInputFormat。
返回類型是RDD
保存RDD到MongoDB,使用了saveAsNewAPIHadoopFile方法:
rdd.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
只有最後兩個參數看起來相關(雖然第一個參數必須是合法HDFS的URI)。RDD同樣是RDD
樣例app樣例應用 包含簡單的單詞計數算法,既有java的也有scala的。他們從MongoDB的beowulf.input collection中讀出數據,在本地運行。(MongoDB的)文檔只包含文字域,計數算法在文字域上工作。
結果存儲在同樣的beowulf庫中,collection為output,文檔包含word(文章中的單詞)域和計數域。
樣例要求MongoDB運行在本地,Scala版本2.10,SBT安裝。然後你可以導入樣例數據,運行程序,輸出結果,使用如下的命令:
mongoimport -d beowulf -c input beowulf.json
sbt 'run-main JavaWordCount'
sbt 'run-main ScalaWordCount'
mongo beowulf --eval 'printjson(db.output.find().toArray())' | less