官术网_书友最值得收藏!

3.1 為什么說RDD和DataSet是Spark的靈魂

Spark建立在抽象的RDD上,使得它可以用一致的方式處理大數據不同的應用場景,把所有需要處理的數據轉化成為RDD,然后對RDD進行一系列的算子運算,從而得到結果。RDD是一個容錯的、并行的數據結構,可以將數據存儲到內存和磁盤中,并能控制數據分區,且提供了豐富的API來操作數據。Spark一體化、多元化的解決方案極大地減少了開發和維護的人力成本和部署平臺的物力成本,并在性能方面有極大的優勢,特別適合于迭代計算,如機器學習和圖計算;同時,Spark對Scala和Python交互式shell的支持也極大地方便了通過shell直接使用Spark集群來驗證解決問題的方法,這對于原型開發至關重要,對數據分析人員有著無法拒絕的吸引力。

3.1.1 RDD的定義及五大特性剖析

RDD是分布式內存的一個抽象概念,是一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,能橫跨集群所有節點并行計算,是一種基于工作集的應用抽象。

RDD底層存儲原理:其數據分布存儲于多臺機器上,事實上,每個RDD的數據都以Block的形式存儲于多臺機器上,每個Executor會啟動一個BlockManagerSlave,并管理一部分Block;而Block的元數據由Driver節點上的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注冊該Block,BlockManagerMaster管理RDD與Block的關系,當RDD不再需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。

BlockManager管理RDD的物理分區,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤上。而RDD中的Partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當于數據的一個元數據結構,存儲著數據分區及其邏輯結構映射關系,存儲著RDD之前的依賴轉換關系。

BlockManager在每個節點上運行管理Block(Driver和Executors),它提供一個接口檢索本地和遠程的存儲變量,如memory、disk、off-heap。使用BlockManager前必須先初始化。

BlockManager.scala的部分源碼如下所示。

1.   private[spark] class BlockManager(
2.      executorId: String,
3.      rpcEnv: RpcEnv,
4.      val master: BlockManagerMaster,
5.      val serializerManager: SerializerManager,
6.      val conf: SparkConf,
7.      memoryManager: MemoryManager,
8.      mapOutputTracker: MapOutputTracker,
9.      shuffleManager: ShuffleManager,
10.     val blockTransferService: BlockTransferService,
11.     securityManager: SecurityManager,
12.     numUsableCores: Int)
13.   extends BlockDataManager with BlockEvictionHandler with Logging {

BlockManagerMaster會持有整個Application的Block的位置、Block所占用的存儲空間等元數據信息,在Spark的Driver的DAGScheduler中,就是通過這些信息來確認數據運行的本地性的。Spark支持重分區,數據通過Spark默認的或者用戶自定義的分區器決定數據塊分布在哪些節點。RDD的物理分區是由Block-Manager管理的,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤。而RDD中的partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當于數據的一個元數據結構(一個RDD就是一組分區),存儲著數據分區及Block、Node等的映射關系,以及其他元數據信息,存儲著RDD之前的依賴轉換關系。分區是一個邏輯概念,Transformation前后的新舊分區在物理上可能是同一塊內存存儲。

Spark通過讀取外部數據創建RDD,或通過其他RDD執行確定的轉換Transformation操作(如map、union和groubByKey)而創建,從而構成了線性依賴關系,或者說血統關系(Lineage),在數據分片丟失時可以從依賴關系中恢復自己獨立的數據分片,對其他數據分片或計算機沒有影響,基本沒有檢查點開銷,使得實現容錯的開銷很低,失效時只需要重新計算RDD分區,就可以在不同節點上并行執行,而不需要回滾(Roll Back)整個程序。落后任務(即運行很慢的節點)是通過任務備份,重新調用執行進行處理的。

因為RDD本身支持基于工作集的運用,所以可以使Spark的RDD持久化(persist)到內存中,在并行計算中高效重用。多個查詢時,我們就可以顯性地將工作集中的數據緩存到內存中,為后續查詢提供復用,這極大地提升了查詢的速度。在Spark中,一個RDD就是一個分布式對象集合,每個RDD可分為多個片(Partitions),而分片可以在集群環境的不同節點上計算。

RDD作為泛型的抽象的數據結構,支持兩種計算操作算子:Transformation(變換)與Action(行動)。且RDD的寫操作是粗粒度的,讀操作既可以是粗粒度的,也可以是細粒度的。

RDD.scala的源碼如下。

1.  /** 每個RDD都有5個主要特性
2.    *-分區列表
3.    *-每個分區都有一個計算函數
4.    *-依賴于其他RDD的列表
5.    *- 數據類型(Key-Value)的RDD分區器
6.    *- 每個分區都有一個分區位置列表
7.    */
8.   abstract class RDD[T: ClassTag](
9.     @transient private var _sc: SparkContext,
10.    @transient private var deps: Seq[Dependency[_]]
11.  ) extends Serializable with Logging {

其中,SparkContext是Spark功能的主要入口點,一個SparkContext代表一個集群連接,可以用其在集群中創建RDD、累加變量、廣播變量等,在每一個可用的JVM中只有一個SparkContext,在創建一個新的SparkContext之前,必須先停止該JVM中可用的SparkContext,這種限制可能最終會被修改。SparkContext被實例化時需要一個SparkConf對象去描述應用的配置信息,在這個配置對象中設置的信息,會覆蓋系統默認的配置。

RDD五大特性:

(1)分區列表(a list of partitions)。Spark RDD是被分區的,每一個分區都會被一個計算任務(Task)處理,分區數決定并行計算數量,RDD的并行度默認從父RDD傳給子RDD。默認情況下,一個HDFS上的數據分片就是一個Partition,RDD分片數決定了并行計算的力度,可以在創建RDD時指定RDD分片個數,如果不指定分區數量,當RDD從集合創建時,則默認分區數量為該程序所分配到的資源的CPU核數(每個Core可以承載2~4個Partition),如果是從HDFS文件創建,默認為文件的Block數。

(2)每一個分區都有一個計算函數(a function for computing each split)。每個分區都會有計算函數,Spark的RDD的計算函數是以分片為基本單位的,每個RDD都會實現compute函數,對具體的分片進行計算,RDD中的分片是并行的,所以是分布式并行計算。有一點非常重要,就是由于RDD有前后依賴關系,遇到寬依賴關系,例如,遇到reduceBykey等寬依賴操作的算子,Spark將根據寬依賴劃分Stage,Stage內部通過Pipeline操作,通過Block Manager獲取相關的數據,因為具體的split要從外界讀數據,也要把具體的計算結果寫入外界,所以用了一個管理器,具體的split都會映射成BlockManager的Block,而具體split會被函數處理,函數處理的具體形式是以任務的形式進行的。

(3)依賴于其他RDD的列表(a list of dependencies on other RDDs)。RDD的依賴關系,由于RDD每次轉換都會生成新的RDD,所以RDD會形成類似流水線的前后依賴關系,當然,寬依賴就不類似于流水線了,寬依賴后面的RDD具體的數據分片會依賴前面所有的RDD的所有的數據分片,這時數據分片就不進行內存中的Pipeline,這時一般是跨機器的。因為有前后的依賴關系,所以當有分區數據丟失的時候,Spark會通過依賴關系重新計算,算出丟失的數據,而不是對RDD所有的分區進行重新計算。RDD之間的依賴有兩種:窄依賴(Narrow Dependency)、寬依賴(Wide Dependency)。RDD是Spark的核心數據結構,通過RDD的依賴關系形成調度關系。通過對RDD的操作形成整個Spark程序。

RDD有Narrow Dependency和Wide Dependency兩種不同類型的依賴,其中的Narrow Dependency指的是每一個parent RDD的Partition最多被child RDD的一個Partition所使用,而Wide Dependency指的是多個child RDD的Partition會依賴于同一個parent RDD的Partition。可以從兩個方面來理解RDD之間的依賴關系:一方面是該RDD的parent RDD是什么;另一方面是依賴于parent RDD的哪些Partitions;根據依賴于parent RDD的Partitions的不同情況,Spark將Dependency分為寬依賴和窄依賴兩種。Spark中寬依賴指的是生成的RDD的每一個partition都依賴于父RDD的所有partition,寬依賴典型的操作有groupByKey、sortByKey等,寬依賴意味著shuffle操作,這是Spark劃分Stage邊界的依據,Spark中寬依賴支持兩種Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基于Hash的Shuffle機制,后者是基于排序的Shuffle機制。Spark 2.2現在的版本中已經沒有Hash Shuffle的方式。

(4)key-value數據類型的RDD分區器(-Optionally,a Partitioner for key-value RDDS),控制分區策略和分區數。每個key-value形式的RDD都有Partitioner屬性,它決定了RDD如何分區。當然,Partition的個數還決定每個Stage的Task個數。RDD的分片函數,想控制RDD的分片函數的時候可以分區(Partitioner)傳入相關的參數,如HashPartitioner、RangePartitioner,它本身針對key-value的形式,如果不是key-value的形式,它就不會有具體的Partitioner。Partitioner本身決定了下一步會產生多少并行的分片,同時,它本身也決定了當前并行(parallelize)Shuffle輸出的并行數據,從而使Spark具有能夠控制數據在不同節點上分區的特性,用戶可以自定義分區策略,如Hash分區等。Spark提供了“partitionBy”運算符,能通過集群對RDD進行數據再分配來創建一個新的RDD。

(5)每個分區都有一個優先位置列表(-Optionally,a list of preferred locations to compute each split on)。它會存儲每個Partition的優先位置,對于一個HDFS文件來說,就是每個Partition塊的位置。觀察運行spark集群的控制臺會發現Spark的具體計算,具體分片前,它已經清楚地知道任務發生在什么節點上,也就是說,任務本身是計算層面的、代碼層面的,代碼發生運算之前已經知道它要運算的數據在什么地方,有具體節點的信息。這就符合大數據中數據不動代碼動的特點。數據不動代碼動的最高境界是數據就在當前節點的內存中。這時有可能是memory級別或Alluxio級別的,Spark本身在進行任務調度時候,會盡可能將任務分配到處理數據的數據塊所在的具體位置。據Spark的RDD.Scala源碼函數getPreferredLocations可知,每次計算都符合完美的數據本地性。

RDD類源碼文件中的4個方法和一個屬性對應上述闡述的RDD的5大特性。

RDD.scala的源碼如下。

1.
2.   /**
3.     * :: DeveloperApi ::
4.     * 通過子類實現給定分區的計算
5.     */
6.    @DeveloperApi
7.    def compute(split: Partition, context: TaskContext): Iterator[T]
8.
9.   /**
       *  通過子類實現,返回一個RDD分區列表,這個方法僅只被調用一次,它是安全地執行一次
       *  耗時計算數組中的分區必須符合以下屬性設置
10.    *  'rdd.partitions.zipWithIndex.forall { case (partition, index) =>
       *  partition.index == index }'
11.    */
12.   protected def getPartitions: Array[Partition]
13.
14.  /**
       *返回對父RDD的依賴列表,這個方法僅只被調用一次,它是安全地執行一次耗時計算
15.    */
16.   protected def getDependencies: Seq[Dependency[_]] = deps
17.
18.  /**
       * 可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置
19.    */
20.  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
21.
22.  /** 可選的,通過子類來實現。指定如何分區 */
23.  @transient val partitioner: Option[Partitioner] = None

其中,TaskContext是讀取或改變執行任務的環境,用org.apache.spark.TaskContext.get()可返回當前可用的TaskContext,可以調用內部的函數訪問正在運行任務的環境信息。Partitioner是一個對象,定義了如何在key-Value類型的RDD元素中用Key分區,從0到numPartitions-1區間內映射每一個Key到Partition ID。Partition是一個RDD的分區標識符。Partition.scala的源碼如下。

1.   trait Partition extends Serializable {
2.    /**
       * 獲取父RDD的分區索引
3.     */
4.
5.    def index: Int
6.
7.    //最好默認實現HashCode
8.    override def hashCode(): Int = index
9.    override def equals(other: Any): Boolean = super.equals(other)
10. }

3.1.2 DataSet的定義及內部機制剖析

DataSet是可以并行使用函數或關系操作轉換特定域對象的強類型集合。每個DataSet有一個非類型化的DataFrame。DataFrame是Dataset[Row]的別名。DataSet中可用的算子分為轉換算子和行動算子。轉換算子可以產生新的DataSet;行動算子將觸發計算和返回結果。轉換算子包括map、filter、select和聚集算子,如groupBy。行動算子包括count、show,或者將數據保存到文件系統中。

DataSet是“懶加載”的,即只有在行動算子被觸發時,才進行計算操作。本質上,DataSet表示一個邏輯計劃,它描述了生成數據所需的計算。當行動算子被觸發時,Spark查詢優化器將優化邏輯計劃,生成一個并行、分布式有效執行的物理計劃。使用explain函數可以查看邏輯計劃以及優化的物理計劃。

為了有效地支持特定領域的對象,編碼器[Encoder]是必需的。編碼器將特定類型T轉換為Spark的內部類型。例如,給定一個類Person有兩個屬性,包括‘名字’(string)和‘年齡’(int),編碼器告訴Spark在運行生成代碼時將Person對象序列化成二進制數據。二進制數據通常占用更少的內存以及更優化的數據處理效率(如列存儲格式)。可以使用schema函數來查看了解數據的內部二進制結構。

通常有兩種創建數據集Dataset的方法。

方法一:最常見的方式是Spark在SparkSession中使用read功能讀入存儲系統中的文件。例如,Scala版本:可以使用spark.read.parquet方式讀入parquet格式的文件,使用as方法轉換為[Person]數據類型的DataSet。Java版本:使用spark.read.parquet方式讀入parquet格式的文件,在as方法中使用編碼器對Person.class數據類型進行編碼,生成DataSet。

1.  * {{{
2.  *   val people = spark.read.parquet("...").as[Person]  //Scala
3.  *   Dataset<Person> people = spark.read().parquet("...").as(Encoders
        .bean(Person.class)); //Java
4.  * }}}

方法二:DataSet也可以通過現有DataSet進行轉換創建。例如,在現有的DataSet中使用過濾算子,創建一個新的數據集。下面看一個生成新DataSet的例子。Scala版本:在已有的Dataset[Person]中使用map轉換函數,獲取Person中的姓名,將生成新的Dataset[String];Java版本:在已有的數據集Dataset<String>中使用map轉換函數,通過(Person p) -> p.name獲取Person中的姓名,編碼器指定姓名屬性的類型為String類型,生成新的姓名的數據集Dataset<String>。(Person p) -> p.name這種寫法為Lambda表達式,這是Java 8之后才有的新特性。

1.  ** {{{
2.  *   val names = people.map(_.name)
                                    //在Scala中,names是一個String類型的DataSet
3.  *   Dataset<String> names = people.map((Person p) -> p.name, Encoders.
        STRING));
4.      //in Java 8
5.  * }}}

通過各種特定領域的語言(DSL)定義的功能: Dataset(類), [Column]和[functions]等非類型化數據集的操作也可以。這些操作非常類似于R或Python語言在數據表中的抽象操作。在scala中,我們使用apply方法,從people的數據集中選擇“年齡”這一列;在Java中使用"col"方法,通過people.col("age")獲取到年齡列。

1.  * 從DataSet中選擇一列,在Scala中使用apply方法,在Java中使用col方法
2.  * {{{
3.  *   val ageCol = people("age")                     //在Scala中
4.  *   Column ageCol = people.col("age");             //在Java中
5.  * }}}

注意,[Column]類型也可以通過它的各種函數來操作。例如,以下代碼在人員數據集中創建一個新的列,每個人的年齡增加10。在Scala中使用的方法是people("age") + 10;在Java中使用plus方法。

1.  * {{{
2.  *   //下面創建一個新的列,每個人的年齡增加10
3.  *   people("age") + 10                    //在Scala中
4.  *   people.col("age").plus(10);           //在Java中
5.  * }}}

下面是一個更具體的例子:使用spark.read.parquet分別讀入parquet格式的人員數據及部門數據,過濾出年齡大于30歲的人員,根據部門ID和部門數據進行join,然后按照姓名、性別分組,再使用agg方法,調用內置函數avg計算出部門中的平均工資、人員的最大年齡。Scala版本代碼如下。

1.  * {{{
2.  *   //使用SparkSession 創建 Dataset[Row]
3.  *   val people = spark.read.parquet("...")
4.  *   val department = spark.read.parquet("...")
5.  *
6.   *   people.filter("age > 30")
7.   *     .join(department, people("deptId") === department("id"))
8.   *     .groupBy(department("name"), "gender")
9.   *     .agg(avg(people("salary")), max(people("age")))
10.  * }}}

以上例子的Java版本代碼如下。

1.   * {{{
2.   *   //To create Dataset<Row> using SparkSession
3.   *   Dataset<Row> people = spark.read().parquet("...");
4.   *   Dataset<Row> department = spark.read().parquet("...");
5.   *
6.   *   people.filter("age".gt(30))
7.   *     .join(department, people.col("deptId").equalTo(department("id")))
8.   *     .groupBy(department.col("name"), "gender")
9.   *     .agg(avg(people.col("salary")), max(people.col("age")));
10.  * }}}
主站蜘蛛池模板: 吉首市| 凌海市| 三都| 墨玉县| 东乌珠穆沁旗| 察哈| 太仆寺旗| 濮阳市| 汤阴县| 温宿县| 乌兰县| 清河县| 屏东市| 右玉县| 本溪| 资阳市| 炎陵县| 永春县| 抚宁县| 堆龙德庆县| 临漳县| 丹凤县| 孝感市| 琼海市| 即墨市| 永定县| 观塘区| 金堂县| 日照市| 抚宁县| 唐河县| 镇江市| 华阴市| 巢湖市| 镇坪县| 常熟市| 中宁县| 怀来县| 苗栗市| 玉环县| 阳江市|