- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 5476字
- 2019-12-12 17:30:06
12.1 通過RDD實現分析電影的用戶行為信息
在本節中,我們首先搭建IDEA的開發環境。電影點評系統基于IDEA開發環境進行開發,本節對大數據電影點評系統中電影數據格式和來源進行了說明,然后通過RDD方式實現分析電影的用戶行為信息的功能。
12.1.1 搭建IDEA開發環境
1.IntelliJ IDEA環境的安裝
如圖12-1所示,登錄IDEA的官網,打開http://www.jetbrains.com/idea/網站,單擊DOWNLOAD進行IDEA的下載。IDEA全稱IntelliJ IDEA,是Java語言開發的集成環境,具備智能代碼助手、代碼自動提示、重構、J2EE支持、Ant、JUnit、CVS整合、代碼審查等方面的功能,支持Maven、Gradle和STS,集成Git、SVN、Mercurial等,在Spark開發程序時通常使用IDEA。單擊DOWNLOAD下載,下載安裝包以后根據IDEA安裝提示一步步完成安裝。

圖12-1 IDEA的官網
IDEA在本地計算機上安裝完成以后,打開IDEA的默認顯示主題風格是Darcula的主題格式,也是眾多IDEA開發者喜歡的格式。但為了便于讀者閱讀,這里將IDEA的顯示主題風格調整為IntelliJ格式,單擊File→Settings→Appearance→Theme→IntelliJ,這樣書本紙質顯示更清晰,如圖12-2所示。

圖12-2 修改IDEA顯示主題格式
IDEA安裝完成,在Windows系統中完成Windows JDK的安裝與配置。安裝和配置完成以后,測試驗證JDK能否在設備上運行。選擇“開始”→“運行”命令,在運行窗口中輸入CMD命令,進入DOS環境,在命令行提示符中直接輸入java-version,按回車鍵,系統會顯示JDK的版本,說明JDK已經安裝成功,如下所示。
1. C:\Windows\System32>java -version 2. java version "1.8.0_121" 3. Java(TM) SE Runtime Environment (build 1.8.0_121-b13) 4. Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
2.新建Maven工程(SparkApps工程),導入Spark 2.0相關JAR包及源碼
(1)在IDEA菜單欄中新建工程,單擊File→Project,如圖12-3所示。

圖12-3 新建工程
(2)在彈出的New Project對話框中選擇Maven方式,單擊Next按鈕,如圖12-4所示。

圖12-4 選擇Maven方式
(3)在彈出的對話框中輸入GroupId及ArtifactId,如圖12-5所示。

圖12-5 輸入GroupId及ArtifactId
(4)在彈出的對話框中輸入工程名及工程保存位置,單擊Finish按鈕完成,如圖12-6所示。

圖12-6 輸入工程名及工程保存位置
(5)在SparkApps工程中設置Maven配置參數。單擊File→Settings→Build,Execution, Deployment→Maven→User settings file及Local repository,輸入用戶配置文件及本地庫保存地址,如圖12-7所示。

圖12-7 輸入用戶配置文件及本地庫保存地址
其中,setting.xml代碼完整的配置內容如例12-1所示。
【例12-1】setting.xmll文件內容。
1. <?xml version="1.0" encoding="utf-8"?> 2. <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" 3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4. xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"> 5. <localRepository>F:\SparkMaven2017\repository64bit</localRepository> 6. </settings>
(6)在SparkApps工程中單擊pom.xml,編輯pom.xml文件,如圖12-8所示。

圖12-8 編輯pom文件
pom代表“項目對象模型”。這是一個文件名為pom.xml的Maven項目的XML表示形式。在Maven系統中,一個項目除了代碼文件外,還包含配置文件,包括開發者需要遵循的規則、缺陷管理系統、組織和許可證、項目的url、項目的依賴,以及其他所有的項目相關因素。在Maven系統中,項目不需要包含代碼,只是一個pom.xml。pom.xml包括了所有的項目信息。
本書的案例基于Maven方式進行開發,項目中依賴的JAR包都從pom.xml中下載獲取,這里提供了一份完整的pom.xml文件,讀者可以根據pom.xml文件搭建開發環境,測試運行本書稿的各綜合案例。
pom.xml代碼完整的配置內容如例12-2所示。
【例12-2】pom.xml文件內容。
1. <?xml version="1.0" encoding="utf-8"?> 2. <project xmlns="http://maven.apache.org/POM/4.0.0" 3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven. apache.org/xsd/maven-4.0.0.xsd"> 5. <modelVersion>4.0.0</modelVersion> 6. <!-- 基礎配置 --> 7. <groupId>2017SparkCase100</groupId> 8. <artifactId>2017SparkCase100</artifactId> 9. <version>1.0-SNAPSHOT</version> 10. 11. <properties> 12. <scala.version>2.11.8</scala.version> 13. <spark.version>2.1.0</spark.version> 14. <jedis.version>2.8.2</jedis.version> 15. <fastjson.version>1.2.14</fastjson.version> 16. <jetty.version>9.2.5.v20141112</jetty.version> 17. <container.version>2.17</container.version> 18. <java.version>1.8</java.version> 19. </properties> 20. 21. <repositories> 22. <repository> 23. <id>scala-tools.org</id> 24. <name>Scala-Tools Maven2 Repository</name> 25. <url>http://scala-tools.org/repo-releases</url> 26. </repository> 27. </repositories> 28. 29. <pluginRepositories> 30. <pluginRepository> 31. <id>scala-tools.org</id> 32. <name>Scala-Tools Maven2 Repository</name> 33. <url>http://scala-tools.org/repo-releases</url> 34. </pluginRepository> 35. </pluginRepositories> 36. <!-- 依賴關系--> 37. <dependencies> 38. <!-- put javax.ws.rs as the first dependency, it is important!!! --> 39. <dependency> 40. <groupId>javax.ws.rs</groupId> 41. <artifactId>javax.ws.rs-api</artifactId> 42. <version>2.0</version> 43. </dependency> 44. 45. <dependency> 46. <groupId>org.scala-lang</groupId> 47. <artifactId>scala-library</artifactId> 48. <version>${scala.version}</version> 49. </dependency> 50. <dependency> 51. <groupId>org.scala-lang</groupId> 52. <artifactId>scala-compiler</artifactId> 53. <version>${scala.version}</version> 54. </dependency> 55. <dependency> 56. <groupId>org.scala-lang</groupId> 57. <artifactId>scala-reflect</artifactId> 58. <version>${scala.version}</version> 59. </dependency> 60. 61. <dependency> 62. <groupId>org.scala-lang</groupId> 63. <artifactId>scalap</artifactId> 64. <version>${scala.version}</version> 65. </dependency> 66. 67. <dependency> 68. <groupId>junit</groupId> 69. <artifactId>junit</artifactId> 70. <version>4.4</version> 71. <scope>test</scope> 72. </dependency> 73. <dependency> 74. <groupId>org.specs</groupId> 75. <artifactId>specs</artifactId> 76. <version>1.2.5</version> 77. <scope>test</scope> 78. </dependency> 79. <dependency> 80. <groupId>org.apache.spark</groupId> 81. <artifactId>spark-core_2.11</artifactId> 82. <version>${spark.version}</version> 83. </dependency> 84. <dependency> 85. <groupId>org.apache.spark</groupId> 86. <artifactId>spark-launcher_2.11</artifactId> 87. <version>2.1.0</version> 88. </dependency> 89. <dependency> 90. <groupId>org.apache.spark</groupId> 91. <artifactId>spark-network-shuffle_2.11</artifactId> 92. <version>2.1.0</version> 93. </dependency> 94. <dependency> 95. <groupId>org.apache.spark</groupId> 96. <artifactId>spark-sql_2.11</artifactId> 97. <version>${spark.version}</version> 98. </dependency> 99. <dependency> 100. <groupId>org.apache.spark</groupId> 101. <artifactId>spark-hive_2.11</artifactId> 102. <version>2.1.0</version> 103. </dependency> 104. <dependency> 105. <groupId>org.apache.spark</groupId> 106. <artifactId>spark-catalyst_2.11</artifactId> 107. <version>2.1.0</version> 108. </dependency> 109. <dependency> 110. <groupId>org.apache.spark</groupId> 111. <artifactId>spark-streaming-flume-assembly_2.11</artifactId> 112. <version>2.1.0</version> 113. </dependency> 114. <dependency> 115. <groupId>org.apache.spark</groupId> 116. <artifactId>spark-streaming-flume_2.11</artifactId> 117. <version>2.1.0</version> 118. </dependency> 119. <dependency> 120. <groupId>org.apache.spark</groupId> 121. <artifactId>spark-streaming_2.11</artifactId> 122. <version>${spark.version}</version> 123. </dependency> 124. <dependency> 125. <groupId>org.apache.spark</groupId> 126. <artifactId>spark-graphx_2.11</artifactId> 127. <version>2.1.0</version> 128. </dependency> 129. <dependency> 130. <groupId>org.scalanlp</groupId> 131. <artifactId>breeze_2.11</artifactId> 132. <version>0.11.2</version> 133. <scope>compile</scope> 134. <exclusions> 135. <exclusion> 136. <artifactId>junit</artifactId> 137. <groupId>junit</groupId> 138. </exclusion> 139. <exclusion> 140. <artifactId>commons-math3</artifactId> 141. <groupId>org.apache.commons</groupId> 142. </exclusion> 143. </exclusions> 144. </dependency> 145. <dependency> 146. <groupId>org.apache.commons</groupId> 147. <artifactId>commons-math3</artifactId> 148. <version>3.4.1</version> 149. <scope>compile</scope> 150. </dependency> 151. <dependency> 152. <groupId>org.apache.spark</groupId> 153. <artifactId>spark-mllib_2.11</artifactId> 154. <version>2.1.0</version> 155. </dependency> 156. <dependency> 157. <groupId>org.apache.spark</groupId> 158. <artifactId>spark-mllib-local_2.11</artifactId> 159. <version>2.1.0</version> 160. <scope>compile</scope> 161. </dependency> 162. <dependency> 163. <groupId>org.apache.spark</groupId> 164. <artifactId>spark-mllib-local_2.11</artifactId> 165. <version>2.1.0</version> 166. <type>test-jar</type> 167. <scope>test</scope> 168. </dependency> 169. <dependency> 170. <groupId>org.apache.spark</groupId> 171. <artifactId>spark-repl_2.11</artifactId> 172. <version>2.1.0</version> 173. </dependency> 174. <dependency> 175. <groupId>org.apache.hadoop</groupId> 176. <artifactId>hadoop-client</artifactId> 177. <version>2.6.0</version> 178. </dependency> 179. <dependency> 180. <groupId>org.apache.spark</groupId> 181. <artifactId>spark-streaming-kafka-0-8_2.10</artifactId> 182. <version>2.1.0</version> 183. </dependency> 184. <dependency> 185. <groupId>org.apache.spark</groupId> 186. <artifactId>spark-streaming-flume_2.11</artifactId> 187. <version>${spark.version}</version> 188. </dependency> 189. <dependency> 190. <groupId>mysql</groupId> 191. <artifactId>mysql-connector-java</artifactId> 192. <version>5.1.6</version> 193. </dependency> 194. <dependency> 195. <groupId>org.apache.hive</groupId> 196. <artifactId>hive-jdbc</artifactId> 197. <version>1.2.1</version> 198. </dependency> 199. <dependency> 200. <groupId>org.apache.httpcomponents</groupId> 201. <artifactId>httpclient</artifactId> 202. <version>4.4.1</version> 203. </dependency> 204. <dependency> 205. <groupId>org.apache.httpcomponents</groupId> 206. <artifactId>httpcore</artifactId> 207. <version>4.4.1</version> 208. </dependency> 209. 210. <!-- https://mvnrepository.com/artifact/org.apache.hadoop/ hadoop- common --> 211. <dependency> 212. <groupId>org.apache.hadoop</groupId> 213. <artifactId>hadoop-common</artifactId> 214. <version>2.6.0</version> 215. </dependency> 216. 217. <dependency> 218. <groupId>org.apache.hadoop</groupId> 219. <artifactId>hadoop-client</artifactId> 220. <version>2.6.0</version> 221. </dependency> 222. 223. <!--https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs--> 224. <dependency> 225. <groupId>org.apache.hadoop</groupId> 226. <artifactId>hadoop-hdfs</artifactId> 227. <version>2.6.0</version> 228. </dependency> 229. 230. 231. <dependency> 232. <groupId>redis.clients</groupId> 233. <artifactId>jedis</artifactId> 234. <version>${jedis.version}</version> 235. </dependency> 236. <dependency> 237. <groupId>org.json</groupId> 238. <artifactId>json</artifactId> 239. <version>20090211</version> 240. </dependency> 241. <dependency> 242. <groupId>com.fasterxml.jackson.core</groupId> 243. <artifactId>jackson-core</artifactId> 244. <version>2.6.3</version> 245. </dependency> 246. <dependency> 247. <groupId>com.fasterxml.jackson.core</groupId> 248. <artifactId>jackson-databind</artifactId> 249. <version>2.6.3</version> 250. </dependency> 251. <dependency> 252. <groupId>com.fasterxml.jackson.core</groupId> 253. <artifactId>jackson-annotations</artifactId> 254. <version>2.6.3</version> 255. </dependency> 256. <dependency> 257. <groupId>com.alibaba</groupId> 258. <artifactId>fastjson</artifactId> 259. <version>1.1.41</version> 260. </dependency> 261. <dependency> 262. <groupId>fastutil</groupId> 263. <artifactId>fastutil</artifactId> 264. <version>5.0.9</version> 265. </dependency> 266. <dependency> 267. <groupId>org.eclipse.jetty</groupId> 268. <artifactId>jetty-server</artifactId> 269. <version>${jetty.version}</version> 270. </dependency> 271. 272. <dependency> 273. <groupId>org.eclipse.jetty</groupId> 274. <artifactId>jetty-servlet</artifactId> 275. <version>${jetty.version}</version> 276. </dependency> 277. 278. <dependency> 279. <groupId>org.eclipse.jetty</groupId> 280. <artifactId>jetty-util</artifactId> 281. <version>${jetty.version}</version> 282. </dependency> 283. 284. <dependency> 285. <groupId>org.glassfish.jersey.core</groupId> 286. <artifactId>jersey-server</artifactId> 287. <version>${container.version}</version> 288. </dependency> 289. <dependency> 290. <groupId>org.glassfish.jersey.containers</groupId> 291. <artifactId>jersey-container-servlet-core</artifactId> 292. <version>${container.version}</version> 293. </dependency> 294. <dependency> 295. <groupId>org.glassfish.jersey.containers</groupId> 296. <artifactId>jersey-container-jetty-http</artifactId> 297. <version>${container.version}</version> 298. </dependency> 299. <dependency> 300. <groupId>org.apache.hadoop</groupId> 301. <artifactId>hadoop-mapreduce-client-core</artifactId> 302. <version>2.6.0</version> 303. </dependency> 304. 305. <dependency> 306. <groupId>org.antlr</groupId> 307. <artifactId>antlr4-runtime</artifactId> 308. <version>4.5.3</version> 309. </dependency> 310. 311. </dependencies> 312. <!-- 編譯配置 --> 313. <build> 314. <plugins> 315. <plugin> 316. <artifactId>maven-assembly-plugin</artifactId> 317. <configuration> 318. <classifier>dist</classifier> 319. <appendAssemblyId>true</appendAssemblyId> 320. <descriptorRefs> 321. <descriptor>jar-with-dependencies</descriptor> 322. </descriptorRefs> 323. </configuration> 324. <executions> 325. <execution> 326. <id>make-assembly</id> 327. <phase>package</phase> 328. <goals> 329. <goal>single</goal> 330. </goals> 331. </execution> 332. </executions> 333. </plugin> 334. 335. <plugin> 336. <artifactId>maven-compiler-plugin</artifactId> 337. <configuration> 338. <source>1.7</source> 339. <target>1.7</target> 340. </configuration> 341. </plugin> 342. 343. <plugin> 344. <groupId>net.alchim31.maven</groupId> 345. <artifactId>scala-maven-plugin</artifactId> 346. <version>3.2.2</version> 347. <executions> 348. <execution> 349. <id>scala-compile-first</id> 350. <phase>process-resources</phase> 351. <goals> 352. <goal>compile</goal> 353. </goals> 354. </execution> 355. </executions> 356. <configuration> 357. <scalaVersion>${scala.version}</scalaVersion> 358. <recompileMode>incremental</recompileMode> 359. <useZincServer>true</useZincServer> 360. <args> 361. <arg>-unchecked</arg> 362. <arg>-deprecation</arg> 363. <arg>-feature</arg> 364. </args> 365. <jvmArgs> 366. <jvmArg>-Xms1024m</jvmArg> 367. <jvmArg>-Xmx1024m</jvmArg> 368. </jvmArgs> 369. <javacArgs> 370. <javacArg>-source</javacArg> 371. <javacArg>${java.version}</javacArg> 372. <javacArg>-target</javacArg> 373. <javacArg>${java.version}</javacArg> 374. <javacArg>-Xlint:all,-serial,-path</javacArg> 375. </javacArgs> 376. </configuration> 377. </plugin> 378. 379. <plugin> 380. <groupId>org.antlr</groupId> 381. <artifactId>antlr4-maven-plugin</artifactId> 382. <version>4.3</version> 383. <executions> 384. <execution> 385. <id>antlr</id> 386. <goals> 387. <goal>antlr4</goal> 388. </goals> 389. <phase>none</phase> 390. </execution> 391. </executions> 392. <configuration> 393. <outputDirectory>src/test/java</outputDirectory> 394. <listener>true</listener> 395. <treatWarningsAsErrors>true</treatWarningsAsErrors> 396. </configuration> 397. </plugin> 398. </plugins> 399. </build> 400. </project>
(7)在pom.xml中,按Ctrl+S快捷鍵保存pom.xml文件,IDEA會自動從網上下載各類Jar包,下載的時間根據網絡帶寬的情況可能需要幾十分鐘,也可能需要幾個小時,全部下載好以后,可以看到工程中External Libraries已經加載了Spark相關的Jar包及源碼,如圖12-9所示。

圖12-9 使用Maven方式加載Jar包
3.在SparkApps工程中建立scala目錄
單擊SparkApps工程下的src/main目錄,右擊main,從彈出的快捷菜單中選擇New→Directory命令,新建一個目錄scala,如圖12-10所示。

圖12-10 新建目錄
單擊SparkApps工程下的src/main/scala目錄,右擊scala,從彈出的快捷菜單中選擇Mark Directory as→Resource Root命令,標識目錄scala為源碼目錄。至此,IDEA本地開發環境搭建完成,如圖12-11所示。

圖12-11 設置為代碼目錄
12.1.2 大數據電影點評系統中電影數據說明
1.大數據電影點評系統電影數據的來源
電影推薦系統(MovieLens)是美國明尼蘇達大學(Minnesota)計算機科學與工程學院的GroupLens項目組創辦的,是一個非商業性質的、以研究為目的的實驗性站點。電影推薦系統主要使用協同過濾和關聯規則相結合的技術,向用戶推薦他們感興趣的電影。這個項目是由John Riedl教授和Joseph Konstan教授領導的。該項目從1992年開始研究自動化協同過濾,在1996年使用自動化協同過濾系統應用于USENET新聞組中。自那以后,項目組擴大了研究范圍,基于內容方法以及改進當前的協作過濾技術來研究所有的信息過濾解決方案。
電影推薦系統(MovieLens)的數據下載地址為: https://grouplens.org/datasets/movielens/。GroupLens項目研究收集了從電影推薦系統MovieLens站點提供評級的數據集(http://MovieLens.org),收集了不同時間段的數據,我們可以根據電影分析業務需求下載不同規模大小的數據源文件。
2.大數據電影點評系統電影數據的格式說明
這里下載的是中等規模的電影推薦系統數據集。在本地目錄moviedata/medium包含的電影點評系統數據源中提供了在2000年6040個用戶觀看約3900部電影發表的1 000 209條匿名評級數據信息。
評級文件ratings.dat的格式描述如下。
1. UserID::MovieID::Rating::Timestamp 2. 用戶ID、電影ID、評分數據、時間戳 3. - 用戶ID范圍在1~6040之間 4. - 電影ID范圍在1~3952之間 5. - 評級:使用五星評分方式 6. - 時間戳表示系統記錄的時間 7. - 每個用戶至少有20個評級
評級文件ratings.dat中摘取部分記錄如下。
1. 1::1193::5::978300760 2. 1::661::3::978302109 3. 1::914::3::978301968 4. 1::3408::4::978300275 5. 1::2355::5::978824291 6. 1::1197::3::978302268 7. 1::1287::5::978302039 8. 1::2804::5::978300719 9. 1::594::4::978302268 10. 1::919::4::978301368
用戶文件users.dat的格式描述如下。
1. UserID::Gender::Age::Occupation::Zip-code 2. 用戶ID、性別、年齡、職業、郵編代碼 3. -所有的用戶資料由用戶自愿提供, GroupLens項目組不會去檢查用戶數據的準確性。這個數 據集中包含用戶提供的用戶數據 4. -性別:“M”是男性、“F”是女性 5. -年齡由以下范圍選擇: 6. * 1: "少于 18歲" 7. * 18: "18年齡段:從18歲到24歲" 8. * 25: "25年齡段:從25歲到34歲" 9. * 35: "35歲年齡段:從35歲到44歲" 10. * 45: "45歲年齡段:從45歲到49歲" 11. * 50: "50歲年齡段:從50歲到55歲" 12. * 56: "56歲年齡段:大于56歲"
從用戶文件users.dat中摘取部分記錄如下。
1. 1::F::1::10::48067 2. 2::M::56::16::70072 3. 3::M::25::15::55117 4. 4::M::45::7::02460 5. 5::M::25::20::55455 6. 6::F::50::9::55117 7. 7::M::35::1::06810 8. 8::M::25::12::11413 9. 9::M::25::17::61614 10. 10::F::35::1::95370
電影文件movies.dat的格式描述如下。
1. MovieID::Title::Genres 2. 電影ID、電影名、電影類型 3. -標題是由亞馬遜公司的互聯網電影資料庫(IMDB)提供的,包括電影發布年份 4. -電影類型包括以下類型: 5. * Action:行動 6. * Adventure:冒險 7. * Animation:動畫 8. * Children's:兒童 9. * Comedy:喜劇 10. * Crime:犯罪 11. * Documentary:紀錄片 12. * Drama:戲劇 13. * Fantasy:幻想 14. * Film-Noir:黑色電影 15. * Horror:恐怖 16. * Musical:音樂 17. * Mystery:神秘 18. * Romance:浪漫 19. * Sci-Fi:科幻 20. * Thriller:驚悚 21. * War:戰爭 22. * Western:西方 23. -由于偶然重復的電影記錄或者電影記錄測試,一些電影ID和電影名可能不一致。 24. -電影記錄大多是GroupLens手工輸入的,因此不一定準確。
電影文件movies.dat中摘取的部分記錄如下。
1. 1::Toy Story (1995)::Animation|Children's|Comedy 2. 2::Jumanji (1995)::Adventure|Children's|Fantasy 3. 3::Grumpier Old Men (1995)::Comedy|Romance 4. 4::Waiting to Exhale (1995)::Comedy|Drama 5. 5::Father of the Bride Part II (1995)::Comedy 6. 6::Heat (1995)::Action|Crime|Thriller 7. 7::Sabrina (1995)::Comedy|Romance 8. 8::Tom and Huck (1995)::Adventure|Children's 9. 9::Sudden Death (1995)::Action 10. 10::GoldenEye (1995)::Action|Adventure|Thriller
職業文件occupations.dat的格式描述如下。
1. OccupationID::Occupation 2. 職業ID、職業名 3. -職業包含如下選擇: 4. * 0:“其他”或未指定 5. * 1:“學術/教育者” 6. * 2:“藝術家” 7. * 3:“文書/行政” 8. * 4:“高校畢業生” 9. * 5:“客戶服務” 10. * 6:“醫生/保健” 11. * 7:“行政/管理” 12. * 8:“農民” 13. * 9:“家庭主婦” 14. * 10:“中小學生” 15. * 11:“律師” 16. * 12:“程序員” 17. * 13:“退休” 18. * 14:“銷售/市場營銷” 19. * 15:“科學家” 20. * 16:“個體戶” 21. * 17:“技術員/工程師” 22. * 18:“商人和工匠” 23. * 19:“失業” 24. * 20:“作家”
從職業文件occupations.dat中摘取部分記錄如下。
1. 0::other or not specified 2. 1::academic/educator 3. 2::artist 4. 3::clerical/admin 5. 4::college/grad student 6. 5::customer service 7. 6::doctor/health care 8. 7::executive/managerial 9. 8::farmer 10. 9::homemaker 11. 10::K-12 student 12. 11::lawyer 13. 12::programmer 14. 13::retired 15. 14::sales/marketing 16. 15::scientist 17. 16::self-employed 18. 17::technician/engineer 19. 18::tradesman/craftsman 20. 19::unemployed 21. 20::writer
12.1.3 電影點評系統用戶行為分析統計實戰
在本節大數據電影點評系統用戶行為分析統計實戰中,我們需統計用戶觀看電影和點評電影行為數據的采集、過濾、處理和展示。對于用戶行為的數據采集:在生產環境中,企業通常使用Kafka的方式實時收集前端服務器中發送的用戶行為日志記錄信息;對于用戶行為的數據過濾:可以在前端服務器端進行用戶行為數據的過濾和格式化,也可以采用Spark SQL的方式進行數據過濾。在大數據電影點評系統用戶行為分析統計實戰中,基于GroupLens項目組電影推薦系統(MovieLens)已經采集的用戶電影觀看和點評數據文件,我們直接基于ratings.dat、users.dat、movies.dat、occupations.dat文件進行用戶行為實戰分析。
用戶行為分析統計的數據處理:①一個基本的技巧是,先使用傳統的SQL去實現一個數據處理的業務邏輯(自己可以手動模擬一些數據);②在Spark2.x的時候,再一次推薦使用DataSet去實現業務功能,尤其是統計分析功能;③如果想成為專家級別的頂級Spark人才,請使用RDD實現業務功能,為什么?原因很簡單,因為使用Spark DataSet方式有一個底層的引擎catalyst,基于DataSet的編程,catalyst的引擎會對我們的代碼進行優化,有很多優化的言外之意是你看不到問題到底是怎么來的,假設出錯了,優化后的RDD跑在Spark上,打印的錯誤不是直接基于DataSet產生的錯誤,DataSet是在內核上的封裝,運行的時候是基于RDD的!因此,打印的錯誤是基于RDD的。 DataSet的優化引擎catalyst涉及Spark底層的代碼封裝。在DataSet的解析過程中,基于抽象語法樹和語法規則的相互配合,引擎catalyst完成了詞法分析、未解析的邏輯計劃、解析以后的邏輯計劃、優化后的邏輯計劃、物理計劃、可執行的物理計劃、物理計劃執行、生成RDD等一系列過程。如果使用DataSet出現問題,我們可能不知其所以然。而在業務代碼編碼中,如果我們直接使用RDD,可以直接基于RDD來排查問題。在本節大數據電影點評系統用戶行為分析統計實戰中,我們通過RDD的方式直接統計分析用戶的電影行為。
用戶行為分析統計的數據格式:在生產環境中,強烈建議大家使用Parquet的文件格式。Parquet是面向分析型業務的列式存儲格式,由Twitter和Cloudera合作開發,2015年5月成為Apache頂級項目。Parquet是列式存儲格式的一種文件類型,可以適配多種計算框架,是語言無關的,而且不與任何一種數據處理框架綁定在一起,適配多種語言和組件。在本節大數據電影點評系統用戶行為分析統計實戰中,我們研究試驗中小規模的用戶電影點評數據的分析,專注于大數據Spark RDD的算子實現,這里仍使用GroupLens項目組提供的文本文件格式,不進行Parquet格式的轉換。
大數據電影點評系統用戶行為分析統計的數據源格式:
1."ratings.dat":UserID::MovieID::Rating::Timestamp 2."users.dat":UserID::Gender::Age::OccupationID::Zip-code 3."movies.dat":MovieID::Title::Genres 4. "occupations.dat":OccupationID::OccupationName
大數據電影點評系統用戶行為分析統計實戰,我們使用Spark本地模式進行開發,在IDEA開發環境的SparkApps工程中的src/main/scala目錄中新建包com.dt.spark.cores,然后在com.dt.spark.cores下新建Movie_Users_Analyzer_RDD.scala文件。
在Movie_Users_Analyzer_RDD.scala文件中導入電影點評數據。
1. //設置打印日志的輸出級別 2. Logger.getLogger("org").setLevel(Level.ERROR) 3. 4. var masterUrl = "local[4]" //默認程序運行在本地Local模式中,主要用于學習和測試 5. var dataPath = "data/moviedata/medium/" //數據存放的目錄 6. 7. /** *當我們把程序打包運行在集群上的時候,一般都會傳入集群的URL信息,這里我們假設 *如果傳入參數,第一個參數只傳入Spark集群的URL,第二個參數傳入的是數據的地址信息 8. */ 9. 10. if(args.length > 0) { 11. masterUrl = args(0) 12. } else if (args.length > 1) { 13. dataPath = args(1) 14. } 15. 16. /** *創建Spark集群上下文sc,在sc中可以進行各種依賴和參數的設置等,大家可以通 *過SparkSubmit腳本的help去看設置信息 17. */ 18. val sc = new SparkContext(new SparkConf().setMaster(masterUrl).setAppName ("Movie_Users_Analyzer")) 19. 20. /** *讀取數據,用什么方式讀取數據呢?這里使用的是RDD 21. */ 22. val usersRDD = sc.textFile(dataPath + "users.dat") 23. val moviesRDD = sc.textFile(dataPath + "movies.dat") 24. val occupationsRDD = sc.textFile(dataPath + "occupations.dat") 25. val ratingsRDD = sc.textFile(dataPath + "ratings.dat") 26. val ratingsRDD = sc.textFile("data/moviedata/large/" + "ratings.dat")
電影點評系統用戶行為分析之一,統計具體某部電影觀看的用戶信息,如電影ID為1193的用戶信息(用戶的ID、Age、Gender、Occupation)。為了便于閱讀,我們在Spark Driver端collect()獲取到RDD的元素集合以后,使用collect().take(2)算子打印輸出RDD的兩個元素,最后的用戶信息的輸出結果,我們使用collect().take(20)顯示10個元素。
1. /** *電影點評系統用戶行為分析之一:分析具體某部電影觀看的用戶信息,如電影ID為 *1193的用戶信息(用戶的ID、Age、Gender、Occupation) 2. */ 3. 4. val usersBasic: RDD[(String, (String, String, String))] = usersRDD. map(_.split("::")).map { user => 5. ( //UserID::Gender::Age::OccupationID 6. user(3), 7. (user(0), user(1), user(2)) 8. ) 9. } 10. for (elem <- usersBasic.collect().take(2)) { 11. println("usersBasicRDD (職業ID,(用戶ID,性別,年齡)): " + elem) 12. } 13. val occupations: RDD[(String, String)] = occupationsRDD.map(_.split ("::")).map(job => (job(0), job(1))) 14. for (elem <- occupations.collect().take(2)) { 15. println("occupationsRDD(職業ID,職業名): " + elem) 16. } 17. val userInformation: RDD[(String, ((String, String, String), String))] = usersBasic.join(occupations) 18. userInformation.cache() 19. 20. for (elem <- userInformation.collect().take(2)) { 21. println("userInformationRDD (職業ID,((用戶ID,性別,年齡),職業名)): " + elem) 22. } 23. 24. val targetMovie: RDD[(String, String)] = ratingsRDD.map(_.split ("::")).map(x => (x(0), x(1))).filter(_._2.equals("1193")) 25. 26. for (elem <- targetMovie.collect().take(2)) { 27. println("targetMovie(用戶ID,電影ID) : " + elem) 28. } 29. 30. val targetUsers: RDD[(String, ((String, String, String), String))] = userInformation.map(x => (x._2._1._1, x._2)) 31. for (elem <- targetUsers.collect().take(2)) { 32. println("targetUsers (用戶ID, ((用戶ID,性別,年齡), 職業名)): " + elem) 33. } 34. println("電影點評系統用戶行為分析,統計觀看電影ID為1193的電影用戶信息:用戶 的ID、性別、年齡、職業名 ") 35. val userInformationForSpecificMovie: RDD[(String, (String, ((String, String, String), String)))] = targetMovie.join(targetUsers) 36. for (elem <- userInformationForSpecificMovie.collect().take(10)) { 37. println("userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID, 性別,年齡), 職業名))) : " + elem) 38. }
在IDEA中運行代碼,結果如下。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. [Stage 0:> (0 + 0) / 2] 3. usersBasicRDD (職業ID,(用戶ID,性別,年齡)): (10,(1,F,1)) 4. usersBasicRDD (職業ID,(用戶ID,性別,年齡)): (16,(2,M,56)) 5. 6. occupationsRDD(職業ID,職業名): (0,other or not specified) 7. occupationsRDD(職業ID,職業名): (1,academic/educator) 8. 9. userInformationRDD (職業ID, ((用戶ID,性別,年齡), 職業名)): (4,((25,M,18), college/grad student)) 10. userInformationRDD (職業ID, ((用戶ID,性別,年齡), 職業名)): (4,((38,F,18), college/grad student)) 11. 12. targetMovie(用戶ID,電影ID) : (6,1193) 13. targetMovie(用戶ID,電影ID) : (10,1193) 14. 15. targetUsers (用戶 ID, ((用戶 ID,性別,年齡), 職業名)): (25,((25,M,18), college/grad student)) 16. targetUsers (用戶 ID, ((用戶 ID,性別,年齡), 職業名)): (38,((38,F,18), college/grad student)) 17. 電影點評系統用戶行為分析,統計觀看電影ID為1193的電影用戶信息:用戶的ID、性別、年 齡、職業名 18. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (3638,(1193,((3638,M,25),artist))) 19. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (2060,(1193,((2060,M,1),academic/educator))) 20. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (91,(1193,((91,M,35),executive/managerial))) 21. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (4150,(1193,((4150,M,25),other or not specified))) 22. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (3168,(1193,((3168,F,35),customer service))) 23. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (2596,(1193,((2596,M,50),executive/managerial))) 24. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (2813,(1193,((2813,M,25),writer))) 25. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (5445,(1193,((5445,M,25),other or not specified))) 26. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (3652,(1193,((3652,M,25),programmer))) 27. userInformationForSpecificMovie(用戶ID, (電影ID, ((用戶ID,性別,年齡), 職 業名))) : (3418,(1193,((3418,F,18),clerical/admin)))