官术网_书友最值得收藏!

1.2 通過DataFrame和DataSet實戰電影點評系統

DataFrameAPI是從Spark 1.3開始就有的,它是一種以RDD為基礎的分布式無類型數據集,它的出現大幅度降低了普通Spark用戶的學習門檻。

DataFrame類似于傳統數據庫中的二維表格。DataFrame與RDD的主要區別在于,前者帶有schema元信息,即DataFrame表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以解析到具體數據的結構信息,從而對DataFrame中的數據源以及對DataFrame的操作進行了非常有效的優化,從而大幅提升了運行效率。

DataSetAPI是從1.6版本提出的,在Spark 2.2的時候,DataSet和DataFrame趨于穩定,可以投入生產環境使用。與DataFrame不同的是,DataSet是強類型的,而DataFrame實際上就是DataSet[Row](也就是Java中的DataSet<Row>)。

DataSet是Lazy級別的,Transformation級別的算子作用于DataSet會得到一個新的DataSet。當Action算子被調用時,Spark的查詢優化器會優化Transformation算子形成的邏輯計劃,并生成一個物理計劃,該物理計劃可以通過并行和分布式的方式來執行。

反觀RDD,由于無從得知其中數據元素的具體內部結構,故很難被Spark本身自行優化,對于新手用戶很不友好,但是,DataSet底層是基于RDD的,所以最終的優化盡頭還是對RDD的優化,這就意味著優化引擎不能自動優化的地方,用戶在RDD上可能有機會進行手動優化。

1.2.1 通過DataFrame實戰電影點評系統案例

現在我們通過實現幾個功能來了解DataFrame的具體用法。先來看第一個功能:通過DataFrame實現某部電影觀看者中男性和女性不同年齡分別有多少人。

1.   println("功能一:通過DataFrame實現某部電影觀看者中男性和女性不同年齡人數")
2.  //首先把Users的數據格式化,即在RDD的基礎上增加數據的元數據信息
3.  val schemaForUsers = StructType(
4.  "UserID::Gender::Age::OccupationID::Zip-code".split("::")
5.  .map(column => StructField(column, StringType, true)))
6.  //然后把我們的每一條數據變成以Row為單位的數據
7.  val usersRDDRows = usersRDD
8.  .map(_.split("::"))
9.  .map(line =>
10. Row(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim))
11. //使用SparkSession的createDataFrame方法,結合Row和StructType的元數據信息
    //基于RDD創建DataFrame,這時RDD就有了元數據信息的描述
12. val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers)
13. //也可以對StructType調用add方法來對不同的StructField賦予不同的類型
14. val schemaforratings = StructType("UserID::MovieID".split("::").
15. map(column => StructField(column, StringType, true)))
16. .add("Rating", DoubleType, true)
17. .add("Timestamp",StringType, true)
18.
19. val ratingsRDDRows = ratingsRDD
20. .map(_.split("::"))
21. .map(line =>
22.  Row(line(0).trim,line(1). trim,line(2).trim.toDouble,line(3).trim))
23. val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows,
    schemaforratings)
24. //接著構建movies的DataFrame
25. val schemaformovies = StructType("MovieID::Title::Genres".split("::")
26. .map(column => StructField(column, StringType, true)))
27. val moviesRDDRows = moviesRDD
28. .map(_.split("::"))
29. .map(line => Row(line(0).trim,line(1).trim,line(2).trim))
30. val moviesDataFrame = spark.createDataFrame(moviesRDDRows,
    schemaformovies)
31.
32. //這里能夠直接通過列名MovieID為1193過濾出這部電影,這些列名都是在上面指定的
33. ratingsDataFrame.filter(s" MovieID = 1193")
34. //Join的時候直接指定基于UserID進行Join,這相對于原生的RDD操作而言更加方便快捷
35. .join(usersDataFrame, "UserID")
36. //直接通過元數據信息中的Gender和Age進行數據的篩選
37. .select("Gender", "Age")
38. //直接通過元數據信息中的Gender和Age進行數據的groupBy操作
39. .groupBy("Gender", "Age")
40. //基于groupBy分組信息進行count統計操作,并顯示出分組統計后的前10條信息
41. .count().show(10)

最終打印結果如圖1-6所示,類似一張普通的數據庫表。

圖1-6 電影觀看者中男性和女性人數

上面案例中的代碼無論是從思路上,還是從結構上都和SQL語句十分類似,下面通過寫SQL語句的方式來實現上面的案例。

1.   println("功能二:用LocalTempView實現某部電影觀看者中不同性別不同年齡分別有多少
    人?")
2.  //既然使用SQL語句,那么表肯定是要有的,所以需要先把DataFrame注冊為臨時表
3.  ratingsDataFrame.createTempView("ratings")
4.  usersDataFrame.createTempView("users")
5.  //然后寫SQL語句,直接使用SparkSession的sql方法執行SQL語句即可
6.  val sql_local = "SELECT Gender, Age, count(*) from  users u join
7.  ratings as r on u.UserID = r.UserID where MovieID = 1193 group by Gender,
    Age"
8.  spark.sql(sql_local).show(10)

這樣我們就可以得到與上面案例相同的結果,這對寫SQL比較多的用戶是十分友好的。但是有一個問題需要注意,這里調用createTempView創建的臨時表是會話級別的,會話結束時這個表也會消失。那么,怎么創建一個Application級別的臨時表呢?可以使用createGlobalTempView來創建臨時表,但是這樣就要在寫SQL語句時在表名前面加上global_temp,例如:

1.   ratingsDataFrame.createGlobalTempView("ratings")
2.  usersDataFrame.createGlobalTempView("users")
3.
4.  val sql = "SELECT Gender, Age, count(*) from  global_temp.users u join
    global_temp.ratings as r on u.UserID = r.UserID where MovieID = 1193 group
    by Gender, Age"
5.  spark.sql(sql).show(10)

第一個DataFrame案例實現了簡單的類似SQL語句的功能,但這是遠遠不夠的,我們要引入一個隱式轉換來實現復雜的功能:

1.   import spark.sqlContext.implicits._
2.  ratingsDataFrame.select("MovieID", "Rating")
3.  .groupBy("MovieID").avg("Rating")
4.  //接著我們可以使用“$”符號把引號里的字符串轉換成列來實現相對復雜的功能,例如,下面
    //我們把avg(Rating)作為排序的字段降序排列
5.  .orderBy($"avg(Rating)".desc).show(10)

從圖1-7的結果可以看到,求平均值的那一列列名和在SQL語句里使用函數時的列名一樣變成了avg(Rating),程序中的orderBy里傳入的列名要和這個列名一致,否則會報錯,提示找不到列。

圖1-7 電影系統SQL運行結果

有時我們也可能會在使用DataFrame的時候在中間某一步轉換到RDD里操作,以便實現更加復雜的邏輯。下面來看一下DataFrame和RDD的混合編程。

1.  ratingsDataFrame.select("MovieID", "Rating")
2.  .groupBy("MovieID").avg("Rating")
3.  //這里直接使用DataFrame的rdd方法轉到RDD里操作
4.  .rdd.map(row =>(row(1),(row(0), row(1))))
5.  .sortBy(_._1.toString.toDouble, false)
6.  .map(tuple => tuple._2)
7.  .collect.take(10).foreach(println)

1.2.2 通過DataSet實戰電影點評系統案例

前面提到的DataFrame其實就是DataSet[Row],所以只要學會了DataFrame的使用,就可以快速接入DataSet,只不過在創建DataSet的時候要注意與創建DataFrame的方式略有不同。DataSet可以由DataFrame轉換而來,只需要用yourDataFrame.as[yourClass]即可得到封裝了yourClass類型的DataSet,之后就可以像操作DataFrame一樣操作DataSet了。接下來我們講一下如何直接創建DataSet,因為DataSet是強類型的,封裝的是具體的類(DataFrame其實封裝了Row類型),而類本身可以視作帶有Schema的,所以只需要把數據封裝進具體的類,然后直接創建DataSet即可。

首先引入一個隱式轉換,并創建幾個caseClass用來封裝數據。

1.   import spark.implicits._
2.  case class User(UserID:String, Gender:String, Age:String, OccupationID:
    String, Zip_Code:String)
3.  case    class     Rating(UserID:String,        MovieID:String,      Rating:Double,
    Timestamp:String)
4.  然后把數據封裝進這些Class:
5.  val usersForDSRDD = usersRDD.map(_.split("::")).map(line =>
6.       User(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim))
7.       最后直接創建DataSet:
8.  val usersDataSet = spark.createDataset[User](usersForDSRDD)
9.  usersDataSet.show(10)
10.

電影系統運行結果如圖1-8所示,列名為User類的屬性名。下面使用同樣的方法創建ratingsDataSet并實現一個案例:找出觀看某部電影的不同性別不同年齡的人數。

1.   val ratingsForDSRDD = ratingsRDD.map(_.split("::")).map(line =>
2.  Rating(line(0).trim,line(1).trim,line(2).trim.toDouble,line(3).trim))
3.  val ratingsDataSet = spark.createDataset(ratingsForDSRDD)
4.  //下面的實現代碼和使用DataFrame方法幾乎完全一樣(把DataFrame換成DataSet即可)
5.  ratingsDataSet.filter(s" MovieID = 1193").join(usersDataSet, "UserID")
6.        .select("Gender", "Age").groupBy("Gender", "Age").count()
7.       .orderBy($"Gender".desc,$"Age").show()

觀看電影性別、年齡統計結果如圖1-9所示。

圖1-8 電影系統運行結果

圖1-9 觀看電影性別、年齡統計結果

當然,也可以把DataFrame和DataSet混著用(這樣做會導致代碼混亂,故不建議這樣做),得到的結果完全一樣。

最后根據源碼,有幾點需要補充:

RDD的cache方法等于MEMORY_ONLY級別的persist,而DataSet的cache方法等于MEMORY_AND_DISK級別的persist,因為重新計算的代價非常昂貴。如果想使用其他級別的緩存,可以使用persist并傳入相應的級別。

RDD.scala源碼:

1.  /**
2.    *使用默認的存儲級別持久化RDD (`MEMORY_ONLY`).
3.    */
4.   def cache(): this.type = persist()

Dataset.scala源碼:

1.  /**
2.    *使用默認的存儲級別持久化DataSet (`MEMORY_AND_DISK`).
3.    *
4.    * @group basic
5.    * @since 1.6.0
6.    */
7.   def cache(): this.type = persist()

基于DataSet的計算會像SQL一樣被Catalyst引擎解析生成執行查詢計劃,然后執行。我們可以使用explain方法來查看執行計劃。

主站蜘蛛池模板: 平凉市| 将乐县| 深水埗区| 台北市| 长子县| 汕头市| 宁夏| 崇州市| 安阳县| 昭觉县| 延川县| 赤峰市| 张家川| 阿鲁科尔沁旗| 波密县| 昌宁县| 小金县| 大竹县| 周口市| 宜章县| 曲阜市| 龙海市| 葵青区| 湟中县| 贡觉县| 垣曲县| 西吉县| 宁德市| 邯郸县| 留坝县| 房产| 溆浦县| 丹巴县| 黄大仙区| 镇安县| 徐州市| 仙桃市| 阳西县| 章丘市| 景德镇市| 南木林县|