官术网_书友最值得收藏!

1.1 通過RDD實戰電影點評系統入門及源碼閱讀

日常的數據來源有很多渠道,如網絡爬蟲、網頁埋點、系統日志等。下面的案例中使用的是用戶觀看電影和點評電影的行為數據,數據來源于網絡上的公開數據,共有3個數據文件:uers.dat、ratings.dat和movies.dat。

其中,uers.dat的格式如下:

1.  UserID::Gender::Age::Occupation::Zip-code

這個文件里共有6040個用戶的信息,每行中用“::”隔開的詳細信息包括ID、性別(F、M分別表示女性、男性)、年齡(使用7個年齡段標記)、職業和郵編。

ratings.dat的格式如下:

1.  UserID::MovieID::Rating::Timestamp

這個文件記錄的是評分信息,即用戶ID、電影ID、評分(滿分是5分)和時間戳。

movies.dat的格式如下:

1.  MovieID::Title::Genres

這個文件記錄的是電影信息,即電影ID、電影名稱和電影類型。

1.1.1 Spark核心概念圖解

進入到案例實戰前,首先來看幾個至關重要的概念,這些概念承載著Spark集群運轉和程序運行的重要使命。Spark運行架構圖如圖1-1所示。

Master(圖1-1中的Cluster Manager):就像Hadoop有NameNode和DataNode一樣,Spark有Master和Worker。Master是集群的領導者,負責管理集群資源,接收Client提交的作業,以及向Worker發送命令。

Worker(圖1-1中的Worker Node):集群中的Worker,執行Master發送的指令,來具體分配資源,并在這些資源中執行任務。

Driver:一個Spark作業運行時會啟動一個Driver進程,也是作業的主進程,負責作業的解析、生成Stage,并調度Task到Executor上。

Executor:真正執行作業的地方。Executor分布在集群中的Worker上,每個Executor接收Driver的命令來加載和運行Task,一個Executor可以執行一到多個Task。

圖1-1 Spark運行架構圖

SparkContext:是程序運行調度的核心,由高層調度器DAGScheduler劃分程序的每個階段,底層調度器TaskScheduler劃分每個階段的具體任務。SchedulerBackend管理整個集群中為正在運行的程序分配的計算資源Executor。

DAGScheduler:負責高層調度,劃分stage并生成程序運行的有向無環圖。

TaskScheduler:負責具體stage內部的底層調度,具體task的調度、容錯等。

Job:(正在執行的叫ActiveJob)是Top-level的工作單位,每個Action算子都會觸發一次Job,一個Job可能包含一個或多個Stage。

Stage:是用來計算中間結果的Tasksets。Tasksets中的Task邏輯對于同一個RDD內的不同partition都一樣。Stage在Shuffle的地方產生,此時下一個Stage要用到上一個Stage的全部數據,所以要等到上一個Stage全部執行完才能開始。Stage有兩種:ShuffleMapStage和ResultStage,除了最后一個Stage是ResultStage外,其他Stage都是ShuffleMapStage。ShuffleMapStage會產生中間結果,以文件的方式保存在集群里,Stage經常被不同的Job共享,前提是這些Job重用了同一個RDD。

Task:任務執行的工作單位,每個Task會被發送到一個節點上,每個Task對應RDD的一個partition。

RDD:是不可變的、Lazy級別的、粗粒度的(數據集級別的而不是單個數據級別的)數據集合,包含了一個或多個數據分片,即partition。

另外,Spark程序中有兩種級別的算子:Transformation和Action。Transformation算子會由DAGScheduler劃分到pipeline中,是Lazy級別的不會觸發任務的執行;Action算子會觸發Job來執行pipeline中的運算。

介紹完上面的關鍵概念,下面開始進入到程序編寫階段。

首先寫好Spark程序的固定框架,以便于在處理和分析數據的時候專注于業務邏輯本身。

創建一個Scala的object類,在main方法中配置SparkConf和SparkContext,這里指定程序在本地運行,并且把程序名字設置為“RDD_Movie_Users_Analyzer”。

RDD_Movie_Users_Analyzer代碼如下。

1.   val conf = new SparkConf().setMaster("local[*]")
2.  .setAppName("RDD_Movie_Users_Analyzer")
3.   /**
    *Spark 2.0    引入 SparkSession    封裝了  SparkContext    和 SQLContext,并且會在
    *builder的getOrCreate方法中判斷是否有符合要求的SparkSession存在,有則使用,
    *沒有則進行創建
    */
4.  val spark = SparkSession.builder.config(conf).getOrCreate()
5.  //獲取SparkSession的SparkContext
6.  val sc = spark.sparkContext
7.  //把Spark程序運行時的日志設置為warn級別,以方便查看運行結果
8.  sc.setLogLevel("warn")
9.  //把用到的數據加載進來轉換為RDD,此時使用sc.textFile并不會讀取文件,而是標記了有
    //這個操作,遇到Action級別算子時才會真正去讀取文件
10. val usersRDD = sc.textFile(dataPath + "users.dat")
11. val moviesRDD = sc.textFile(dataPath + "movies.dat")
12. val ratingsRDD = sc.textFile(dataPath + "ratings.dat")
13. /**具體數據處理的業務邏輯*/
14. //最后關閉SparkSession
15. spark.stop

1.1.2 通過RDD實戰電影點評系統案例

首先我們來寫一個案例計算,并打印出所有電影中評分最高的前10個電影名和平均評分。

第一步:從ratingsRDD中取出MovieID和rating,從moviesRDD中取出MovieID和Name,如果后面的代碼重復使用這些數據,則可以把它們緩存起來。首先把使用map算子上面的RDD中的每一個元素(即文件中的每一行)以“::”為分隔符進行拆分,然后再使用map算子從拆分后得到的數組中取出需要用到的元素,并把得到的RDD緩存起來。

1.  println("所有電影中平均得分最高(口碑最好)的電影:")
2.  val movieInfo = moviesRDD.map(_.split("::")).map(x => (x(0), x(1))).cache()
3.  val ratings = ratingsRDD.map(_.split("::"))
4.  .map(x => (x(0), x(1), x(2))).cache()

第二步:從ratings的數據中使用map算子獲取到形如(movieID,(rating,1))格式的RDD,然后使用reduceByKey把每個電影的總評分以及點評人數算出來。

1.  val moviesAndRatings = ratings.map(x => (x._2, (x._3.toDouble, 1)))
2.        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

此時得到的RDD格式為(movieID,(Sum(ratings),Count(ratings)))。

第三步:把每個電影的Sum(ratings)和Count(ratings)相除,得到包含了電影ID和平均評分的RDD:

1.  val avgRatings= moviesAndRatings.map(x => (x._1,x._2._1.toDouble / x._2._2))

第四步:把avgRatings與movieInfo通過關鍵字(key)連接到一起,得到形如(movieID, (MovieName,AvgRating))的RDD,然后格式化為(AvgRating,MovieName),并按照key(也就是平均評分)降序排列,最終取出前10個并打印出來。

1.  avgRatings.join(movieInfo).map(item => (item._2._1,item._2._2))
2.        .sortByKey(false).take(10)
3.        .foreach(record => println(record._2+"評分為:"+record._1))

評分最高電影運行結果如圖1-2所示。

圖1-2 評分最高電影運行結果

接下來我們來看另外一個功能的實現:分析最受男性喜愛的電影Top10和最受女性喜愛的電影Top10。

首先來分析一下:單從ratings中無法計算出最受男性或者女性喜愛的電影Top10,因為該RDD中沒有Gender信息,如果需要使用Gender信息進行Gender的分類,此時一定需要聚合。當然,我們力求聚合使用的是mapjoin(分布式計算的一大痛點是數據傾斜,map端的join一定不會數據傾斜),這里是否可使用mapjoin?不可以,因為map端的join是使用broadcast把相對小得多的變量廣播出去,這樣可以減少一次shuffle,這里,用戶的數據非常多,所以要使用正常的join。

1.   Val usersGender = usersRDD.map(_.split("::")).map(x => (x(0), x(1)))
2.  val genderRatings = ratings.map(x => (x._1, (x._1, x._2, x._3)))
3.  .join(usersGender).cache()
4.  genderRatings.take(10).foreach(println)

使用join連接ratings和users之后,對分別過濾出男性和女性的記錄進行處理:

1.  val maleFilteredRatings = genderRatings
2.  .filter(x => x._2._2.equals("M")).map(x => x._2._1)
3.  val femaleFilteredRatings = genderRatings
4.  .filter(x => x._2._2.equals("F")).map(x => x._2._1)

接下來對兩個RDD進行處理,處理邏輯和上面的案例相同,最終打印出來的結果分別如圖1-3和圖1-4所示:

所有電影中最受男性喜愛的電影Top10業務代碼如下。所有電影中最受女性喜愛的電影Top10業務代碼如下。

1.   println("所有電影中最受男性喜愛的電影Top10:")
2.  maleFilteredRatings.map(x => (x._2, (x._3.toDouble, 1)))
3.    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
4.  .map(x => (x._1,x._2._1.toDouble / x._2._2))
5.  .join(movieInfo)
6.  .map(item => (item._2._1,item._2._2))
7.  .sortByKey(false) .take(10)
8.  .foreach(record => println(record._2+"評分為:"+record._1))

圖1-3 最受男性喜愛的電影運行結果

1.   println("所有電影中最受女性喜愛的電影Top10:")
2.  femaleFilteredRatings.map(x => (x._2, (x._3.toDouble, 1)))
3.    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
4.  .map(x => (x._1,x._2._1.toDouble / x._2._2)) .join(movieInfo)
5.  .map(item => (item._2._1,item._2._2)).sortByKey(false) .take(10)
6.  .foreach(record => println(record._2+"評分為:"+record._1))

圖1-4 最受女性喜愛的電影運行結果

在現實業務場景中,二次排序非常重要,并且經常遇到。下面來模擬一下這些場景,實現對電影評分數據進行二次排序,以Timestamp和Rating兩個維度降序排列,值得一提的是,Java版本的二次排序代碼非常煩瑣,而使用Scala實現就會很簡捷,首先我們需要一個繼承自Ordered和Serializable的類。

1.   class SecondarySortKey(val first: Double, val second: Double)
2.  extends Ordered[SecondarySortKey] with Serializable {
3.  //在這個類中重寫compare方法
4.  override def compare(other: SecondarySortKey): Int = {
5.  //既然是二次排序,那么首先要判斷第一個排序字段是否相等,如果不相等,就直接排序
6.  if (this.first - other.first != 0) {
7.        (this.first - other.first).toInt
8.      } else {
9.       //如果第一個字段相等,則比較第二個字段,若想實現多次排序,也可以按照這個模式繼
         //續比較下去
10.       if (this.second - other.second > 0) {
11.         Math.ceil(this.second - other.second).toInt
12.       } else if (this.second - other.second < 0) {
13.         Math.floor(this.second - other.second).toInt
14.       } else {
15.         (this.second - other.second).toInt
16.       }
17.     }
18.   }
19. }

然后再把RDD的每條記錄里想要排序的字段封裝到上面定義的類中作為key,把該條記錄整體作為value。

1.    println("對電影評分數據以Timestamp和Rating兩個維度進行二次降序排列:")
2.  val pairWithSortkey = ratingsRDD.map(line => {
3.        val splited = line.split("::")
4.        (new SecondarySortKey(splited(3).toDouble, splited(2).toDouble), line)
5.      })
6.  //直接調用sortByKey,此時會按照之前實現的compare方法排序
7.  val sorted = pairWithSortkey.sortByKey(false)
8.
9.  val sortedResult = sorted.map(sortedline => sortedline._2)
10. sortedResult.take(10).foreach(println)

取出排序后的RDD的value,此時這些記錄已經是按照時間戳和評分排好序的,最終打印出的結果如圖1-5所示,從圖中可以看到已經按照timestamp和評分降序排列了。

圖1-5 電影系統二次排序運行結果

主站蜘蛛池模板: 民县| 洞口县| 西城区| 武隆县| 东海县| 榕江县| 遂溪县| 江油市| 呼伦贝尔市| 贵阳市| 临沭县| 柘城县| 元阳县| 阿图什市| 金门县| 县级市| 洛南县| 衡南县| 绵阳市| 巩留县| 郸城县| 区。| 临漳县| 泉州市| 濮阳县| 枣阳市| 陕西省| 安多县| 阳城县| 临夏市| 卫辉市| 呼图壁县| 湖州市| 申扎县| 汽车| 绥中县| 岫岩| 乐都县| 枝江市| 榆社县| 漳平市|