官术网_书友最值得收藏!

1.3.2 Spark的分布式執行

讀到這里,你一定已經知道了 Spark 是一個分布式數據處理引擎,其各種組件在一個集群上協同工作。接下來的幾章會探討如何使用 Spark 進行編程,在此之前,你需要先了解 Spark 分布式架構中的各組件是如何一起工作并相互通信的,以及 Spark 都支持哪些部署模式。

我們先一一介紹圖 1-4 中出現的組件,以及這些組件在架構中發揮的作用。從整體架構上看,Spark 應用有一個驅動器程序,該程序負責控制 Spark 集群內的并行計算。驅動器會通過 SparkSession 對象訪問集群內的分布式組件(一系列 Spark 執行器)和集群管理器。

圖 1-4:Spark 的組件和架構

  1. Spark 驅動器

    作為 Spark 應用中負責初始化 SparkSession 的部分,Spark 驅動器扮演著多個角色:它與集群管理器打交道;它向集群管理器申請 Spark 執行器(JVM)所需要的資源(CPU、內存等);它還會將所有的 Spark 操作轉換為 DAG 運算,并負責調度,還要將這些計算分成任務分發到 Spark 執行器上。一旦資源分配完成,創建好執行器后,驅動器就會直接與執行器通信。

  2. SparkSession`

    在 Spark 2.0 中,SparkSession 是所有 Spark 操作和數據的統一入口。它不僅封裝了 Spark 程序之前的各種入口(如 SparkContextSQLContextHiveContextSparkConf,以及 StreamingContext 等),還讓 Spark 變得更加簡單、好用。

    在 Spark 2.x 中,雖然 SparkSession 對象已經包含了其他所有的上下文對象,但你仍然可以訪問那些上下文對象及其方法。通過這種方式,社區保持著后向的兼容性。也就是說,使用 SparkContextSQLContext 的基于 1.x 版本的舊代碼也可以在 2.x 上運行。

    通過這個入口,可以創建 JVM 運行時參數、定義 DataFrame 或 Dataset、從數據源讀取數據、訪問數據庫元數據,并發起 Spark SQL 查詢。SparkSession 為所有的 Spark 功能提供了統一的入口。

    在獨立的 Spark 應用中,你可以用自己所選擇的編程語言的高級 API 創建 SparkSession 對象。在 Spark shell 中(第 2 章將進一步介紹),SparkSession 對象會被自動創建,你只需要使用全局變量 sparksc3 即可訪問。

    在 Spark 1.x 中,需要創建多種上下文對象(分別用于流處理、SQL 等),這會讓代碼顯得很煩瑣。在 Spark 2.x 中,應用只需要為每個 JVM 創建一個 SparkSession 對象,然后就可以用其執行各種 Spark 操作。

    我們直接看代碼示例吧。

    // Scala代碼
    import org.apache.spark.sql.SparkSession
    
    // 構建SparkSession
    val spark = SparkSession
      .builder
      .appName("LearnSpark")
      .config("spark.sql.shuffle.partitions", 6)
      .getOrCreate()
    ...
    // 用session對象讀取JSON
    val people = spark.read.json("...")
    ...
    // 用session對象發起SQL查詢
    val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")
  3. 集群管理器

    集群管理器負責管理和分配集群內各節點的資源,以用于 Spark 應用的執行。目前,Spark 支持 4 種集群管理器:Spark 自帶的獨立集群管理器、Apache Hadoop YARN、Apache Mesos,以及 Kubernetes。

  4. Spark 執行器

    Spark 執行器在集群內各工作節點上運行。執行器與驅動器程序通信,并負責在工作節點上執行任務。在大多數部署模式中,每個工作節點上只有一個執行器。

  5. 部署模式

    支持多種部署模式是 Spark 的一大優勢,這讓 Spark 可以在不同的配置和環境中運行。因為集群管理器不需要知道它實際在哪里運行(只要能管理 Spark 的執行器,并滿足資源請求就行),所以 Spark 可以部署在 Apache Hadoop YARN 和 Kubernetes 等一些常見環境中,并且以不同的模式運行。表 1-1 總結了可供選擇的部署模式。

    表 1-1:Spark部署模式一覽表

  6. 分布式數據與分區

    實際的物理數據是以分區的形式分布在 HDFS 或者云存儲上的,如圖 1-5 所示。數據分區遍布整個物理集群,而 Spark 將每個分區在邏輯上抽象為內存中的一個 DataFrame4。出于數據本地性要求,在分配任務時,根據要讀取的數據分區與各 Spark 執行器在網絡上的遠近,最好將任務分配到最近的 Spark 執行器上。

    圖 1-5:物理機器間的數據分布

    分區可以實現高效的并行執行。將數據切割為數據塊或分區的分布式結構可以讓 Spark 執行器只處理靠近自己的數據,從而最小化網絡帶寬使用率。也就是說,執行器的每個核心都能分到自己要處理的數據分區,如圖 1-6 所示。

    圖 1-6:執行器的每個核心都獲得了數據的一個分區

    舉個例子,以下代碼片段將存儲在集群中的物理數據分入 8 個分區。這樣一來,每個執行器都可以分到一個或多個分區,然后加載到執行器的內存中。

    # Python代碼
    log_df = spark.read.text("path_to_large_text_file").repartition(8)
    print(log_df.rdd.getNumPartitions())

    以下代碼會在內存中創建一個包含 8 個分區、共 10 000 個整型數的 DataFrame。

    # Python代碼
    df = spark.range(0, 10000, 1, 8)
    print(df.rdd.getNumPartitions())

    這兩段代碼都會打印出 8

    第 3 章和第 7 章會討論如何根據所有執行器的核心數來修改和調整分區方式,以實現最大的并發度。

3全局變量 scSparkSession 內的 SparkContext 對象。——譯者注

4此 DataFrame 與后續會介紹的 Spark DataFrame 不是同一個概念。——譯者注

主站蜘蛛池模板: 衡水市| 沙雅县| 咸宁市| 土默特右旗| 东乡县| 丹凤县| 宁蒗| 深泽县| 扶绥县| 金沙县| 安丘市| 高淳县| 建宁县| 金溪县| 淮北市| 内乡县| 沅陵县| 祥云县| 鹤岗市| 高州市| 临沂市| 盱眙县| 称多县| 肥城市| 时尚| 新安县| 隆子县| 晋城| 东宁县| 泾川县| 天柱县| 汉沽区| 南江县| 和硕县| 吉木萨尔县| 内黄县| 微博| 新巴尔虎右旗| 武川县| 阳高县| 博湖县|