官术网_书友最值得收藏!

3.4 Hadoop生態圈的其他組件

Hadoop誕生之初只有HDFS和MapReduce兩個軟件組件,以后得到非常快速的發展,開發人員貢獻了眾多組件,以至形成了Hadoop自己的生態圈。如圖3-12所示,除MapReduce外,目前已經存在Spark、Tez等分布式處理引擎。生態圈中還有一系列遷移數據、管理集群的輔助工具。

圖3-12 Hadoop生態圈

這些產品貌似各不相同,但是三種共同的特征把它們緊密聯系起來。首先,它們都依賴于Hadoop的基本組件——YARN、HDFS或MapReduce。其次,它們都用來處理大數據,并提供建立端到端數據流水線所需的各種功能。最后,它們對于應該如何建立分布式系統的理念是共通的。

本書后面章節會詳細介紹Sqoop、Hive、Oozie、Impala、Hue等組件,并使用一個簡單的實例說明如何利用這些組件實現ETL、定時自動執行工作流、數據分析、數據可視化等完整的數據倉庫功能。這里介紹另外一種分布式計算框架——Spark。

Apache Spark是一個開源的集群計算框架。它最初由加州大學伯克利分校的AMP實驗室開發,后來Spark的源代碼捐獻給了Apache軟件基金會,從此成了一個活躍的Apache項目。Spark提供了一套完整的集群編程接口,內含容錯和并行數據處理能力。

Spark基本的數據結構叫做彈性分布式數據集(Resilient Distributed Datasets,簡稱RDD)。這是一個分布于集群節點的只讀數據集合,并以容錯的、并行的方式進行維護。傳統的MapReduce框架強制在分布式編程中使用一種特定的線性數據流處理方式。MapReduce程序從磁盤讀取輸入數據,把數據分解成鍵/值對,經過混洗、排序、歸并等數據處理后產生輸出,并將最終結果保存在磁盤。Map階段和Reduce階段的結果均要寫磁盤,這大大降低了系統性能。也是由于這個原因,MapReduce大都被用于執行批處理任務。為了解決MapReduce的性能問題,Spark使用RDD作為分布式程序的工作集合,它提供一種分布式共享內存的受限形式。在分布式共享內存系統中,應用可以向全局地址空間的任意位置進行讀寫操作,而RDD是只讀的,對其只能進行創建、轉化和求值等操作。

利用RDD可以方便地實現迭代算法,簡單地說就是能夠在一個循環中多次訪問數據集合。RDD還適合探索式的數據分析,能夠對數據重復執行類似于數據庫風格的查詢。相對于MapReduce的實現,Spark應用的延遲可以降低幾個數量級,其中最為經典的迭代算法是用于機器學習系統的培訓算法,這也是開發Spark的初衷。

Spark需要一個集群管理器和一個分布式存儲系統作為支撐。對于集群管理,Spark支持獨立管理(原生的Spark集群),HadoopYARN和Apache Mesos。對于分布式存儲,Spark可以與多種系統對接,包括HDFS、MapR文件系統、Cassandra、OpenStack Swift、Amazon S3、Kudu,或者一個用戶自己實現的文件系統。Spark還支持偽分布的本地部署模式,但通常僅用于開發和測試目的。本地模式不需要分布式存儲,而是用本地文件系統代替。在這種場景中,Spark運行在一個機器上,每個CPU核是一個執行器(executor)。

Spark框架含有SparkCore、SparkSQL、SparkStreaming、MLlib Machine Learning Library、GraphX等幾個主要組件。

1. Spark Core

SparkCore是所有Spark相關組件的基礎。它以RDD這個抽象概念為核心,通過一組應用程序接口,提供分布式任務的分發、調度和基本的I/O功能。SparkCore的編程接口支持Java、Python、Scala和R等程序語言。這組接口使用的是函數式編程模式,即一個包含對RDD進行map、filter、reduce、join等并行操作的驅動程序,向Spark傳遞一個函數,然后Spark調度此函數在集群上并行執行。這些基本操作把RDD作為輸入并產生新的RDD。RDD自身是一個不變的數據集,對RDD的所有轉換操作都是lazy模式,即Spark不會立刻計算結果,而只是簡單地記住所有對數據集的轉換操作。這些轉換只有遇到action操作的時候才會開始真正執行,這樣的設計使Spark更加高效。容錯功能是通過跟蹤每個RDD的“血統”(lineage,指的是產生此RDD的一系列操作)實現的。一旦RDD的數據丟失,還可以使用血統進行重建。RDD可以由任意類型的Python、Java或Scala對象構成。除了面向函數的編程風格,Spark還有兩種形式的共享變量:broadcast和accumulators。broadcast變量引用的是需要在所有節點上有效的只讀數據,accumulators可以簡便地對各節點返回給驅動程序的值進行聚合。

一個典型的Spark函數式編程的例子是,統計文本文件中每個單詞出現的次數,也就是常說的詞頻統計。在下面這段Scala程序代碼中,每個flatMap函數以一個空格作為分隔符,將文件分解為由單詞組成的列表,map函數將每個單詞列表條目轉化為一個以單詞為鍵,數字1為值的RDD對,reduceByKey函數對所有的單詞進行計數。每個函數調用都將一個RDD轉化為一個新的RDD。對比相同功能的Java代碼,Scala語言的簡潔性一目了然。

    // 將一個本地文本文件讀取到(文件名,文件內容)的RDD對。
    val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README")
    // 以一個空格作為分隔符,將文件分解成一個由單詞組成的列表。
    val words = data.flatMap(_.split(" "))
    // 為每個單詞添加計數,并進行聚合計算
    val wordFreq = words.map((_, 1)).reduceByKey(_ + _)
    // 取得出現次數最多的10個單詞
    wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10)

在spark-shell(spark-shell是spark自帶的一個快速原型開發的命令行工具)里,這段代碼執行結果如下。可以看到the出現的次數最多,有26次。

    scala> val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README")
    data: org.apache.spark.rdd.RDD[String] = file:///home/mysql/mysql-5.6.14/README
    MapPartitionsRDD[13] at textFile at <console>:27

    scala> val words = data.flatMap(_.split(" "))
    words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at flatMap at <console>:29

    scala> val wordFreq = words.map((_, 1)).reduceByKey(_ + _)
    wordFreq: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at
    <console>:31

    scala> wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10)
    res1: Array[(Int, String)] = Array((26, the), (15, ""), (14, of), (9, MySQL), (7, to), (7, is),
    (6, version), (6, or), (6, in), (6, a))

2. Spark SQL

SparkSQL是基于SparkCore之上的一個組件,它引入了名為DataFrames的數據抽象。DataFrames能夠支持結構化、半結構化數據。SparkSQL提供了一種“領域特定語言”(Domain-Specific Language,簡稱DSL),用于在Scala、Java或Python中操縱DataFrames。同時SparkSQL也通過命令行接口或ODBC/JDBC提供對SQL語言的支持。我們將在12.3節詳細討論SparkSQL。下面是一段Scala里的SparkSQL代碼。

      val url = "jdbc:mysql://127.0.0.1/information_schema? user=root&password=xxxxxx"
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.format("jdbc").option("url", url).option("db table", " tables").load()
      df.printSchema()
      val countsByDatabase = df.groupBy(" table_SCHEMA").count().show()

這段代碼用SparkSQL連接本地的MySQL數據庫,屏幕打印information_schema. tables的表結構,并按table_schema字段分組,計算并顯示每組的記錄數。其功能基本等價于下面的MySQL語句:

      use information_schema;
      desc  tables;
      select  table_schema, count(*) from  tables group by  table_schema;

執行代碼前先要在spark-env.sh文件的SPARK_CLASSPATH變量中添加MySQL JDBC驅動的JAR包,例如:

      export SPARK_CLASSPATH=$SPARK_CLASSPATH:/mysql-connector-java-5.1.31-bin.jar

然后進入spark-shell執行代碼,最后一條語句顯示的輸出如下所示:

      scala> val countsByDatabase = df.groupBy(" table_SCHEMA").count().show()
      +------------------+-----+
      |         table_SCHEMA|count|
      +------------------+-----+
      |performance_schema|    52|
      |                hadoop|    37|
      |information_schema|    59|
      |                  mysql|    28|
      +------------------+-----+

      countsByDatabase: Unit = ()

3. Spark Streaming

SparkStreaming利用SparkCore的快速調度能力執行流數據的分析。它以最小批次獲取數據,并對批次上的數據執行RDD轉化。這樣的設計,可以讓用于批處理分析的Spark應用程序代碼也可以用于流數據分析,因此便于實時大數據處理架構的實現。但是這種便利性帶來的問題是處理最小批次數據的延時。其他流數據處理引擎,例如Storm和Flink的streaming組件,都是以事件而不是最小批次為單位處理流數據的。SparkStreaming支持從Kafka、Flume、Twitter、ZeroMQ、Kinesis和TCP/IP sockets接收數據。

4. MLlib Machine Learning Library

Spark中還包含一個機器學習程序庫,叫做MLlib。MLlib提供了很多機器學習算法,包括分類、回歸、聚類、協同過濾等,還支持模型評估、數據導入等額外的功能。MLlib還提供了一些更底層的機器學習原語,如一個通用的梯度下降算法等。所有這些方法都被設計為可以在集群上輕松伸縮的架構。

5. GraphX

GraphX是Spark上的圖(如社交網絡的朋友關系圖)處理框架。可以進行并行的圖計算。與SparkStreaming和SparkSQL類似,GraphX也擴展了Spark的RDDAPI,能用來創建一個頂點和邊都包含任意屬性的有向圖。GraphX還支持針對圖的各種操作,比如進行圖分割的subgraph和操作所有頂點的mapVertices,以及一些常用的圖算法,如PageRank和三角計算等。由于RDD是只讀的,因此GraphX不適合需要更新圖的場景。

主站蜘蛛池模板: 乌拉特后旗| 贵阳市| 景德镇市| 临西县| 秀山| 兴海县| 教育| 锦屏县| 阿瓦提县| 应用必备| 曲麻莱县| 聂拉木县| 宾川县| 紫金县| 绥宁县| 嘉定区| 天柱县| 襄垣县| 维西| 临泽县| 金昌市| 莱芜市| 丰城市| 加查县| 中方县| 洪湖市| 崇义县| 蓬溪县| 迁西县| 项城市| 布尔津县| 安丘市| 郎溪县| 南涧| 营口市| 钦州市| 惠东县| 定结县| 白水县| 六安市| 湘阴县|