- Hadoop構建數據倉庫實踐
- 王雪迎
- 2705字
- 2020-11-28 16:11:54
3.4 Hadoop生態圈的其他組件
Hadoop誕生之初只有HDFS和MapReduce兩個軟件組件,以后得到非常快速的發展,開發人員貢獻了眾多組件,以至形成了Hadoop自己的生態圈。如圖3-12所示,除MapReduce外,目前已經存在Spark、Tez等分布式處理引擎。生態圈中還有一系列遷移數據、管理集群的輔助工具。

圖3-12 Hadoop生態圈
這些產品貌似各不相同,但是三種共同的特征把它們緊密聯系起來。首先,它們都依賴于Hadoop的基本組件——YARN、HDFS或MapReduce。其次,它們都用來處理大數據,并提供建立端到端數據流水線所需的各種功能。最后,它們對于應該如何建立分布式系統的理念是共通的。
本書后面章節會詳細介紹Sqoop、Hive、Oozie、Impala、Hue等組件,并使用一個簡單的實例說明如何利用這些組件實現ETL、定時自動執行工作流、數據分析、數據可視化等完整的數據倉庫功能。這里介紹另外一種分布式計算框架——Spark。
Apache Spark是一個開源的集群計算框架。它最初由加州大學伯克利分校的AMP實驗室開發,后來Spark的源代碼捐獻給了Apache軟件基金會,從此成了一個活躍的Apache項目。Spark提供了一套完整的集群編程接口,內含容錯和并行數據處理能力。
Spark基本的數據結構叫做彈性分布式數據集(Resilient Distributed Datasets,簡稱RDD)。這是一個分布于集群節點的只讀數據集合,并以容錯的、并行的方式進行維護。傳統的MapReduce框架強制在分布式編程中使用一種特定的線性數據流處理方式。MapReduce程序從磁盤讀取輸入數據,把數據分解成鍵/值對,經過混洗、排序、歸并等數據處理后產生輸出,并將最終結果保存在磁盤。Map階段和Reduce階段的結果均要寫磁盤,這大大降低了系統性能。也是由于這個原因,MapReduce大都被用于執行批處理任務。為了解決MapReduce的性能問題,Spark使用RDD作為分布式程序的工作集合,它提供一種分布式共享內存的受限形式。在分布式共享內存系統中,應用可以向全局地址空間的任意位置進行讀寫操作,而RDD是只讀的,對其只能進行創建、轉化和求值等操作。
利用RDD可以方便地實現迭代算法,簡單地說就是能夠在一個循環中多次訪問數據集合。RDD還適合探索式的數據分析,能夠對數據重復執行類似于數據庫風格的查詢。相對于MapReduce的實現,Spark應用的延遲可以降低幾個數量級,其中最為經典的迭代算法是用于機器學習系統的培訓算法,這也是開發Spark的初衷。
Spark需要一個集群管理器和一個分布式存儲系統作為支撐。對于集群管理,Spark支持獨立管理(原生的Spark集群),HadoopYARN和Apache Mesos。對于分布式存儲,Spark可以與多種系統對接,包括HDFS、MapR文件系統、Cassandra、OpenStack Swift、Amazon S3、Kudu,或者一個用戶自己實現的文件系統。Spark還支持偽分布的本地部署模式,但通常僅用于開發和測試目的。本地模式不需要分布式存儲,而是用本地文件系統代替。在這種場景中,Spark運行在一個機器上,每個CPU核是一個執行器(executor)。
Spark框架含有SparkCore、SparkSQL、SparkStreaming、MLlib Machine Learning Library、GraphX等幾個主要組件。
1. Spark Core
SparkCore是所有Spark相關組件的基礎。它以RDD這個抽象概念為核心,通過一組應用程序接口,提供分布式任務的分發、調度和基本的I/O功能。SparkCore的編程接口支持Java、Python、Scala和R等程序語言。這組接口使用的是函數式編程模式,即一個包含對RDD進行map、filter、reduce、join等并行操作的驅動程序,向Spark傳遞一個函數,然后Spark調度此函數在集群上并行執行。這些基本操作把RDD作為輸入并產生新的RDD。RDD自身是一個不變的數據集,對RDD的所有轉換操作都是lazy模式,即Spark不會立刻計算結果,而只是簡單地記住所有對數據集的轉換操作。這些轉換只有遇到action操作的時候才會開始真正執行,這樣的設計使Spark更加高效。容錯功能是通過跟蹤每個RDD的“血統”(lineage,指的是產生此RDD的一系列操作)實現的。一旦RDD的數據丟失,還可以使用血統進行重建。RDD可以由任意類型的Python、Java或Scala對象構成。除了面向函數的編程風格,Spark還有兩種形式的共享變量:broadcast和accumulators。broadcast變量引用的是需要在所有節點上有效的只讀數據,accumulators可以簡便地對各節點返回給驅動程序的值進行聚合。
一個典型的Spark函數式編程的例子是,統計文本文件中每個單詞出現的次數,也就是常說的詞頻統計。在下面這段Scala程序代碼中,每個flatMap函數以一個空格作為分隔符,將文件分解為由單詞組成的列表,map函數將每個單詞列表條目轉化為一個以單詞為鍵,數字1為值的RDD對,reduceByKey函數對所有的單詞進行計數。每個函數調用都將一個RDD轉化為一個新的RDD。對比相同功能的Java代碼,Scala語言的簡潔性一目了然。
// 將一個本地文本文件讀取到(文件名,文件內容)的RDD對。 val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README") // 以一個空格作為分隔符,將文件分解成一個由單詞組成的列表。 val words = data.flatMap(_.split(" ")) // 為每個單詞添加計數,并進行聚合計算 val wordFreq = words.map((_, 1)).reduceByKey(_ + _) // 取得出現次數最多的10個單詞 wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10)
在spark-shell(spark-shell是spark自帶的一個快速原型開發的命令行工具)里,這段代碼執行結果如下。可以看到the出現的次數最多,有26次。
scala> val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README") data: org.apache.spark.rdd.RDD[String] = file:///home/mysql/mysql-5.6.14/README MapPartitionsRDD[13] at textFile at <console>:27 scala> val words = data.flatMap(_.split(" ")) words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at flatMap at <console>:29 scala> val wordFreq = words.map((_, 1)).reduceByKey(_ + _) wordFreq: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:31 scala> wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) res1: Array[(Int, String)] = Array((26, the), (15, ""), (14, of), (9, MySQL), (7, to), (7, is), (6, version), (6, or), (6, in), (6, a))
2. Spark SQL
SparkSQL是基于SparkCore之上的一個組件,它引入了名為DataFrames的數據抽象。DataFrames能夠支持結構化、半結構化數據。SparkSQL提供了一種“領域特定語言”(Domain-Specific Language,簡稱DSL),用于在Scala、Java或Python中操縱DataFrames。同時SparkSQL也通過命令行接口或ODBC/JDBC提供對SQL語言的支持。我們將在12.3節詳細討論SparkSQL。下面是一段Scala里的SparkSQL代碼。
val url = "jdbc:mysql://127.0.0.1/information_schema? user=root&password=xxxxxx" val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.format("jdbc").option("url", url).option("db table", " tables").load() df.printSchema() val countsByDatabase = df.groupBy(" table_SCHEMA").count().show()
這段代碼用SparkSQL連接本地的MySQL數據庫,屏幕打印information_schema. tables的表結構,并按table_schema字段分組,計算并顯示每組的記錄數。其功能基本等價于下面的MySQL語句:
use information_schema; desc tables; select table_schema, count(*) from tables group by table_schema;
執行代碼前先要在spark-env.sh文件的SPARK_CLASSPATH變量中添加MySQL JDBC驅動的JAR包,例如:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/mysql-connector-java-5.1.31-bin.jar
然后進入spark-shell執行代碼,最后一條語句顯示的輸出如下所示:
scala> val countsByDatabase = df.groupBy(" table_SCHEMA").count().show() +------------------+-----+ | table_SCHEMA|count| +------------------+-----+ |performance_schema| 52| | hadoop| 37| |information_schema| 59| | mysql| 28| +------------------+-----+ countsByDatabase: Unit = ()
3. Spark Streaming
SparkStreaming利用SparkCore的快速調度能力執行流數據的分析。它以最小批次獲取數據,并對批次上的數據執行RDD轉化。這樣的設計,可以讓用于批處理分析的Spark應用程序代碼也可以用于流數據分析,因此便于實時大數據處理架構的實現。但是這種便利性帶來的問題是處理最小批次數據的延時。其他流數據處理引擎,例如Storm和Flink的streaming組件,都是以事件而不是最小批次為單位處理流數據的。SparkStreaming支持從Kafka、Flume、Twitter、ZeroMQ、Kinesis和TCP/IP sockets接收數據。
4. MLlib Machine Learning Library
Spark中還包含一個機器學習程序庫,叫做MLlib。MLlib提供了很多機器學習算法,包括分類、回歸、聚類、協同過濾等,還支持模型評估、數據導入等額外的功能。MLlib還提供了一些更底層的機器學習原語,如一個通用的梯度下降算法等。所有這些方法都被設計為可以在集群上輕松伸縮的架構。
5. GraphX
GraphX是Spark上的圖(如社交網絡的朋友關系圖)處理框架。可以進行并行的圖計算。與SparkStreaming和SparkSQL類似,GraphX也擴展了Spark的RDDAPI,能用來創建一個頂點和邊都包含任意屬性的有向圖。GraphX還支持針對圖的各種操作,比如進行圖分割的subgraph和操作所有頂點的mapVertices,以及一些常用的圖算法,如PageRank和三角計算等。由于RDD是只讀的,因此GraphX不適合需要更新圖的場景。