- PySpark大數據分析與應用
- 戴剛 張良均主編
- 1941字
- 2025-01-07 16:58:56
1.3 PySpark大數據分析
Spark主要由Scala和Java語言開發,運行在Java虛擬機(Java Virtual Machine,JVM)中。除了提供Scala、Java開發接口外,Spark還為Python、R等語言提供了開發接口。PySpark是Spark為Python開發者提供的API,使得Python開發者在Python環境下可以運行Spark程序。
1.3.1 PySpark簡介
Python在數據分析和機器學習領域擁有豐富的庫資源,如NumPy、SciPy、Pandas和Scikit-learn 等,因此成為數據科學家和數據分析師處理和分析數據的熱門語言。Spark 是目前處理和使用大數據的主流框架之一,其設計初衷是加速迭代計算,非常適合大數據分析、機器學習等應用場景。為了兼顧Spark和Python的優勢,Apache Spark開源社區推出了PySpark。
與原生Python相比,PySpark的優勢在于其能夠運行在集群上,而不僅僅局限于單機環境。因此,當數據量過大以至于單機無法處理,或數據存儲在HDFS中,或需要進行分布式/并行計算時,可以選擇使用PySpark。
1.3.2 PySpark子模塊
PySpark包括一組公共類、用于處理結構化數據的SQL模塊與流數據處理的Streaming模塊、用于機器學習的MLlib和ML兩個包。PySpark類、模塊與包如圖1-8所示。

圖1-8 PySpark類、模塊與包
PySpark 公共類中的 pyspark.SparkContext、pyspark.RDD,Streaming 模塊中的pyspark.streaming.StreamingContext、pyspark.streaming.DStream 以及 SQL 模塊中的pyspark.sql.SparkSession、pyspark.sql.DataFrame為PySpark的核心類。PySpark核心類說明如表1-4所示。
表1-4 PySpark核心類說明

1.PySpark公共類
在 PySpark 中有11個公共類,分別是 Accumulator、Broadcast、RDD、SparkConf、SparkContext、SparkFiles、StorageLevel、TaskContext、RDDBarrier、BarrierTaskContext和BarrierTaskInfo。PySpark公共類的簡要說明如表1-5所示。
表1-5 PySpark公共類的簡要說明

PySpark的主要公共類解釋說明如下。
(1)SparkContext編程入口
在PySpark中,SparkContext類是所有Spark功能的核心入口點,扮演著重要的角色。它負責與 Spark 集群通信,并負責任務的分發和執行。以下是關于 SparkContext 的詳細說明。
① 功能入口。SparkContext作為Spark功能的入口點,是運行任何Spark應用程序時必須初始化的對象。因此,在編寫PySpark程序時,需要先創建一個SparkContext實例,并傳入一個SparkConf對象作為參數。通過這個SparkContext實例,可以提交作業、分發任務和注冊應用程序。
② 驅動程序。當運行一個 Spark 應用程序時,系統會啟動一個驅動程序,其中包含main函數。SparkContext會在驅動程序中啟動,并在工作節點上的Executor中運行操作。
③ 集群連接。SparkContext表示與Spark集群的連接,它是創建RDD和廣播變量的基礎。
④ 默認實例。默認情況下,PySpark 將 SparkContext 實例命名為“sc”,因此在大多數情況下,可以直接使用“sc”這個名字來訪問SparkContext實例。
此外,SparkContext還提供了許多用于操作RDD的方法,例如map()、filter()、reduce()等,這些方法使得對數據的操作變得簡單高效。它還支持廣播變量,這是一種只讀變量,可以被緩存在每臺機器上,以便在每個任務中快速訪問而無需通過網絡傳輸。
(2)SparkConf配置對象
在PySpark中,SparkConf是一個關鍵的配置類,用來設置和管理Spark應用程序的各種參數。通過創建SparkConf對象,可以自定義Spark應用程序參數來定制應用程序的行為,以滿足不同的需求和環境。以下是關于SparkConf對象的詳細說明。
① 創建SparkConf對象。通過調用SparkConf()構造函數,可以創建一個新的SparkConf對象。這個構造函數接受一個可選的字典參數,用于指定默認的配置選項。
② 加載系統屬性。SparkConf對象會自動從Java系統屬性中加載所有以“spark.”為前綴的屬性。例如,如果在啟動JVM時設置了“-Dspark.app.name=MyApp”,那么可以使用SparkConf對象的“.get("spark.app.name")”方法獲取到“MyApp”。
③ 設置和獲取配置選項。可以使用“set(key, value)”方法來設置配置選項,使用get(key)方法來獲取配置選項的值。如果嘗試獲取一個未設置的配置選項,那么系統將會拋出一個異常。
④ 優先級規則。如果在創建SparkConf對象后使用set()方法設置了某個配置選項,那么該方法設置的值將優先于從系統屬性中加載的值。
⑤ 不可變性。一旦創建了SparkConf對象并將其傳遞給SparkContext,就不能再修改該對象。這是由于Spark需要確保在應用程序的整個生命周期中,配置參數保持一致。
⑥ 傳遞配置給SparkContext。在創建SparkContext對象時,需要傳入一個SparkConf對象。這樣,SparkContext就可以使用這些配置參數來初始化和運行Spark應用程序。
(3)PySpark廣播變量與累加器
在 Spark 中,為了支持并行處理,可以使用兩種類型的變量:廣播變量(Broadcast Variables)和累加器(Accumulators)。這兩種變量可以在集群的每個節點上運行任務時使用。
① 廣播變量。廣播變量用于在所有節點上保存數據的只讀副本。當需要在多個節點上使用相同的數據時,可以使用廣播變量來避免數據的重復傳輸。廣播變量在第一次使用時會被緩存在各個節點上,之后可以快速訪問而無需再次通過網絡傳輸。
② 累加器。累加器用于在集群中的多個節點上聚合信息。與廣播變量不同,累加器是可變的,可以進行關聯和交換操作。例如,可以使用累加器來實現計數器或求和操作。累加器的值會在任務執行過程中不斷更新,并最終返回給驅動程序。
總的來說,廣播變量主要用于在集群中共享只讀數據,而累加器用于在集群中進行信息聚合。
2.PySpark SQL模塊
PySpark SQL(pyspark.sql)模塊包含10個類,提供了類型、配置、DataFrame和許多其他功能的SQL函數和方法,PySpark SQL模塊相關類說明如表1-6所示。關于PySpark SQL模塊,在本書第3章中將進行詳細介紹。
表1-6 PySpark SQL模塊相關類說明

3.PySpark Streaming模塊
PySpark Streaming(pyspark.streaming)模塊包含3個主要的類:StreamingContext、DStream、StreamingListener,也特別提供針對Flume、Kafka、Kinesis流數據處理的類,但這里只對前3個類進行說明,如表1-7所示。PySpark Streaming模塊將在本書第4章中做詳細介紹。
表1-7 PySpark Streaming模塊相關類說明
