- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 1449字
- 2019-12-12 17:30:07
12.2 通過RDD實現電影流行度分析
本節統計所有電影中平均得分最高(口碑最好)的電影及觀看人數最多(流行度最高)的電影。
所有電影中平均得分最高的Top10電影實現思路:如果想算總的評分,一般肯定需要reduceByKey操作或者aggregateByKey操作。
評級文件ratings.dat的格式描述如下。
1. UserID::MovieID::Rating::Timestamp 2. 用戶ID、電影ID、評分數據、時間戳
第一步:把數據變成Key-Value,大家想一下在這里什么是Key,什么是Value?把MovieID設置為Key,把Rating設置為Value。具體實現過程:將ratingsRDD中的每行數據按"::"分隔符進行分割,然后map格式化為(用戶ID,電影ID,評分)元組;接下來對ratings進行map轉換,取ratings元組的第2個元素"電影ID"作為Key,(第3個元素即評分,1)元組值作為Value,格式化成為Key-Value的方式,即(電影ID,(評分,1))。
第二步:通過reduceByKey操作或者aggregateByKey實現聚合,然后呢?具體實現過程:對兩個具有相同Key,而Value不同的元組,如(電影ID,(x.評分,x.1)),(電影ID,(y.評分,y.1)),我們使用reduceByKey算子對Value值進行匯聚轉換,計算得出(x的評分+y的評分,x的計數+y的計數),轉換以后Key是電影ID,Value是(總的評分,總的點評人數),格式化為Key-Value,即(電影ID,(總評分,總點評人數))。
第三步:排序,如何做?進行Key和Value的交換。上一步reduceByKey算子執行完畢,接下來進行map轉換操作,交換Key-Value值,并且計算出電影平均分=總評分/總點評人數,即將(電影ID,(總評分,總點評人數))轉換成((總評分/總點評人數),電影ID),然后使用sortByKey(false)算子按電影平均分降序排列,再通過take(10)算子獲取所有電影中平均得分最高的Top10,打印輸出。
所有電影中電影粉絲或者觀看人數最多的電影實現思路:
第一步:把數據變成Key-Value:取ratings元組的第2個元素電影ID作為Key,計數1次作為Value,格式化成為Key-Value,即(電影ID,1)。
第二步:通過reduceByKey操作實現聚合:對相同Key的Value值進行累加。生成Key-Value,即(電影ID,總次數)。
第三步:排序,進行Key和Value的交換。上一步reduceByKey算子執行完畢,然后進行map轉換操作,交換Key-Value值,即將(電影ID,總次數)轉換成(總次數,電影ID),然后使用sortByKey(false)算子按總次數降序排列。
第四步:再次進行Key和Value的交換,打印輸出。我們使用map轉換函數將(總次數,電影ID)進行交換,轉換為(電影ID,總次數),再通過take(10)算子獲取所有電影中粉絲或者觀看人數最多的電影Top10,打印輸出。
大數據電影點評系統中,電影流行度分析須注意以下事項。
(1)轉換數據格式的時候一般都會使用map操作,有時轉換可能特別復雜,需要在map方法中調用第三方jar或者so庫。
(2)RDD從文件中提取的數據成員默認都是String類型,需要根據實際需要進行轉換類型。
(3)RDD如果要重復使用,一般都會進行Cache操作。
(4)重磅注意事項,RDD的Cache操作之后不能直接再跟其他的算子操作,否則在一些版本中Cache不生效。
電影點評系統用戶行為分析,統計所有電影中平均得分最高(口碑最好)的電影以及電影粉絲或者觀看人數最多的電影的代碼如下。
1. println("所有電影中平均得分最高(口碑最好)的電影:") 2. val ratings= ratingsRDD.map(_.split("::")).map(x => (x(0), x(1), x(2))).cache() //格式化出電影ID和評分 3. ratings.map(x => (x._2, (x._3.toDouble, 1)))//格式化為Key-Value的方式 4. .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2)) //對Value進行reduce操作,分別得出每部電影的總的評分和總的點評人數 5. .map(x => (x._2._1.toDouble / x._2._2, x._1)) //求出電影平均分 6. .sortByKey(false) //降序排列 7. .take(10) //取Top10 8. .foreach(println) //打印到控制臺 9. 10. /** 11. *上面的功能計算的是口碑最好的電影,接下來分析粉絲或者觀看人數最多的電影 12. */ 13. println("所有電影中粉絲或者觀看人數最多的電影:") 14. ratings.map(x => (x._2, 1)).reduceByKey(_+_).map(x => (x._2, x._1)). sortByKey(false) 15. .map(x => (x._2, x._1)).take(10).foreach(println)
在IDEA中運行代碼,結果如下。
1. 所有電影中平均得分最高(口碑最好)的電影: 2. [Stage 17:=============================> (4 + 4) / 8](5.0,33264) 3. (5.0,64275) 4. (5.0,42783) 5. (5.0,53355) 6. (5.0,51209) 7. (4.75,26073) 8. (4.75,26048) 9. (4.75,65001) 10. (4.75,5194) 11. (4.75,4454) 12. 所有電影中粉絲或者觀看人數最多的電影: 13. (296,34864) 14. (356,34457) 15. (593,33668) 16. (480,32631) 17. (318,31126) 18. (110,29154) 19. (457,28951) 20. (589,28948) 21. (260,28566) 22. (150,27035)