官术网_书友最值得收藏!

1.3.1 由Spark組件組成的一站式軟件棧

如圖 1-3 所示,針對各種各樣的作業類型,Spark 提供了 4 個組件:Spark SQL、Spark Structured Streaming、Spark MLlib,以及 GraphX。這些組件都獨立于 Spark 具有容錯性的內核引擎。通過使用這些組件提供的 API,你可以實現自己的 Spark 應用。Spark 會將它轉為 DAG,然后交給內核引擎執行。這樣一來,無論你使用的是 Java、R、Scala、SQL、Python 中的哪種語言,只要使用了提供的這些結構化 API(第 3 章將對其做進一步介紹),底層代碼就都會轉為高度緊湊的字節碼,然后交給集群內工作節點上的 JVM 去執行。

圖 1-3:Spark 的組件和 API 棧

接下來分別介紹這些組件。

  1. Spark SQL

    Spark SQL 非常適合處理結構化數據。關系數據庫的表或用文件(CSV、文本文件、JSON、Avro、ORC、Parquet 等文件格式)存儲的結構化數據都可以被讀取并構建為 Spark 的永久表或臨時表。另外,當使用 Spark 提供的 Java、Python、Scala 或 R 等語言的結構化 API 時,你可以直接用 SQL 查詢來訪問剛讀入為 Spark DataFrame 的數據。至撰寫本書時,Spark SQL 語法已經兼容于 ANSI SQL:2003 標準,功能也和純正的 SQL 引擎相同。

    舉例來說,在以下 Scala 代碼片段中,你可以從存儲于亞馬遜云 S3 存儲桶內的 JSON 文件讀取數據、創建臨時表,并且執行類似 SQL 的查詢語句。查詢結果由內存中的 Spark DataFrame 對象表示。

    // Scala代碼
    // 從亞馬遜云S3存儲桶加載數據為Spark DataFrame
    spark.read.json("s3://apache_spark/data/committers.json")
      .createOrReplaceTempView("committers")
    // 發起SQL查詢,并以Spark DataFrame的形式返回結果
    val results = spark.sql("""SELECT name, org, module, release, num_commits
        FROM committers WHERE module = 'mllib' AND num_commits > 10
        ORDER BY num_commits DESC""")

    你也可以用 Python、R、Java 等語言寫出類似的代碼片段,因為執行時所生成的字節碼相同,所以執行性能也是一樣的。

  2. Spark MLlib

    Spark 自帶一個包含常見機器學習算法的庫——MLlib。從 Spark 發布第一個版本至今,這個組件的性能已經有了天翻地覆的提升,尤其是 Spark 2.x 底層引擎的改進。

    MLlib 提供了很多常見的機器學習算法,這些算法基于高級的 DataFrame API 構建,可用于搭建模型。

    從 Spark 1.6 開始,MLlib 項目分成了兩個包:spark.mllibspark.ml。后者提供基于 DataFrame 的 API,而前者包含基于 RDD 的 API,現在前者處于維護模式(不會開發新特性,只提供原有特性維護)。所有的新特性只存在于 spark.ml 中。本書所提到的“MLlib”是 Spark 中機器學習庫的總稱。

    這些 API 允許你提取或轉化特征,構建用于訓練和評估模型的流水線,并在不同環境間持久化模型(包括保存和重新加載)。其他功能包括常見線性代數運算和統計函數的使用。MLlib 包含其他一些低級的機器學習原語,比如通用的梯度下降優化。以下的 Python 代碼片段封裝了數據科學家在構建模型時的一些基本操作(第 10~11 章將討論更詳細的示例)。

    # Python代碼
    from pyspark.ml.classification import LogisticRegression
    ...
    training = spark.read.csv("s3://...")
    test = spark.read.csv("s3://...")
    
    # 加載訓練數據
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    
    # 擬合模型
    lrModel = lr.fit(training)
    
    # 預測
    lrModel.transform(test)
    ...
  3. Spark Structured Streaming

    基于 Spark SQL 引擎和 DataFrame API,Spark 2.0 引入了實驗性的連續流處理模型和結構化流處理 API。到 Spark 2.2 版本,結構化流處理(Structured Streaming)已經到了一般可用的狀態,也就是說,開發人員已經可以在生產環境中使用它了。

    為了讓大數據開發人員能夠將來自 Apache Kafka 或其他流式數據源的數據流和靜態數據結合起來,并實時響應,新模型將數據流視為持續增長的表,新的數據記錄不斷追加到表的最后。開發人員只需將數據流當作結構化的表,像靜態表那樣用查詢語句直接進行查詢即可。

    在結構化流處理模型下,內部的 Spark SQL 核心引擎會處理包括容錯和遲到數據(late- data)語義在內的方方面面,以便開發人員從容地將注意力放在流處理應用本身的實現上。這種新模型取代了 Spark 1.x 版本中基于 DStream 的舊模型,第 8 章將對其進行詳細介紹。此外,Spark 2.x 和 Spark 3.0 將流式數據源的范圍擴大到包括 Apache Kafka、Kinesis、基于 HDFS 的數據,以及云存儲的數據。

    以下代碼片段展示了結構化流處理應用的典型結構。這個示例從本機的套接字讀取數據,并將單詞計數結果寫入 Apache Kafka。

    # Python代碼
    # 從本機讀取一個數據流
    from pyspark.sql.functions import explode, split
    lines = (spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load())
    
    # 執行轉化操作
    # 將每行字符切分為單詞
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    
    # 生成流式單詞統計
    word_counts = words.groupBy("word").count()
    
    # 將輸出寫入Kafka數據流
    query = (word_counts
      .writeStream
      .format("kafka")
      .option("topic", "output"))
  4. GraphX

    顧名思義,GraphX 是用于操作圖(如社交網絡圖、路線和連接點圖、網絡拓撲圖等)和執行并行圖計算的庫。社區用戶為它貢獻了標準的圖算法,可以用于分析、連通、遍歷。可供使用的算法包括 PageRank(網頁排名)、Connected Components(連通分支),以及 Triangle Counting(三角計數)等。2

    以下代碼片段展示了如何用 GraphX 的 API 連接兩個圖。

    // Scala代碼
    val graph = Graph(vertices, edges)
    val messages = spark.textFile("hdfs://...")
    val graph2 = graph.joinVertices(messages) {
      (id, vertex, msg) => ...
    }

2Databricks 向社區貢獻的開源項目 GraphFrames 是一個和 Spark 中的 GraphX 類似的通用圖計算庫,區別在于 GraphFrames 使用基于 DataFrame 的 API。

主站蜘蛛池模板: 嘉禾县| 德钦县| 固原市| 伽师县| 铁力市| 金堂县| 元朗区| 新巴尔虎右旗| 沅陵县| 和平区| 沾益县| 西贡区| 盐亭县| 芒康县| 临泉县| 土默特右旗| 吉安市| 电白县| 方山县| 曲阜市| 咸宁市| 调兵山市| 江阴市| 股票| 怀仁县| 西平县| 大渡口区| 天台县| 招远市| 康平县| 万宁市| 十堰市| 张北县| 寿宁县| 永仁县| 察哈| 广西| 磴口县| 长子县| 基隆市| 延安市|