- 大數據處理框架Apache Spark設計與實現
- 許利杰等
- 6856字
- 2024-01-19 16:36:19
1.4 大數據處理框架的四層結構
一個大數據應用可以表示為<;輸入數據,用戶代碼,配置參數>;。應用的輸入數據一般以分塊(如以128MB為一塊)形式預先存儲在分布式文件系統(如HDFS[18])之上。用戶在向大數據處理框架提交應用之前,需要指定數據存儲位置,撰寫數據處理代碼,并設定配置參數。之后,用戶將應用提交給大數據處理框架運行。
大數據處理框架大體可以分為四層結構:用戶層、分布式數據并行處理層、資源管理與任務調度層、物理執行層。以Apache Spark框架為例,其四層結構如圖1.2所示。在用戶層中,用戶需要準備數據、開發用戶代碼、配置參數。之后,分布式數據并行處理層根據用戶代碼和配置參數,將用戶代碼轉化成邏輯處理流程(數據單元及數據依賴關系),然后將邏輯處理流程轉化為物理執行計劃(執行階段及執行任務)。資源管理與任務調度層根據用戶提供的資源需求來分配資源容器,并將任務(task)調度到合適的資源容器上運行。物理執行層實際運行具體的數據處理任務。下面具體介紹每個層次的詳細信息,以及工業界和學術界進行的一些相關工作。

圖1.2 大數據處理框架的四層結構
1.4.1 用戶層
用戶層方便用戶開發大數據應用。如前所述,我們將一個大數據應用表示為<;輸入數據,用戶代碼,配置參數>;。下面介紹用戶在開發應用時需要準備的輸入數據、用戶代碼和配置參數。
1.輸入數據
對于批式大數據處理框架,如Hadoop、Spark,用戶在提交作業(job)之前,需要提前準備好輸入數據。輸入數據一般以分塊(如以128MB為一塊)的形式預先存儲,可以存放在分布式文件系統(如Hadoop的分布式文件系統HDFS)和分布式Key-Value數據庫(如HBase[19])上,也可以存放到關系數據庫中。輸入數據在應用提交后會由框架進行自動分塊,每個分塊一般對應一個具體執行任務(task)。
對于流式大數據處理框架,如Spark Streaming[20]和Apache Flink[21],輸入數據可以來自網絡流(socket)、消息隊列(Kafka)等。數據以微批(多條數據形成一個微批,稱為mini-batch)或者連續(一條接一條,稱為continuous)的形式進入流式大數據處理框架。
對于大數據應用,數據的高效讀取常常成為影響系統整體性能的重要因素。為了提高應用讀取數據的性能,學術界研究了如何通過降低磁盤I/O來提高性能。例如,PACMan[22]根據一定策略提前將task所需的部分數據緩存到內存中,以提高task的執行性能。為了加速不同的大數據應用(如Hadoop、Spark等)之間的數據傳遞和共享,Tachyon[23](現在更名為Alluxio[24])構造了一個基于內存的分布式數據存儲系統,用戶可以將不同應用產生的中間數據緩存到Alluxio中,而不是直接緩存到框架中,這樣可以加速中間數據的寫入和讀取,同時也可以降低框架的內存消耗。
2.用戶代碼
用戶代碼可以是用戶手寫的MapReduce代碼,或者是基于其他大數據處理框架的具體應用處理流程的代碼。圖1.3展示了在Hadoop MapReduce上實現WordCount的用戶代碼,其用于計算字符出現的次數。在Hadoop MapReduce上用戶需要自定義map()和reduce()函數。除了map()和reduce()函數,用戶為了優化應用性能還定義了一個“迷你”的reduce(),叫作combine()。combine()可以在reduce()執行之前對中間數據進行聚合,這樣可以減少reduce()從各個節點獲取的輸入數據量,進而減少網絡I/O開銷和reduce()的壓力。combine()和reduce()的代碼實現一般是相同的。Hadoop MapReduce提供的map()和reduce()函數的處理邏輯比較固定單一,難以支持復雜數據操作,如常見的排序操作sort()、數據庫表的關聯操作join()等。為此,Dryad和Spark提供了更加通用的數據操作符,如flatMap()等。圖1.4展示了在Spark上實現WordCount的用戶代碼,對于同樣的應用處理邏輯,基于Spark的用戶代碼比基于Hadoop MapReduce的用戶代碼要更加簡潔。

圖1.3 在Hadoop MapReduce上實現WordCount的用戶代碼

圖1.4 在Spark上實現WordCount的用戶代碼
在實際系統中,用戶撰寫用戶代碼后,大數據處理框架會生成一個Driver程序,將用戶代碼提交給集群運行。例如,在Hadoop MapReduce中,Driver程序負責設定輸入/輸出數據類型,并向Hadoop MapReduce框架提交作業;在Spark中,Driver程序不僅可以產生數據、廣播數據給各個task,而且可以收集task的運行結果,最后在Driver程序的內存中計算出最終結果。圖1.5展示了在Spark平臺上Driver程序的運行模式。

圖1.5 在Spark平臺上Driver程序的運行模式
除了直接依賴底層操作手動撰寫用戶代碼,用戶還可以利用高層語言或者高層庫來間接產生用戶代碼。例如,在圖1.6中用戶可以使用類似SQL的Apache Pig腳本自動轉化生成Hadoop MapReduce代碼。通過這種方式生成的代碼是二進制的,map()和reduce()等函數代碼不可見。一些高層庫還提供了更簡單的方式生成用戶代碼,如使用Spark之上的機器學習庫MLlib時,用戶只需要選擇算法和設置算法參數,MLlib即可自動生成可執行的Spark作業了。

圖1.6 使用類似SQL的Apache Pig腳本自動轉化生成Hadoop MapReduce代碼
除了Apache Pig和MLlib,工業界和學術界也提出了很多更簡單、更通用的高層語言和高層庫,使用戶不用手寫較為煩瑣的map()和reduce()代碼。Google提出了FlumeJava[25],可以將多個MapReduce 作業以流水線(pipeline)的形式串聯起來,并提供了基本的數據操作符,如group()、join(),使常見的編程任務變得簡單。Cascading[26]是用Java編寫的,它是構建在Hadoop之上的一套數據操作函數庫。與FlumeJava類似,Cascading同樣為用戶提供了基本的數據操作符,可以方便用戶構建出較為復雜的數據流程。Google設計的Sawzall[27]是一種用于數據查詢的腳本語言,偏向統計分析。Sawzall腳本可以自動轉化為MapReduce作業執行,使得分析人員不用直接寫MapReduce程序就可以進行大數據分析。Google還設計了Tenzing[28],該模塊構建在MapReduce框架之上,支持SQL查詢語言,并實現高效、低延遲的數據查詢服務。微軟研究院也設計了自己的用戶層語言DryadLINQ[29]和SCOPE[30]。DryadLINQ將針對數據對象操作的LINQ程序轉化成Dryad任務,再利用Dryad框架來并行處理數據。SCOPE與Sawzall在一個層次上,可以將SQL腳本轉化成Dryad DAG任務,同樣利用Dryad框架來并行處理數據。SCOPE和Dryad是使用C#/C++實現的。
在用戶代碼的優化方面,PeriSCOPE[31]根據job piepeline的拓撲結構對用戶代碼采用類似編譯的優化措施,自動優化運行在SCOPE上的job性能。
3.配置參數
一個大數據應用可以有很多配置參數,如Hadoop支持200多個配置參數。這些配置參數可以分為兩大類:一類是與資源相關的配置參數。例如,buffer size定義框架緩沖區的大小,影響map/reduce任務的內存用量。在Hadoop中,map/reduce任務實際啟動一個JVM來運行,因此用戶還要設置JVM的大小,也就是heap size。在Spark中,map/reduce任務在資源容器(Executor JVM)中以線程的方式執行,用戶需要估算應用的資源需求量,并設置應用需要的資源容器個數、CPU個數和內存大小。
另一類是與數據流相關的配置參數。例如,Hadoop和Spark中都可以設置partition()函數、partition個數和數據分塊大小。partition()函數定義如何劃分map()的輸出數據。partition個數定義產生多少個數據分塊,也就是有多少個reduce任務會被運行。數據分塊大小定義map任務的輸入數據大小。
由于Hadoop/Spark框架本身沒有提供自動優化配置參數的功能,所以工業界和學術界研究了如何通過尋找最優配置參數來對應用進行性能調優。StarFish[32]研究了如何選擇性能最優的Hadoop應用配置參數,其核心是一個Just-In-Time的優化器,該優化器可以對Hadoop應用的歷史運行信息進行分析,并根據分析結果來預測應用在不同配置參數下的執行時間,以選擇最優參數。Verma等[33,34]討論了在給定應用完成時限的情況下,如何為Hadoop應用分配最佳的資源(map/reduce slot)來保證應用能夠在給定時限內完成。DynMR[35]通過調整任務啟動時間、啟動順序、任務個數來減少任務等待時間和由于過早啟動而引起的任務之間的資源競爭。MROnline[36]根據任務執行狀態,使用爬山法尋找最優的緩沖區大小和任務內存大小,以減少應用執行時間。Xu等[37]研究了如何離線估計MapReduce應用內存用量,采用的方法是先用小樣本數據運行應用,然后根據應用運行信息來估算應用在大數據上的實際內存消耗。SkewTune[38]可以根據用戶自定義的代價函數來優化數據劃分算法,在保持數據輸入順序的同時,減少數據傾斜問題。
1.4.2 分布式數據并行處理層
分布式數據并行處理層首先將用戶提交的應用轉化為較小的計算任務,然后通過調用底層的資源管理與任務調度層實現并行執行。
在Hadoop MapReduce中,這個轉化過程是直接的。因為MapReduce具有固定的執行流程(map—Shuffle—reduce),可以直接將包含map/reduce函數的作業劃分為map和reduce兩個階段。map階段包含多個可以并行執行的map任務,reduce階段包含多個可以并行執行的reduce任務。map任務負責將輸入的分塊數據進行map()處理,并將其輸出結果寫入緩沖區,然后對緩沖區中的數據進行分區、排序、聚合等操作,最后將數據輸出到磁盤上的不同分區中。reduce任務首先將map任務輸出的對應分區數據通過網絡傳輸拷貝到本地內存中,內存空間不夠時,會將內存數據排序后寫入磁盤,然后經過歸并、排序等階段產生reduce()的輸入數據。reduce()處理完輸入數據后,將輸出數據寫入分布式文件系統中。
與Hadoop MapReduce不同,Spark上應用的轉化過程包含兩層:邏輯處理流程、執行階段與執行任務劃分。如圖1.7所示,Spark 首先根據用戶代碼中的數據操作語義和操作順序,將代碼轉化為邏輯處理流程。邏輯處理流程包含多個數據單元和數據依賴,每個數據單元包含多個數據分塊。然后,框架對邏輯處理流程進行劃分,生成物理執行計劃。該計劃包含多個執行階段(stage),每個執行階段包含若干執行任務(task)。微軟的大數據編程框架DryadLINQ也提供類似的編譯過程,可以將用戶編寫的大數據應用程序(LINQ)編譯為可分布運行的Dryad執行計劃和任務。
為了將用戶代碼轉化為邏輯處理流程,Spark和Dryad對輸入/輸出、中間數據進行了更具體的抽象處理,將這些數據用一個統一的數據結構表示。在Spark中,輸入/輸出、中間數據被表示成RDD(Resilient Distributed Datasets,彈性分布式數據集)。在RDD上可以執行多種數據操作,如簡單的map(),以及復雜的cogroup()、join()等。一個RDD可以包含多個數據分區(partition)。parent RDD和child RDD之間通過數據依賴關系關聯,支持一對一和多對一等數據依賴關系。數據依賴關系的類型由數據操作的類型決定。如圖1.7所示,邏輯處理流程是一個有向無環圖(Directed Acyclic Graph,簡稱DAG圖),其中的節點是數據單元RDD,每個數據單元里面的圓形是指RDD的多個數據分塊,正方形專指輸入數據分塊。箭頭是在RDD上的一些數據操作(也隱含了parent RDD和child RDD之間的依賴關系)。

圖1.7 Spark應用轉化與執行流程
為了將邏輯處理流程轉化為物理執行計劃,Spark首先根據RDD之間的數據依賴關系,將整個流程劃分為多個小的執行階段(stage)。例如,圖1.7中邏輯處理流程被劃分為3個執行階段。之后,在每個執行階段形成計算任務(task),計算任務的個數一般與RDD中分區的個數一致。與MapReduce不同的是,一個Spark job可以包含很多個執行階段,而且每個執行階段可以包含多種計算任務,因此并不能嚴格地區分每個執行階段中的任務是map任務還是reduce任務。另外,在Spark中,用戶可以通過調用cache() 接口使框架緩存可被重用的中間數據。例如,當前job的輸出可能會被下一個job用到,那么用戶可以使用cache()對這些數據進行緩存。
1.4.3 資源管理與任務調度層
從系統架構上講,大數據處理框架一般是主-從(Master-Worker)結構。主節點(Master節點)負責接收用戶提交的應用,處理請求,管理應用運行的整個生命周期。從節點(Worker節點)負責執行具體的數據處理任務(task),并在運行過程中向主節點匯報任務的執行狀態。以Hadoop MapReduce為例(見圖1.8),在主節點運行的JobTracker進程首先接收用戶提交的job,然后根據job的輸入數據和配置等信息將job分解為具體的數據處理任務(map/reduce task),最后將task交給任務調度器調度運行。任務調度器根據各個從節點的資源總量與資源使用情況將map/reduce task分發到合適的從節點的TaskTracker中。TaskTracker進程會為每個map/reduce task啟動一個進程(在Hadoop MapReduce中是JVM進程)執行task的處理步驟。每個從節點可以同時運行的task數目由該節點的CPU個數等資源狀況決定。

圖1.8 Hadoop MapReduce框架的部署圖,其中不同job的task可以分布在不同機器上
另外,大數據處理服務器集群一般由多個用戶共享。如果多個用戶同時提交了job且集群資源充足,那么集群會同時運行多個job,每個job包含多個map/reduce task。在圖1.8中,Worker 1節點上運行了3個map task和1個reduce task,而Worker 2節點上運行了2個map task和1個reduce task。Worker 1節點上運行的map task和Worker 2節點上運行的map task可以分別屬于不同的job。
Spark支持不同的部署模式,如Standalone部署模式、YARN部署模式和Mesos部署模式等。其中Standalone部署模式與Hadoop MapReduce部署模式基本類似,唯一區別是Hadoop MapReduce部署模式為每個task啟動一個JVM進程運行,而且是在task將要運行時啟動JVM,而Spark是預先啟動資源容器(Executor JVM),然后當需要執行task時,再在Executor JVM里啟動task線程運行。
在運行大數據應用前,大數據處理框架還需要對用戶提交的應用(job)及其計算任務(task)進行調度。任務調度的主要目的是通過設置不同的策略來決定應用或任務獲得資源的先后順序。典型的任務調度器包含先進先出(FIFO)調度器、公平(Fair)調度器等。先進先出(FIFO)的任務調度器如圖1.9所示,其有兩種類型的調度器:一類是應用調度器,決定多個應用(app)執行的先后順序;另一類是任務調度器,決定多個任務(task)執行的先后順序。例如,Spark中一個stage可能包含多個map task,任務調度器可以根據數據本地化等信息決定這些task應調度到的執行節點。

圖1.9 先進先出(FIFO)的任務調度器
傳統的資源管理與任務調度層只針對某一種類型的應用進行資源管理和任務調度,而有一些新的研究對此進行了拓展。例如,UC Berkeley提出將資源管理和任務調度模塊構造為一個統一的集群資源管理系統,稱為“集群操作系統”[39]。該系統可以集中調度多個不同大數據處理框架中的job。再例如,第二代Hadoop的資源管理與調度框架YARN[40]能夠同時為集群中運行的多種框架(如Hadoop MapReduce,Spark)提供資源管理等服務。用戶可以直接將應用提交給YARN,并且在提交應用時指定應用的資源需求,如CPU個數和內存空間大小等。UC Berkeley提出的Mesos[41]與YARN類似,可以對集群上各種應用進行資源分配與任務調度,支持MapReduce作業、Spark作業、MPI[42]作業等。盡管YARN和Mesos提供了比較成熟的資源管理策略,可以統一分配、管理和回收不同節點上的計算資源。然而,它們有一個共同的局限,即資源分配策略的執行依賴用戶提供的資源需求與當前集群資源的監控信息,而不能根據應用的實際場景自動動態地調整資源分配。
1.4.4 物理執行層
大數據處理框架的物理執行層負責啟動task,執行每個task的數據處理步驟。在Hadoop MapReduce中,一個應用需要經歷map、Shuffle、reduce 3個數據處理階段。而在Spark中,一個應用可以有更多的執行階段(stage),如迭代型應用可能有幾十個執行階段,每個執行階段也包含多個task。另外,這些執行階段可以形成復雜的DAG圖結構。在物理執行時首先執行上游stage中的task,完成后執行下游stage中的task。
在Hadoop MapReduce中,每個task對應一個進程,也就是說每個task以JVM(Java虛擬機)的方式來運行,所以在Hadoop MapReduce中task的內存用量指的是JVM的堆內存用量。在Spark中,每個task對應JVM中的一個線程,而一個JVM可能同時運行了多個task,因此JVM的內存空間由task共享。在應用未運行前,我們難以預知task的內存消耗和執行時間,也難以預知JVM中的堆內存用量。
從應用特點來分析,我們可以將task執行過程中主要消耗內存的數據分為以下3類。
(1)框架執行時的中間數據。例如,map()輸出到緩沖區的數據和reduce task在Shuffle階段暫存到內存中的數據。
(2)框架緩存數據。例如,在Spark中,用戶調用cache()接口緩存到內存中的數據。
(3)用戶代碼產生的中間計算結果。例如,用戶代碼調用map()、reduce()、combine(),在處理輸入數據時會在內存中產生中間計算結果。
很多大數據處理框架在設計時就考慮了內存的使用問題,并進行了相應的優化設計。例如,Spark框架是基于內存計算的,它將大量的輸入數據和中間數據都緩存到內存中,這種設計能夠有效地提高交互型job和迭代型job的執行效率。
由于大數據應用的內存消耗量很大,所以當前許多研究關注如何改進大數據處理框架的內存管理機制,以減少應用內存消耗。例如,UCSD提出了ThemisMR [43],重新設計了MapReduce的數據流及內存管理方案,有效地將中間數據磁盤讀寫次數降低為兩次,從而提高了job的執行性能。Tachyon構造了一個基于內存的分布式數據存儲系統,主要用于在不同Hadoop/Spark應用之間共享數據。用戶可以將不同應用產生的中間數據緩存到Tachyon中而非直接緩存到框架中,以降低框架的內存消耗。FA?ADE[44]提供了用于降低用戶代碼內存消耗的代碼編譯和執行環境。FA?ADE的設計目的是將數據存儲和數據操作分開,方法是將數據存放到JVM的堆外內存中,將對堆內對象的數據操作轉化為對FA?ADE的函數調用。對于Java對象本身產生的overhead(也就是Java對象自身所需的header和reference),Bu等[45]提出了一些優化方法,如將大量數據對象(record object)合并為少量的大的數據對象。Lu等[46]提出了基于對象生命周期的內存管理機制,可以根據數據對象類型和生命周期,將對象分配到不同隊列進行分配和回收。Xu等[47]針對Hadoop/Spark等大數據框架經常出現的垃圾回收時間長、頻繁等問題,通過實驗分析主流Java垃圾回收算法在大數據環境下存在的性能缺陷,提出了垃圾回收算法的3種改進方法。Yak[48]提出了一種混合GC算法,將堆內存劃分為控制流區域和數據流區域,前者使用傳統GC算法回收控制流代碼的內存對象,后者使用基于時域區域(epoch-based region)的內存管理,并根據數據對象生命周期來回收內存。Spark社區采用堆外內存管理機制和基于堆外內存的Shuffle機制,提出了鎢絲計劃[49]。
另外,如何預測大數據應用的執行時間也被一些研究人員關注。如果能夠預測出job的執行時間可以為任務調度器提供決策依據,則方便用戶了解job的執行進度。華盛頓大學的研究人員提出了KAMD [50]和ParaTimer [51],可以根據job執行的歷史信息并結合正在運行的job處理的數據量,使用啟發式方法來估算job剩余的執行時間。UIUC的研究人員提出了ARIA [33],細粒度地分析了單個MapReduce job的執行階段,并提出了基于上下界的時間估算公式,可以通過job的歷史信息或調試信息來估算執行時間。華盛頓大學的研究人員后來又提出了PerfXplain [52],通過對比兩個包含同樣處理邏輯的job的性能指標,來解釋兩個job執行效率不同的原因。
- GitHub Essentials
- PyTorch深度學習實戰:從新手小白到數據科學家
- App+軟件+游戲+網站界面設計教程
- Learning JavaScriptMVC
- Access 2016數據庫技術及應用
- 大數據算法
- Creating Dynamic UIs with Android Fragments(Second Edition)
- Mastering Machine Learning with R(Second Edition)
- SQL應用及誤區分析
- 數據科學工程實踐:用戶行為分析與建模、A/B實驗、SQLFlow
- 數據指標體系:構建方法與應用實踐
- 掌中寶:電腦綜合應用技巧
- 算力芯片:高性能CPU/GPU/NPU微架構分析
- Scratch Cookbook
- 數字孿生