- Spark快速大數據分析(第2版)
- (美)朱爾斯·S. 達米吉 布魯克·韋尼希 丹尼·李 (印)泰瑟加塔·達斯
- 1676字
- 2021-12-06 11:51:04
1.3.1 由Spark組件組成的一站式軟件棧
如圖 1-3 所示,針對各種各樣的作業類型,Spark 提供了 4 個組件:Spark SQL、Spark Structured Streaming、Spark MLlib,以及 GraphX。這些組件都獨立于 Spark 具有容錯性的內核引擎。通過使用這些組件提供的 API,你可以實現自己的 Spark 應用。Spark 會將它轉為 DAG,然后交給內核引擎執行。這樣一來,無論你使用的是 Java、R、Scala、SQL、Python 中的哪種語言,只要使用了提供的這些結構化 API(第 3 章將對其做進一步介紹),底層代碼就都會轉為高度緊湊的字節碼,然后交給集群內工作節點上的 JVM 去執行。

圖 1-3:Spark 的組件和 API 棧
接下來分別介紹這些組件。
Spark SQL
Spark SQL 非常適合處理結構化數據。關系數據庫的表或用文件(CSV、文本文件、JSON、Avro、ORC、Parquet 等文件格式)存儲的結構化數據都可以被讀取并構建為 Spark 的永久表或臨時表。另外,當使用 Spark 提供的 Java、Python、Scala 或 R 等語言的結構化 API 時,你可以直接用 SQL 查詢來訪問剛讀入為 Spark DataFrame 的數據。至撰寫本書時,Spark SQL 語法已經兼容于 ANSI SQL:2003 標準,功能也和純正的 SQL 引擎相同。
舉例來說,在以下 Scala 代碼片段中,你可以從存儲于亞馬遜云 S3 存儲桶內的 JSON 文件讀取數據、創建臨時表,并且執行類似 SQL 的查詢語句。查詢結果由內存中的 Spark DataFrame 對象表示。
// Scala代碼 // 從亞馬遜云S3存儲桶加載數據為Spark DataFrame spark.read.json("s3://apache_spark/data/committers.json") .createOrReplaceTempView("committers") // 發起SQL查詢,并以Spark DataFrame的形式返回結果 val results = spark.sql("""SELECT name, org, module, release, num_commits FROM committers WHERE module = 'mllib' AND num_commits > 10 ORDER BY num_commits DESC""")
你也可以用 Python、R、Java 等語言寫出類似的代碼片段,因為執行時所生成的字節碼相同,所以執行性能也是一樣的。
Spark MLlib
Spark 自帶一個包含常見機器學習算法的庫——MLlib。從 Spark 發布第一個版本至今,這個組件的性能已經有了天翻地覆的提升,尤其是 Spark 2.x 底層引擎的改進。
MLlib 提供了很多常見的機器學習算法,這些算法基于高級的 DataFrame API 構建,可用于搭建模型。
從 Spark 1.6 開始,MLlib 項目分成了兩個包:
spark.mllib
和spark.ml
。后者提供基于 DataFrame 的 API,而前者包含基于 RDD 的 API,現在前者處于維護模式(不會開發新特性,只提供原有特性維護)。所有的新特性只存在于spark.ml
中。本書所提到的“MLlib”是 Spark 中機器學習庫的總稱。這些 API 允許你提取或轉化特征,構建用于訓練和評估模型的流水線,并在不同環境間持久化模型(包括保存和重新加載)。其他功能包括常見線性代數運算和統計函數的使用。MLlib 包含其他一些低級的機器學習原語,比如通用的梯度下降優化。以下的 Python 代碼片段封裝了數據科學家在構建模型時的一些基本操作(第 10~11 章將討論更詳細的示例)。
# Python代碼 from pyspark.ml.classification import LogisticRegression ... training = spark.read.csv("s3://...") test = spark.read.csv("s3://...") # 加載訓練數據 lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) # 擬合模型 lrModel = lr.fit(training) # 預測 lrModel.transform(test) ...
Spark Structured Streaming
基于 Spark SQL 引擎和 DataFrame API,Spark 2.0 引入了實驗性的連續流處理模型和結構化流處理 API。到 Spark 2.2 版本,結構化流處理(Structured Streaming)已經到了一般可用的狀態,也就是說,開發人員已經可以在生產環境中使用它了。
為了讓大數據開發人員能夠將來自 Apache Kafka 或其他流式數據源的數據流和靜態數據結合起來,并實時響應,新模型將數據流視為持續增長的表,新的數據記錄不斷追加到表的最后。開發人員只需將數據流當作結構化的表,像靜態表那樣用查詢語句直接進行查詢即可。
在結構化流處理模型下,內部的 Spark SQL 核心引擎會處理包括容錯和遲到數據(late- data)語義在內的方方面面,以便開發人員從容地將注意力放在流處理應用本身的實現上。這種新模型取代了 Spark 1.x 版本中基于 DStream 的舊模型,第 8 章將對其進行詳細介紹。此外,Spark 2.x 和 Spark 3.0 將流式數據源的范圍擴大到包括 Apache Kafka、Kinesis、基于 HDFS 的數據,以及云存儲的數據。
以下代碼片段展示了結構化流處理應用的典型結構。這個示例從本機的套接字讀取數據,并將單詞計數結果寫入 Apache Kafka。
# Python代碼 # 從本機讀取一個數據流 from pyspark.sql.functions import explode, split lines = (spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()) # 執行轉化操作 # 將每行字符切分為單詞 words = lines.select(explode(split(lines.value, " ")).alias("word")) # 生成流式單詞統計 word_counts = words.groupBy("word").count() # 將輸出寫入Kafka數據流 query = (word_counts .writeStream .format("kafka") .option("topic", "output"))
GraphX
顧名思義,GraphX 是用于操作圖(如社交網絡圖、路線和連接點圖、網絡拓撲圖等)和執行并行圖計算的庫。社區用戶為它貢獻了標準的圖算法,可以用于分析、連通、遍歷。可供使用的算法包括 PageRank(網頁排名)、Connected Components(連通分支),以及 Triangle Counting(三角計數)等。2
以下代碼片段展示了如何用 GraphX 的 API 連接兩個圖。
// Scala代碼 val graph = Graph(vertices, edges) val messages = spark.textFile("hdfs://...") val graph2 = graph.joinVertices(messages) { (id, vertex, msg) => ... }
2Databricks 向社區貢獻的開源項目 GraphFrames 是一個和 Spark 中的 GraphX 類似的通用圖計算庫,區別在于 GraphFrames 使用基于 DataFrame 的 API。