- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 2323字
- 2019-12-12 17:29:51
1.2 通過DataFrame和DataSet實戰電影點評系統
DataFrameAPI是從Spark 1.3開始就有的,它是一種以RDD為基礎的分布式無類型數據集,它的出現大幅度降低了普通Spark用戶的學習門檻。
DataFrame類似于傳統數據庫中的二維表格。DataFrame與RDD的主要區別在于,前者帶有schema元信息,即DataFrame表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以解析到具體數據的結構信息,從而對DataFrame中的數據源以及對DataFrame的操作進行了非常有效的優化,從而大幅提升了運行效率。
DataSetAPI是從1.6版本提出的,在Spark 2.2的時候,DataSet和DataFrame趨于穩定,可以投入生產環境使用。與DataFrame不同的是,DataSet是強類型的,而DataFrame實際上就是DataSet[Row](也就是Java中的DataSet<Row>)。
DataSet是Lazy級別的,Transformation級別的算子作用于DataSet會得到一個新的DataSet。當Action算子被調用時,Spark的查詢優化器會優化Transformation算子形成的邏輯計劃,并生成一個物理計劃,該物理計劃可以通過并行和分布式的方式來執行。
反觀RDD,由于無從得知其中數據元素的具體內部結構,故很難被Spark本身自行優化,對于新手用戶很不友好,但是,DataSet底層是基于RDD的,所以最終的優化盡頭還是對RDD的優化,這就意味著優化引擎不能自動優化的地方,用戶在RDD上可能有機會進行手動優化。
1.2.1 通過DataFrame實戰電影點評系統案例
現在我們通過實現幾個功能來了解DataFrame的具體用法。先來看第一個功能:通過DataFrame實現某部電影觀看者中男性和女性不同年齡分別有多少人。
1. println("功能一:通過DataFrame實現某部電影觀看者中男性和女性不同年齡人數") 2. //首先把Users的數據格式化,即在RDD的基礎上增加數據的元數據信息 3. val schemaForUsers = StructType( 4. "UserID::Gender::Age::OccupationID::Zip-code".split("::") 5. .map(column => StructField(column, StringType, true))) 6. //然后把我們的每一條數據變成以Row為單位的數據 7. val usersRDDRows = usersRDD 8. .map(_.split("::")) 9. .map(line => 10. Row(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim)) 11. //使用SparkSession的createDataFrame方法,結合Row和StructType的元數據信息 //基于RDD創建DataFrame,這時RDD就有了元數據信息的描述 12. val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers) 13. //也可以對StructType調用add方法來對不同的StructField賦予不同的類型 14. val schemaforratings = StructType("UserID::MovieID".split("::"). 15. map(column => StructField(column, StringType, true))) 16. .add("Rating", DoubleType, true) 17. .add("Timestamp",StringType, true) 18. 19. val ratingsRDDRows = ratingsRDD 20. .map(_.split("::")) 21. .map(line => 22. Row(line(0).trim,line(1). trim,line(2).trim.toDouble,line(3).trim)) 23. val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings) 24. //接著構建movies的DataFrame 25. val schemaformovies = StructType("MovieID::Title::Genres".split("::") 26. .map(column => StructField(column, StringType, true))) 27. val moviesRDDRows = moviesRDD 28. .map(_.split("::")) 29. .map(line => Row(line(0).trim,line(1).trim,line(2).trim)) 30. val moviesDataFrame = spark.createDataFrame(moviesRDDRows, schemaformovies) 31. 32. //這里能夠直接通過列名MovieID為1193過濾出這部電影,這些列名都是在上面指定的 33. ratingsDataFrame.filter(s" MovieID = 1193") 34. //Join的時候直接指定基于UserID進行Join,這相對于原生的RDD操作而言更加方便快捷 35. .join(usersDataFrame, "UserID") 36. //直接通過元數據信息中的Gender和Age進行數據的篩選 37. .select("Gender", "Age") 38. //直接通過元數據信息中的Gender和Age進行數據的groupBy操作 39. .groupBy("Gender", "Age") 40. //基于groupBy分組信息進行count統計操作,并顯示出分組統計后的前10條信息 41. .count().show(10)
最終打印結果如圖1-6所示,類似一張普通的數據庫表。

圖1-6 電影觀看者中男性和女性人數
上面案例中的代碼無論是從思路上,還是從結構上都和SQL語句十分類似,下面通過寫SQL語句的方式來實現上面的案例。
1. println("功能二:用LocalTempView實現某部電影觀看者中不同性別不同年齡分別有多少 人?") 2. //既然使用SQL語句,那么表肯定是要有的,所以需要先把DataFrame注冊為臨時表 3. ratingsDataFrame.createTempView("ratings") 4. usersDataFrame.createTempView("users") 5. //然后寫SQL語句,直接使用SparkSession的sql方法執行SQL語句即可 6. val sql_local = "SELECT Gender, Age, count(*) from users u join 7. ratings as r on u.UserID = r.UserID where MovieID = 1193 group by Gender, Age" 8. spark.sql(sql_local).show(10)
這樣我們就可以得到與上面案例相同的結果,這對寫SQL比較多的用戶是十分友好的。但是有一個問題需要注意,這里調用createTempView創建的臨時表是會話級別的,會話結束時這個表也會消失。那么,怎么創建一個Application級別的臨時表呢?可以使用createGlobalTempView來創建臨時表,但是這樣就要在寫SQL語句時在表名前面加上global_temp,例如:
1. ratingsDataFrame.createGlobalTempView("ratings") 2. usersDataFrame.createGlobalTempView("users") 3. 4. val sql = "SELECT Gender, Age, count(*) from global_temp.users u join global_temp.ratings as r on u.UserID = r.UserID where MovieID = 1193 group by Gender, Age" 5. spark.sql(sql).show(10)
第一個DataFrame案例實現了簡單的類似SQL語句的功能,但這是遠遠不夠的,我們要引入一個隱式轉換來實現復雜的功能:
1. import spark.sqlContext.implicits._ 2. ratingsDataFrame.select("MovieID", "Rating") 3. .groupBy("MovieID").avg("Rating") 4. //接著我們可以使用“$”符號把引號里的字符串轉換成列來實現相對復雜的功能,例如,下面 //我們把avg(Rating)作為排序的字段降序排列 5. .orderBy($"avg(Rating)".desc).show(10)
從圖1-7的結果可以看到,求平均值的那一列列名和在SQL語句里使用函數時的列名一樣變成了avg(Rating),程序中的orderBy里傳入的列名要和這個列名一致,否則會報錯,提示找不到列。

圖1-7 電影系統SQL運行結果
有時我們也可能會在使用DataFrame的時候在中間某一步轉換到RDD里操作,以便實現更加復雜的邏輯。下面來看一下DataFrame和RDD的混合編程。
1. ratingsDataFrame.select("MovieID", "Rating") 2. .groupBy("MovieID").avg("Rating") 3. //這里直接使用DataFrame的rdd方法轉到RDD里操作 4. .rdd.map(row =>(row(1),(row(0), row(1)))) 5. .sortBy(_._1.toString.toDouble, false) 6. .map(tuple => tuple._2) 7. .collect.take(10).foreach(println)
1.2.2 通過DataSet實戰電影點評系統案例
前面提到的DataFrame其實就是DataSet[Row],所以只要學會了DataFrame的使用,就可以快速接入DataSet,只不過在創建DataSet的時候要注意與創建DataFrame的方式略有不同。DataSet可以由DataFrame轉換而來,只需要用yourDataFrame.as[yourClass]即可得到封裝了yourClass類型的DataSet,之后就可以像操作DataFrame一樣操作DataSet了。接下來我們講一下如何直接創建DataSet,因為DataSet是強類型的,封裝的是具體的類(DataFrame其實封裝了Row類型),而類本身可以視作帶有Schema的,所以只需要把數據封裝進具體的類,然后直接創建DataSet即可。
首先引入一個隱式轉換,并創建幾個caseClass用來封裝數據。
1. import spark.implicits._ 2. case class User(UserID:String, Gender:String, Age:String, OccupationID: String, Zip_Code:String) 3. case class Rating(UserID:String, MovieID:String, Rating:Double, Timestamp:String) 4. 然后把數據封裝進這些Class: 5. val usersForDSRDD = usersRDD.map(_.split("::")).map(line => 6. User(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim)) 7. 最后直接創建DataSet: 8. val usersDataSet = spark.createDataset[User](usersForDSRDD) 9. usersDataSet.show(10) 10.
電影系統運行結果如圖1-8所示,列名為User類的屬性名。下面使用同樣的方法創建ratingsDataSet并實現一個案例:找出觀看某部電影的不同性別不同年齡的人數。
1. val ratingsForDSRDD = ratingsRDD.map(_.split("::")).map(line => 2. Rating(line(0).trim,line(1).trim,line(2).trim.toDouble,line(3).trim)) 3. val ratingsDataSet = spark.createDataset(ratingsForDSRDD) 4. //下面的實現代碼和使用DataFrame方法幾乎完全一樣(把DataFrame換成DataSet即可) 5. ratingsDataSet.filter(s" MovieID = 1193").join(usersDataSet, "UserID") 6. .select("Gender", "Age").groupBy("Gender", "Age").count() 7. .orderBy($"Gender".desc,$"Age").show()
觀看電影性別、年齡統計結果如圖1-9所示。

圖1-8 電影系統運行結果

圖1-9 觀看電影性別、年齡統計結果
當然,也可以把DataFrame和DataSet混著用(這樣做會導致代碼混亂,故不建議這樣做),得到的結果完全一樣。
最后根據源碼,有幾點需要補充:
RDD的cache方法等于MEMORY_ONLY級別的persist,而DataSet的cache方法等于MEMORY_AND_DISK級別的persist,因為重新計算的代價非常昂貴。如果想使用其他級別的緩存,可以使用persist并傳入相應的級別。
RDD.scala源碼:
1. /** 2. *使用默認的存儲級別持久化RDD (`MEMORY_ONLY`). 3. */ 4. def cache(): this.type = persist()
Dataset.scala源碼:
1. /** 2. *使用默認的存儲級別持久化DataSet (`MEMORY_AND_DISK`). 3. * 4. * @group basic 5. * @since 1.6.0 6. */ 7. def cache(): this.type = persist()
基于DataSet的計算會像SQL一樣被Catalyst引擎解析生成執行查詢計劃,然后執行。我們可以使用explain方法來查看執行計劃。