官术网_书友最值得收藏!

3.9 基于DataSet的代碼到底是如何一步步轉化成為RDD的

基于DataSet的代碼轉換為RDD之前需要一個Action的操作,基于Spark中的新解析引擎Catalyst進行優化,Spark中的Catalyst不僅限于SQL的優化,Spark的五大子框架(Spark Cores、Spark SQL、Spark Streaming、Spark GraphX、Spark Mlib)將來都會基于Catalyst基礎之上。

Dataset.scala的collect方法的源碼如下。

Spark 2.1.1版本的Dataset.scala的源碼如下。

1.  def collect(): Array[T] = collect(needCallback = true)

Spark 2.2.0版本Dataset.scala的源碼與Spark 2.1.1版本相比具有如下特點:將Dataset的action包裹起來,這樣可跟蹤QueryExecution和時間成本,然后匯報給用戶注冊的回調函數。

1.  def collect(): Array[T] = withAction("collect", queryExecution)
    (collectFromPlan)

進入collect(needCallback:true)的方法如下。

Spark 2.1.1版本的Dataset.scala的源碼如下。

1.   private def collect(needCallback: Boolean): Array[T] = {
2.     def execute(): Array[T] = withNewExecutionId {
3.       queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
4.     }
5.
6.     if (needCallback) {
7.       withCallback("collect", toDF())(_ => execute())
8.     } else {
9.       execute()
10.    }
11.  }

Spark 2.2.0版本的Dataset.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 調用關鍵的代碼SQLExecution.withNewExecutionId(sparkSession,qe){action(qe. executedPlan)}得到計算結果。

 action(qe.executedPlan)是collect方法中傳入的函數collectFromPlan,在函數collectFromPlan中傳入參數qe.executedPlan。

1.  private def withAction[U](name: String, qe: QueryExecution)(action:
    SparkPlan => U) = {
2.     try {
3.       qe.executedPlan.foreach { plan =>
4.         plan.resetMetrics()
5.       }
6.       val start = System.nanoTime()
7.       val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
8.         action(qe.executedPlan)
9.       }
10.      val end = System.nanoTime()
11.      sparkSession.listenerManager.onSuccess(name, qe, end - start)
12.      result
13.    } catch {
14.      case e: Exception =>
15.        sparkSession.listenerManager.onFailure(name, qe, e)
16.        throw e
17.    }
18.  }

Spark 2.2.0版本的Dataset.scala的源碼從spark plan中獲取所有的數據。

1.    private def collectFromPlan(plan: SparkPlan): Array[T] = {
2.    plan.executeCollect().map(boundEnc.fromRow)
3.  }

collect方法中關鍵的一行代碼是queryExecution.executedPlan.executeCollect().map (boundEnc.fromRow),我們看一下executedPlan。executedPlan不用來初始化任何SparkPlan,僅用于執行。

QueryExecution.scala的源碼如下。

1.  class    QueryExecution(val      sparkSession:     SparkSession,     val  logical:
    LogicalPlan) {
2.  ......
3.  //executePlan不應該被用來初始化任何Spark Plan,executePlan只用于執行
4.    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
5.  ......
6.       lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
7.  ......

queryExecution.executedPlan.executeCollect()代碼中的executeCollect方法運行此查詢,將結果作為數組返回。executeCollect方法調用了byteArrayRdd.collect()方法。

SparkPlan .scala的executeCollect的源碼如下。

1.  def executeCollect(): Array[InternalRow] = {
2.    val byteArrayRdd = getByteArrayRdd()
3.
4.    val results = ArrayBuffer[InternalRow]()
5.    byteArrayRdd.collect().foreach { bytes =>
6.      decodeUnsafeRows(bytes).foreach(results.+=)
7.    }
8.    results.toArray
9.  }

byteArrayRdd.collect()方法調用RDD.scala的collect方法。collect方法最終通過sc.runJob提交Spark集群運行。

RDD.scala的collect方法的源碼如下。

1.   def collect(): Array[T] = withScope {
2.    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
3.    Array.concat(results: _*)
4.  }

回到QueryExecution.scala中,executedPlan.execute()是關鍵性的代碼。

1.  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

進入SparkPlan.scala的execute返回的查詢結果類型為RDD[InternalRow]。調用doExecute執行,SparkPlan應重寫doExecute進行具體實現。在execute方法中就生成了RDD[InternalRow]。execute方法的源碼如下。

1.  final def execute(): RDD[InternalRow] = executeQuery {
2.    doExecute()
3.  }

SparkPlan.scala的doExecute()抽象方法沒有具體實現,通過SparkPlan重寫具體實現。產生的查詢結果作為RDD[InternalRow]。

1.  protected def doExecute(): RDD[InternalRow]

InternalRow是通過語法樹生成的一些數據結構。其子類包括BaseGenericInternalRow、JoinedRow、Row、UnsafeRow。

InternalRow.scala的源碼如下。

1.  abstract class InternalRow extends SpecializedGetters with Serializable {
2.  ......
3.    def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
4.    def setByte(i: Int, value: Byte): Unit = update(i, value)
5.    def setShort(i: Int, value: Short): Unit = update(i, value)
6.    def setInt(i: Int, value: Int): Unit = update(i, value)
7.    def setLong(i: Int, value: Long): Unit = update(i, value)
8.    def setFloat(i: Int, value: Float): Unit = update(i, value)
9.    def setDouble(i: Int, value: Double): Unit = update(i, value)
10. ........

DataSet的代碼轉化成為RDD的內部流程如下。

Parse SQL(DataSet)→Analyze Logical Plan→Optimize Logical Plan→Generate Physical Plan→Prepareed Spark Plan→Execute SQL→Generate RDD

基于DataSet的代碼一步步轉化成為RDD:最終調用execute()生成RDD。

主站蜘蛛池模板: 无棣县| 和林格尔县| 呼和浩特市| 西安市| 东台市| 丰台区| 怀柔区| 伊吾县| 德格县| 福安市| 肃北| 东莞市| 呼玛县| 龙里县| 莱西市| 景谷| 宜宾市| 连江县| 墨竹工卡县| 中西区| 永寿县| 建宁县| 吉木萨尔县| 德清县| 康马县| 瑞昌市| 吉隆县| 长宁区| 广东省| 个旧市| 阿拉善左旗| 九江县| 衡阳市| 信丰县| 大邑县| 静海县| 康平县| 东山县| 隆安县| 东兰县| 兴仁县|