Apache Crunch(孵化器項目)是基於Google的FlumeJava庫編寫的Java庫,用於創建MapReduce流水線。與其他用來創建 MapReduce作業的高層工具(如Apache Hive、Apache Pig和Cascading等)類似,Crunch提供了用於實現如連接數據、執行 聚合和排序記錄等常見任務的模式庫。而與其他工具不同的是,Crunch並不強制所有輸入遵循同一數據類型。相反,Crunch 使用了一種定制的類型系統,非常靈活,能夠直接處理復雜數據類型,如時間序列、HDF5文件、Apache HBase表和序列化對 象(像protocol buffer或Avro記錄)等。
Crunch並不想阻止開發者以MapReduce方式思考,而是嘗試使之簡化。盡管 MapReduce有諸多優點,但對很多問題而言,並非正確的抽象級別:大部分有意思的計算都是由多個MapReduce作業組成的, 情況往往是這樣——出於性能考慮,我們需要將邏輯上獨立的操作(如數據過濾、數據投影和數據變換)組合為一個物理上 的MapReduce作業。
本質上,Crunch設計為MapReduce之上的一個薄層,希望在不犧牲MapReduce力量(或者說不影響 開發者使用MapReduce API)的前提下,更容易在正確的抽象級別解決手頭問題。
盡管Crunch會讓人想起歷史悠久的 Cascading API,但是它們各自的數據模型有很大不同:按照常識簡單總結一下,可以認為把問題看做數據流的人會偏愛 Crunch和Pig,而考慮SQL風格連接的人會偏愛Cascading和Hive。
Crunch的理念
PCollection和PTable<K, V>是Crunch的核心抽象,前者代表一個分布式、不可變的對象集合,後者是Pcollection的一個子接口,其中包含了處理 鍵值對的額外方法。這兩個核心類支持如下四個基本操作:
combineValues:執行一個關聯操作來聚合來自groupByKey操作的值。
union:將兩個或多個Pcollection看做一個虛擬的PCollection。
Crunch的所有高階操作(joins、cogroups和set operations等)都是通過這些基本原語實現的。Crunch的作業計劃器( job planner)接收流水線開發者定義的操作圖,將操作分解為一系列相關的MapReduce作業,然後在Hadoop集群上執行。 Crunch也支持內存執行引擎,可用於本地數據上流水線的測試與調試。
有些問題可以從能夠操作定制數據類型的大 量用戶定義函數受益,而Crunch就是為這種問題設計的。Crunch中的用戶定義函數設計為輕量級的,為滿足應用程序的需要 ,仍然提供了完整的訪問底層MapReduce API的功能。Crunch開發者也可以使用Crunch原語來定義API,為客戶提供涉及一系 列復雜MapReduce作業的高級ETL、機器學習和科學計算功能。
Crunch起步
可以從Crunch的網站: http://incubator.apache.org/crunch/download.html下載最新版本的源代碼或二進制文件,或者使用在Maven Central發 布的dependencies。
源代碼中有很多示例應用。下面是Crunch中WordCount應用的源代碼:
import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.type.writable.Writables; public class WordCount { public static void main(String[] args) throws Exception { // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(WordCount.class); // Reference a given text file as a collection of Strings. PCollection<String> lines = pipeline.readTextFile(args[0]); // Define a function that splits each line in a PCollection of Strings into a // PCollection made up of the incenteridual words in the file. PCollection<String> words = lines.parallelDo(new DoFn<String, String>() { public void process(String line, Emitter<String> emitter) { for (String word : line.split("\\s+")) { emitter.emit(word); } } }, Writables.strings()); // Indicates the serialization format // The count method applies a series of Crunch primitives and returns // a map of the top 20 unique words in the input PCollection to their counts. // We then read the results of the MapReduce jobs that performed the // computations into the client and write them to stdout. for (Pair<String, Long> wordCount : words.count().top(20).materialize()) { System.out.println(wordCount); } } }
Crunch優化方案
Crunch優化器的目標是盡可能減少運行的MapReduce作業數。大多數MapReduce作業都是 IO密集 型的,因此訪問數據的次數越少越好。公平地說,每種優化器(Hive、Pig、Cascading和Crunch)的工作方式本質上是相同 的。但與其他框架不同的是,Crunch把優化器原語暴露給了客戶開發人員,對於像構造ETL流水線或構建並評估一組隨機森 林模型這樣的任務而言,構造可復用的高階操作更容易。
結論
Crunch目前仍處於Apache的孵化器階段,我們 非常歡迎社區貢獻(參見項目主頁:http://incubator.apache.org/projects/crunch.html)讓這個庫更好 。特別的是,我們正在尋求更高效的MapReduce編譯思想(包括基於成本考慮的優化)、新的MapReduce設計模式,還希望支 持更多的數據源和目標,如HCatalog、Solr和ElasticSearch等。還有很多把Crunch帶向如Scala和Clojure等其他JVM語言的 項目,也有很多使用Crunch以R語言來創建MapReduce流水線的工具。