- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 974字
- 2019-12-12 17:29:55
3.9 基于DataSet的代碼到底是如何一步步轉化成為RDD的
基于DataSet的代碼轉換為RDD之前需要一個Action的操作,基于Spark中的新解析引擎Catalyst進行優化,Spark中的Catalyst不僅限于SQL的優化,Spark的五大子框架(Spark Cores、Spark SQL、Spark Streaming、Spark GraphX、Spark Mlib)將來都會基于Catalyst基礎之上。
Dataset.scala的collect方法的源碼如下。
Spark 2.1.1版本的Dataset.scala的源碼如下。
1. def collect(): Array[T] = collect(needCallback = true)
Spark 2.2.0版本Dataset.scala的源碼與Spark 2.1.1版本相比具有如下特點:將Dataset的action包裹起來,這樣可跟蹤QueryExecution和時間成本,然后匯報給用戶注冊的回調函數。
1. def collect(): Array[T] = withAction("collect", queryExecution) (collectFromPlan)
進入collect(needCallback:true)的方法如下。
Spark 2.1.1版本的Dataset.scala的源碼如下。
1. private def collect(needCallback: Boolean): Array[T] = { 2. def execute(): Array[T] = withNewExecutionId { 3. queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow) 4. } 5. 6. if (needCallback) { 7. withCallback("collect", toDF())(_ => execute()) 8. } else { 9. execute() 10. } 11. }
Spark 2.2.0版本的Dataset.scala的源碼與Spark 2.1.1版本相比具有如下特點。
調用關鍵的代碼SQLExecution.withNewExecutionId(sparkSession,qe){action(qe. executedPlan)}得到計算結果。
action(qe.executedPlan)是collect方法中傳入的函數collectFromPlan,在函數collectFromPlan中傳入參數qe.executedPlan。
1. private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { 2. try { 3. qe.executedPlan.foreach { plan => 4. plan.resetMetrics() 5. } 6. val start = System.nanoTime() 7. val result = SQLExecution.withNewExecutionId(sparkSession, qe) { 8. action(qe.executedPlan) 9. } 10. val end = System.nanoTime() 11. sparkSession.listenerManager.onSuccess(name, qe, end - start) 12. result 13. } catch { 14. case e: Exception => 15. sparkSession.listenerManager.onFailure(name, qe, e) 16. throw e 17. } 18. }
Spark 2.2.0版本的Dataset.scala的源碼從spark plan中獲取所有的數據。
1. private def collectFromPlan(plan: SparkPlan): Array[T] = { 2. plan.executeCollect().map(boundEnc.fromRow) 3. }
collect方法中關鍵的一行代碼是queryExecution.executedPlan.executeCollect().map (boundEnc.fromRow),我們看一下executedPlan。executedPlan不用來初始化任何SparkPlan,僅用于執行。
QueryExecution.scala的源碼如下。
1. class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { 2. ...... 3. //executePlan不應該被用來初始化任何Spark Plan,executePlan只用于執行 4. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) 5. ...... 6. lazy val toRdd: RDD[InternalRow] = executedPlan.execute() 7. ......
queryExecution.executedPlan.executeCollect()代碼中的executeCollect方法運行此查詢,將結果作為數組返回。executeCollect方法調用了byteArrayRdd.collect()方法。
SparkPlan .scala的executeCollect的源碼如下。
1. def executeCollect(): Array[InternalRow] = { 2. val byteArrayRdd = getByteArrayRdd() 3. 4. val results = ArrayBuffer[InternalRow]() 5. byteArrayRdd.collect().foreach { bytes => 6. decodeUnsafeRows(bytes).foreach(results.+=) 7. } 8. results.toArray 9. }
byteArrayRdd.collect()方法調用RDD.scala的collect方法。collect方法最終通過sc.runJob提交Spark集群運行。
RDD.scala的collect方法的源碼如下。
1. def collect(): Array[T] = withScope { 2. val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) 3. Array.concat(results: _*) 4. }
回到QueryExecution.scala中,executedPlan.execute()是關鍵性的代碼。
1. lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
進入SparkPlan.scala的execute返回的查詢結果類型為RDD[InternalRow]。調用doExecute執行,SparkPlan應重寫doExecute進行具體實現。在execute方法中就生成了RDD[InternalRow]。execute方法的源碼如下。
1. final def execute(): RDD[InternalRow] = executeQuery { 2. doExecute() 3. }
SparkPlan.scala的doExecute()抽象方法沒有具體實現,通過SparkPlan重寫具體實現。產生的查詢結果作為RDD[InternalRow]。
1. protected def doExecute(): RDD[InternalRow]
InternalRow是通過語法樹生成的一些數據結構。其子類包括BaseGenericInternalRow、JoinedRow、Row、UnsafeRow。
InternalRow.scala的源碼如下。
1. abstract class InternalRow extends SpecializedGetters with Serializable { 2. ...... 3. def setBoolean(i: Int, value: Boolean): Unit = update(i, value) 4. def setByte(i: Int, value: Byte): Unit = update(i, value) 5. def setShort(i: Int, value: Short): Unit = update(i, value) 6. def setInt(i: Int, value: Int): Unit = update(i, value) 7. def setLong(i: Int, value: Long): Unit = update(i, value) 8. def setFloat(i: Int, value: Float): Unit = update(i, value) 9. def setDouble(i: Int, value: Double): Unit = update(i, value) 10. ........
DataSet的代碼轉化成為RDD的內部流程如下。
Parse SQL(DataSet)→Analyze Logical Plan→Optimize Logical Plan→Generate Physical Plan→Prepareed Spark Plan→Execute SQL→Generate RDD
基于DataSet的代碼一步步轉化成為RDD:最終調用execute()生成RDD。