官术网_书友最值得收藏!

3.3 Hadoop基本組件

如圖3-2所示,Hadoop實際是由三個不同的組件構成:

圖3-2 Hadoop基本組件

● HDFS:Hadoop分布式文件系統。

● YARN:一個資源調度框架。

● MapReduce:一個分布式處理框架。

程序員可以聯合使用這三個組件構建分布式系統。

3.3.1 HDFS

HDFS是一個運行在通用硬件設備之上的分布式文件系統。HDFS是高度容錯的,在廉價的硬件上部署。HDFS提供以高吞吐量訪問應用數據的能力,非常適合擁有大數據集的應用。HDFS放寬了一些POSIX的需求,允許對文件系統數據的流式訪問。HDFS源自為Apache Nutch Web搜索引擎項目建立的框架,是Apache Hadoop的核心項目。

1. HDFS的目標

● 硬件容錯。HDFS假定發生硬件故障是一個常態。硬件損壞的情況通常比預想出現的更加頻繁。一個HDFS實例可能由成百上千的服務器組成,每個機器上存儲文件系統的部分數據。事實上一個HDFS包含有大量的硬件組件,而在如此之多的硬件中,出現問題的概率就非常大了,也可以說,HDFS中總會有部分組件處于不可用狀態。因此,檢測硬件錯誤并從有問題的硬件快速自動恢復,就成為HDFS架構的核心目標。

● 流式數據訪問。運行在HDFS上的應用程序需要流式訪問它們的數據集。簡單地說,流式訪問就是對數據邊讀取邊處理,而不是將整個數據集讀取完成后再開始處理。這與運行在典型普通文件系統上的程序不同。HDFS被設計成更適合批處理操作,而不是讓用戶交互式地使用。它強調的是數據訪問的吞吐量而不是低延時。POSIX的許多硬性要求并不適合HDFS上的應用程序,因為POSIX的某些關鍵語義影響了數據吞吐量的提升。

● 支持大數據集。部署在HDFS上的應用要處理很大的數據集。HDFS中一個典型文件的大小是幾GB到幾TB。HDFS需要支持大文件,它應該提供很大的數據帶寬,能夠在單一集群中擴展幾百甚至數千個節點,并且一個HDFS實例應該能夠支持幾千萬個文件。

● 簡單的一致性模型。HDFS應用程序訪問文件是一次寫多次讀模式。文件一旦被創建,對該文件只能執行追加或徹底清除操作。追加的內容只能寫到文件尾部,而文件中已有的任何內容都不能被更新。這些設定簡化了數據一致性問題并能使數據訪問的吞吐量更高。MapReduce或Web爬蟲應用都適合于這種模型。

● 移動計算而不是移動數據。一個應用的計算請求,在它所操作的數據附近執行時效率會更高,尤其是在數據集非常大的情況下更是如此。此時網絡的競爭最小,系統整體的吞吐量會得到提高。通常,將計算移動到臨近數據的位置,比把數據移動到應用運行的位置要好。HDFS為應用程序提供接口,把計算移動到數據所在位置。

● 便捷訪問異構的軟硬件平臺。HDFS能夠很容易地從一個平臺遷移到另一個,這種便利性使HDFS為大量應用程序所采用。

2. HDFS架構

如圖3-3所示,HDFS是主/從架構。一個HDFS集群有一個NameNode進程,它負責管理文件系統的命名空間,這里所說的命名空間是指一種層次化的文件組織形式。NameNode進程控制被客戶端訪問的文件,運行NameNode進程的節點是HDFS的主節點。HDFS還有許多DataNode進程,通常集群中除NameNode外的每個節點都運行一個DataNode進程,它管理所在節點上的存儲。運行DataNode進程的節點是HDFS的從節點,又稱工作節點。HDFS維護一個文件系統命名空間,并允許將用戶數據存儲到文件中。在系統內部,一個文件被分成多個數據塊,這些數據塊實際被存儲到DataNode所在節點上。NameNode不僅執行文件系統命名空間上的打開文件、關閉文件、文件和目錄重命名等操作,還要維護數據塊到DataNode節點的映射關系。DataNode不僅負責響應文件系統客戶端的讀寫請求,還依照NameNode下達的指令執行數據塊的創建、刪除和復制等操作。

圖3-3 HDFS架構

NameNode和DataNode進程運行在通用的機器上,這些機器通常安裝Linux操作系統。HDFS是用Java語言開發的,任何支持Java的機器都可以運行NameNode或DataNode進程。使用平臺無關的Java語言,意味著HDFS可以部署在大范圍的主機上。典型的部署是一臺專用服務器作為主節點,只運行NameNode進程。集群中的其他機器作為從節點,每個上面運行一個DataNode進程。一臺主機上不能同時運行多個DataNode進程。

集群中NameNode的存在極大地簡化了系統架構。NameNode所在的主節點是HDFS的仲裁人和所有元數據的知識庫。這樣的系統設計下,用戶數據永遠不會存儲在主節點上。

HDFS支持傳統的層次形文件組織。用戶或應用可以創建目錄,也可以在目錄中存儲文件。HDFS命名空間的層次結構與其他文件系統類似,能執行創建、刪除文件,把一個目錄中的文件移動到另外的目錄中,修改文件名稱的操作。HDFS支持配置用戶配額和訪問權限,但不支持軟連接和硬連接。命名空間及其屬性的任何變化都被NameNode所記錄。應用可以指定一個HDFS文件的副本數。文件的副本數被稱為該文件的復制因子,這個信息被NameNode存儲。

3.數據復制

HDFS可以保證集群中文件存儲的可靠性。它把文件分解成一個由數據塊構成的序列,每個數據塊有多個副本,這種數據冗余對容錯非常關鍵。當一個數據塊損壞時,不會造成數據丟失。數據塊的大小和復制因子對每個文件都是可配的。

一般情況下,HDFS中一個文件的所有數據塊,除最后一個塊外,都有同樣的大小。但是,HDFS支持變長的數據塊,就是說一個文件有可能包含兩種大小的數據塊。當用戶重新配置了文件的塊大小,然后向該文件中追加數據,這時HDFS不會填充文件的最后一個塊,而是用新的尺寸創建新塊存儲追加的數據,這種情況下文件中就會同時存在兩種大小的塊。

應用可以指定一個文件的副本數,即復制因子。可以在文件創建時指定復制因子,這個復制因子的配置以后是可以改變的。除了追加和清除操作外,HDFS中的文件在任何時候都是嚴格地一次寫入。

NameNode做出的所有操作,都會考慮數據塊的復制。它周期性地接收集群中每個DataNode發出的心跳和塊報告。接收到心跳說明DataNode工作正常。塊報告包含該DataNode節點上所有數據塊的列表。

HDFS使用所謂的“機架感知”策略放置數據塊副本。這是一個需要進行大量實驗并不斷調整的特性,也是HDFS與其他分布式文件系統的主要區別。機架感知的目的是要提升數據可靠性、可用性和網絡帶寬的利用率。當前HDFS版本的實現只是實施副本放置策略的第一步,主要是為了驗證該策略在生產系統上的有效性,同時收集更多的行為信息,以供繼續研究和測試更好的策略。

在此簡單說一下可靠性與可用性的區別??煽啃允侵赶到y可以無故障地持續運行,而可用性指的是系統在任何給定的時刻都能工作。例如,如果系統每月崩潰1分鐘,那么它的可用性是99.998%,但是它還是非常不可靠的。與之相反,如果一個系統從來不崩潰,但是每年要停機兩星期,那么它是高度可靠的,但是可用性只有96%。

一個大型HDFS集群中會包含很多計算機,這些機器分布于多個機架上。位于不同機架上的兩個節點通過網絡交換機進行通信。大多數情況下,同一個機架上機器間的網絡帶寬會高于不同機架上的機器。

NameNode通過Hadoop機架感知策略確定每個DataNode所屬的機架ID。一種簡單的策略是在每個機架上放置一份數據塊的副本,這種設計即使在整個一個機架(甚至多個機架)失效的情況下,也能防止數據丟失。該策略還有一個優點是,可以利用多個機架的帶寬讀取數據。將數據副本平均分布于集群的所有機架中,當集群中的一個組件(節點、機架等)失效時,重新負載均衡也很簡單。但是很顯然,寫入數據時需要把一個數據塊傳輸到每一個機架,這樣做的寫入成本太高了。

在一個復制因子為3的普通場景中,HDFS把數據塊的第一個副本放置在本地機架的一個節點上,另一個副本放置在本地機架的另外一個節點上,最后一個副本放置在另外一個機架的節點上。這樣只寫了兩個機架,節省了一個機架的寫入流量,提升了寫入性能。該策略的前提是認可這樣一種假設:機架失效的可能性比機器失效的可能性小得多。因此這種策略并不會影響數據的可靠性和可用性。然而它卻減少了讀取數據的整體帶寬,因為此時只能利用兩個機架的帶寬而不是三個。使用這種策略,一個文件的副本不是平均分布于所有機架,三分之一在同一個節點,三分之二在同一個機架,剩下的三分之一分布在其他機架上。該策略提升了寫的性能,同時沒有損害數據可靠性或讀的性能。

如果復制因子大于3,第4個及其后面的副本被隨機放置,但每個機架的副本數量要低于上限值,上限值的計算公式是:((副本數 -1)/(機架數 + 2))取整。

由于NameNode不允許一個DataNode上存在一個數據塊的多份副本,因此一個數據塊的最大副本數就是當時DataNode節點的個數。

在HDFS支持選擇存儲類型和存儲策略后,NameNode實施策略時除了依照上面描述的機架感知外,還考慮到放置副本的其他問題。NameNode首先按機架感知策略選擇存儲節點,然后檢查該候選節點是否滿足文件的存儲需求。如果候選節點不支持文件的存儲類型,NameNode就會去尋找其他節點。如果在第一條查找路徑上沒有找到足夠的節點來存放副本,那么NameNode會再選擇第二條路徑繼續查找可用于存儲該文件類型的節點。當前默認的副本放置策略就是這樣工作的。

為了使全局的帶寬消耗和讀延遲降到最小,在選擇副本時,HDFS總是選擇距離讀請求最近的存儲節點。如果在讀請求所在節點的同一個機架上有需要的數據副本,則HDFS盡量選擇它來滿足讀請求。如果HDFS集群跨越多個數據中心,那么存儲在本地數據中心的副本會優先于遠程副本被選擇。

當Hadoop的NameNode節點啟動時,會進入一種稱為安全模式的特殊狀態。NameNode處于安全模式時不會進行數據塊的復制操作。此時NameNode會接收來自DataNode的心跳和塊報告消息。塊報告中包含該DataNode節點所保存的數據塊列表,每個數據塊有一個特定的最小副本數。NameNode檢測到一個數據塊達到了最小副本數時,就認為該數據塊是復制安全的。當檢測到的復制安全的數據塊達到一定比例(由dfs.safemode.threshold.pct參數指定)30秒后,NameNode退出安全模式。然后NameNode會確定一個沒有達到最小副本條件的數據塊列表,并將這些數據塊復制到其他DataNode節點,直至達到最小副本數。

4.文件系統元數據持久化

HDFS命名空間的元數據由NameNode負責存儲。NameNode使用一個叫做EditLog的事務日志持久化記錄文件系統元數據的每次變化。例如,在HDFS中創建一個新文件,NameNode就會向EditLog中插入一條記錄標識這個操作。同樣,改變文件的復制因子也會向EditLog中插入一條記錄。NameNode使用本地主機上的一個操作系統文件存儲EditLog。整個文件系統的命名空間,包括數據塊和文件的映射關系、文件系統屬性等,存儲在一個叫做FsImage的文件中。FsImage也是一個NameNode節點的本地操作系統文件。

NameNode在內存中保留一份完整的文件系統命名空間映像,其中包括文件和數據塊的映射關系。啟動或者達到配置的閾值觸發了檢查點時,NameNode把FsImage和EditLog從磁盤讀取到內存,對內存中的FsImage應用EditLog里的事務,并將新版本的FsImage寫回磁盤,然后清除老的EditLog事務條目,因為它們已經持久化到FsImage了。這個過程叫做檢查點。檢查點的目的是確認HDFS有一個文件系統元數據的一致性視圖,這是通過建立一個文件系統元數據的快照并保存到FsImage實現的。盡管可以高效讀取FsImage,但把每次FsImage的改變直接寫到磁盤的效率是很低的。替代做法是將每次的變更持久化到Editlog中,在檢查點期間再把FsImage刷新到磁盤。檢查點有兩種觸發機制,按以秒為單位的時間間隔(dfs.namenode.checkpoint.period)觸發,或者達到文件系統累加的事務值(dfs.namenode.checkpoint.txns)時觸發。如果兩個參數都設置,兩種條件都會觸發檢查點。熟悉數據庫的讀者對檢查點這一概念一定不會陌生,NameNode的FsImage和EditLog,其作用與關系數據庫中的數據文件、重做日志文件非常類似。

DataNode把HDFS文件里的數據存儲到本地文件系統,是聯系本地文件系統和HDFS的紐帶。DataNode將HDFS的每個數據塊存到一個單獨的本地文件中,這些本地文件并不都在同一個目錄中。DataNode會根據實際情況決定一個目錄中的文件數,并在適當的時候建立子目錄。本地文件系統不能支持在一個目錄里創建太多的文件。DataNode啟動時會掃描本地文件系統,生成一個該節點上與本地文件對應的所有HDFS數據塊的列表,并把列表上報給NameNode,這個報告就是前面所說的塊報告。

5. HDFS示例

如圖3-4所示,有一個256MB的文件,集群中有4個節點,那么默認情況下,當把文件上傳到集群時,系統會自動做三件事情:

圖3-4 HDFS示例

● HDFS會將此文件分成四個64MB的數據塊。

● 每個塊有三個復制。

● 數據塊被分散到集群節點中,確保對于任意數據塊,沒有兩個塊復制在相同的節點上。

這個簡單的數據分布算法是Hadoop成功的關鍵,它顯著提高了HDFS集群在硬件失效時的可用性,并且使MapReduce計算框架成為可能。

3.3.2 MapReduce

MapReduce是一個分布式計算軟件框架,支持編寫處理大數據量(TB以上)的應用程序。MapReduce程序可以在幾千個節點組成的集群上并行執行。集群節點使用通用的硬件,以硬件冗余保證系統的可靠性和可用性,而MapReduce框架則從軟件上保證處理任務的可靠性和容錯性。

在Hadoop中每個MapReduce應用程序被表示成一個作業,每個作業又被分成多個任務。應用程序向框架提交一個MapReduce作業,作業一般會將輸入的數據集合分成彼此獨立的數據塊,然后由map任務以并行方式完成對數據分塊的處理??蚣軐ap的輸出進行排序,之后輸入到reduce任務。MapReduce作業的輸入輸出都存儲在一個如HDFS的文件系統上??蚣苷{度并監控任務的執行,當任務失敗時框架會重新啟動任務。

通常情況下,集群中的一個節點既是計算節點,又是存儲節點。也就是說,MapReduce框架和HDFS共同運行在多個節點之上。這種設計效率非常高,框架可以在數據所在的節點上調度任務執行,大大節省了集群節點間的整體帶寬。

Hadoop 0.20.0和之前的版本里,MapReduce框架由JobTracker和TaskTracker組成。JobTracker是一個運行在主節點上的后臺服務進程,啟動之后會一直監聽并接收來自各個TaskTracker發送的心跳,包括資源使用情況和任務運行情況等信息。TaskTracker是運行在從節點上的進程,它一方面從JobTracker接收并執行各種命令,包括提交任務、運行任務、殺死任務等,另一方面將本地節點上各個任務的狀態通過心跳,周期性地匯報給JobTracker。TaskTracker與JobTracker之間采用RPC協議進行通信。為解決MapReduce框架的性能瓶頸,從0.23.0版本開始,Hadoop的MapReduce框架完全重構,使用YARN管理資源,框架的組成變為三個部分:一個主節點上的資源管理器ResourceManager,每個從節點上的節點管理器NodeManager,每個應用程序對應的MRAppMaster。

一個最簡單的MapReduce應用程序,只需要指定輸入輸出的位置,并實現適當的接口或抽象類,就可以提供map和reduce的功能。提交應用程序時,需要指定依賴的包、相關環境變量和可選的MapReduce作業配置參數。Hadoop作業客戶端將程序提交的MapReduce作業及其相關配置發送給ResourceManager, ResourceManager把作業分解成任務,然后把任務和配置信息分發給工作節點,調度并監控任務的執行,同時向作業客戶端提供任務狀態和診斷信息。

盡管Hadoop框架是用Java語言實現的,但MapReduce應用程序卻不一定要用Java來編寫。Hadoop Streaming提供了一個便于進行MapReduce編程的工具包,使用它可以基于一些可執行命令、腳本語言或其他編程語言來實現MapReduce。Hadoop Pipes是一個C++ API,允許用戶使用C++語言編寫MapReduce應用程序。

1.處理步驟

MapReduce數據處理分為Split、Map、Shuffle和Reduce 4個步驟。應用程序實現Map和Reduce步驟的邏輯,Split和Shuffle步驟由框架自動完成。

(1)Split步驟

在執行MapReduce之前,原始數據被分割成若干split,每個split作為一個map任務的輸入,在map執行過程中split會被分解成一個個記錄(鍵/值對), map會依次處理每一個記錄。引入split的概念是為了解決記錄溢出問題。假設一個map任務處理一個塊中的所有記錄,那么當一個記錄跨越了塊邊界時怎么辦呢?HDFS的塊大小是嚴格的64MB(默認值,當然也可能是配置的其他值),而且HDFS并不關心文件塊中存儲的內容是什么,因此HDFS無法評估何時一個記錄跨越了多個塊。

為了解決此問題,Hadoop使用了一種數據塊的邏輯表示,叫做input splits。當MapReduce作業客戶端計算input splits時,它會計算出塊中第一個和最后一個完整記錄的位置。如果最后一個記錄是不完整的,input split中包含下一個塊的位置信息,還有完整記錄所需的字節偏移量。

MapReduce數據處理是由input splits概念驅動的。為特定應用計算出的input splits數量決定了mapper任務的數量。ResourceManager盡可能把每個map任務分配到存儲input split的從節點上,以此來保證input splits被本地處理。

(2)Map步驟

一個MapReduce應用逐一處理input splits中的每一條記錄。input splits在上一步驟被計算完成之后,map任務便開始處理它們,此時Resource Manager的調度器會給map任務分配它們處理數據所需的資源。

對于文本文件,默認為文件里的每一行是一條記錄,一行的內容是鍵/值對中的值,從split的起始位置到每行的字節偏移量,是鍵/值對中的鍵。之所以不用行號當作鍵,是因為當一個大的文本文件被分成了許多數據塊,當作很多splits處理時,行號的概念本身就是存在風險的。每個split中的行數不同,因此在處理一個split之前就計算出行數并不容易。但字節偏移量是精確的,因為每個數據塊都有相同的固定的字節數。

map任務處理每一個記錄時,會生成一個新的中間鍵/值對,這個鍵和值可能與輸入對完全不同。map任務的輸出就是這些中間鍵/值對的全部集合。為每個map任務生成最終的輸出文件前,先會依據鍵進行分區,以便將同一分組的數據交給同一個reduce任務處理。在非常簡單的應用場景下,可能只有一個reduce任務,此時map任務的所有輸出都會被寫入一個文件。但是在有多個reduce任務的情況下,每個map任務會基于分區鍵生成多個輸出文件??蚣苣J的分區函數(HashPartitioner)滿足大多數情況,但有時也需要定制自己的partitioner,例如需要對mapper的結果集進行二次排序時。

在應用程序中最好對map任務的輸出文件進行壓縮以獲得更優的性能。

(3)Shuffle步驟

Map步驟之后,開始Reduce處理之前,還有一個重要的步驟叫做Shuffle。MapReduce保證每個reduce任務的輸入都是按照鍵排好序的。系統對map任務的輸出執行排序和轉換,并映射為reduce任務的輸入,此過程就是Shuffle,它是MapReduce的核心處理過程。在Shuffle中,會把map任務輸出的一組無規則的數據盡量轉換成一組具有一定規則的數據,然后把數據傳遞給reduce任務運行的節點。Shuffle橫跨Map端和Reduce端,在Map端包括spill過程,在Reduce端包括copy和sort過程,如圖3-5所示。

圖3-5 Shuffle過程

需要注意的是,只有當所有的map任務都結束時,reduce任務才會開始處理。如果一個map任務運行在一個配置比較差的從節點上,它的滯后會影響MapReduce作業的性能。為了避免這種情況的發生,MapReduce框架使用了一種叫做推測執行的方法。所謂的推測執行,就是當所有task都開始運行之后,MRAppMaster會統計所有任務的平均進度,如果某個task所在的task node因為硬件配置比較低或者CPU load很高等原因,導致任務執行比總體任務的平均執行慢,此時MRAppMaster會啟動一個新的任務(duplicate task),原有任務和新任務哪個先執行完就把另外一個kill。另外,根據mapreduce job冪等的特點,同一個task執行多次的結果是一樣的,所以task只要有一次執行成功,job就是成功的,被kill的task對job的結果沒有影響。如果你監測到任務執行成功,但是總有些任務被kill,或者map任務的數量比預期的多,可能就是此原因所在。

map任務的輸出不寫到HDFS,而是寫入map任務所在從節點的本地磁盤,這個中間結果也不會在Hadoop集群間進行復制。

(4)Reduce步驟

Reduce步驟負責數據的計算歸并,它處理Shuffle后的每個鍵及其對應值的列表,并將一系列鍵/值對返回給客戶端應用。有些情況下只需要Map步驟的處理就可以為應用生成輸出結果,這時就沒有Reduce步驟。例如,將全部文本轉換成大寫這種基本的轉化操作,或者從視頻文件中抽取關鍵幀等。這些數據處理只要Map階段就夠了,因此又叫map-only作業。但在大多數情況下,到map任務輸出結果只完成了一部分工作。剩下的任務是對所有中間結果進行歸并、聚合等操作,最終生成一個匯總的結果。

與map任務類似,reduce任務也是逐條處理每一個鍵。通常reduce為每個處理的鍵返回單一鍵/值對,但這個結果鍵/值對可能會比原始輸入的鍵/值對小得多。當reduce任務完成后,每個reduce任務的輸出會寫入一個結果文件,并將結果文件存儲到HDFS中,HDFS會自動生成結果文件數據塊的副本。

Resource Manager會盡量給map任務分配資源,確保input splits被本地處理,但這個策略不適用于reduce任務。Resource Manager假定map的結果集需要通過網絡傳輸給reduce任務進行處理。這樣實現的原因是,要對成百上千的map任務輸出進行Shuffle,沒有切實可行的方法為reduce實施相同的本地優先策略。

2.邏輯表示

MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責數據的過濾分發;Reduce是規約,負責數據的計算歸并。Map函數和Reduce函數都是通過鍵/值對來操作數據的。Map函數將輸入數據按數據的類型和一定的規則進行分解,并返回一個中間鍵/值對的列表,如下所示:

    Map(k1, v1) → list(k2, v2)

Reduce函數處理Map階段產生的組,按鍵依次產生歸并后的值的集合,如下所示:

    Reduce(k2, list (v2)) → list(v3)

通常一次Reduce調用會返回一個v3值或返回空,盡管允許一次調用返回多個值。所有Reduce調用的返回值集成在一起作為請求的結果列表。

為了實現MapReduce,僅僅有鍵/值對的抽象是不夠的。MapReduce的分布式實現還需要一個Map和Reduce兩個執行階段的“連接器”,它可以是一個分布式文件系統,如HDFS,也可以是從mapper到reducer的數據流。

既然本書講的是數據倉庫,我們就來看一個SQL的例子。想象有一個11億人口數據的數據庫,要按年齡分組統計每個年齡的平均社會關系數。查詢語句如下:

    selectage, avg(contacts)
      fromsocial.person
     group byage
     order byage;

使用MapReduce, K1鍵可以是1到1100的整數,每個整數表示一個100萬條人口記錄的批次號。K2鍵是人口的年齡。這個統計可以使用下面的Map/Reduce函數偽代碼實現:

    function Map is
        input: integer K1 between 1 and 1100, representing a batch of 1 million social.person
    records
        for each social.person record in the K1 batch do
            let Y be the person's age
            let N be the number of contacts the person has
            produce one output record (Y, (N,1))
        repeat
    end function

    function Reduce is
        input: age (in years) Y
        for each input record (Y, (N, C)) do
            Accumulate in S the sum of N*C
            Accumulate in Cnew the sum of C
        repeat
        let A be S/Cnew
        produce one output record (Y, (A, Cnew))
    end function

MapReduce系統將線性增長到1100個Map進程,每個進程處理100萬條輸入記錄。在Map步驟里,將產生11億條(Y,(N,1))記錄,Y表示年齡,假設取值范圍在8到103之間。MapReduce系統將線性產生96個Reduce進程執行中間鍵/值對的Shuffle操作。每個Map進程產生的100萬條記錄,經過輸出、排序、溢寫、合并等map端的Shuffle操作,輸出到96個Reduce進程,Reduce端再進行合并排序,計算我們實際需要的每個年齡的平均社會關系人數。Reduce步驟只會產生96條(Y, A)的輸出記錄,它們以Y值排序,被記錄到最終的結果文件。

記住,盡管一個reduce任務可能已經獲得了所有map任務的輸出,但是只有在所有的map任務都結束后,reduce任務才開始執行,換句話說,要保持對map任務的計數。這一點至關重要,否則我們計算的平均值就是錯誤的。例如,經過map端Shuffle操作的輸出如下:

    -- map output #1: age, quantity of contacts
    10, 9
    10, 9
    10, 9

    -- map output #2: age, quantity of contacts
    10, 9
    10, 9

    -- map output #3: age, quantity of contacts
    10, 10

如果在前兩個map輸出完成就開始reduce計算任務,此時得到的結果是:10歲的平均社會關系人數是9((9+9+9+9+9)/5):

    -- reduce step #1: age, average of contacts
    10, 9

這時第三個map輸出完成,繼續計算平均值時,我們得到的結果是9.5((9+10)/2),但這個數是錯誤的,正確的結果應該是9.166((9*3+9*2+10*1)/(3+2+1))。

3.應用程序定義

MapReduce框架中可以由應用程序定義的部分主要有:

● 輸入程序:輸入程序將輸入的文件分解成適當大小的‘splits'(實踐中典型的是64MB或128MB),框架為每一個split賦予一個Map任務。輸入程序從穩定存儲(一般是分布式文件系統)讀取數據并生成鍵/值對。輸入程序最常見的例子是讀取一個目錄下的所有文件,并將每一行作為一個記錄返回。

● map函數:map函數處理輸入的鍵/值對,生成零個或多個中間輸入鍵/值對。map函數的輸入與輸出可以是不同的類型。例如單詞計數應用,map函數分解每行的單詞并輸出每個單詞的鍵/值對。單詞是鍵,單詞的實例數是值。

● 分區函數:每個map函數的輸出通過應用定義的分區函數分配給特定的reduce任務。分區函數的輸入是鍵、值和reduce任務的數量,輸出reduce任務的索引值。典型的分區函數是取鍵的哈希值,或對鍵的哈希值用reduce任務數取模。選擇適當的分區函數對于數據在reduce間的平均分布和負載均衡非常重要。

● 比較函數:通過應用的比較函數從Map運行的節點為reduce拉取數據并排序。

● reduce函數:框架按鍵的排序為每個唯一的鍵調用一次應用的reduce函數。reduce函數會在與鍵相關的多個值中迭代,然后生成零個或多個輸出。例如單詞計數應用,reduce函數獲取到輸入值,對它們進行匯總計算,并為每個單詞及其計數值生成單個輸出項。

● 輸出程序:輸出程序負責將reduce的輸出寫入穩定存儲。

4. MapReduce示例

MapReduce是一個分布式編程模式。它的主要思想是,將數據Map為一個鍵/值對的集合,然后對所有鍵/值對按照相同鍵值進行Reduce。為了直觀地理解這種編程模式,看一個在10TB的Web日志中計算“ERROR”個數的例子。假設Web日志輸出到一系列文本文件中,文件中的每一行代表一個事件,以ERROR、WARN或INFO之一開頭,表示事件級別。一行的其他部分由事件的時間戳及其描述組成,如圖3-6所示。

圖3-6 Web日志中的文本

我們可以非常容易地使用MapReduce模式計算“ERROR”的數量。如圖3-7所示,在map階段,識別出每個以“ERROR”開頭的行并輸出鍵值對<ERROR, 1>。在reduce階段我們只需要對map階段生成的<ERROR, 1>對進行計數。

圖3-7 MapReduce統計‘ERROR’的個數

對這個例子稍微做一點擴展,現在想知道日志中ERROR、WARN、INFO分別的個數。如圖3-8所示,在map階段檢查每一行并標識鍵值對,如果行以“INFO”開頭,鍵值對為<INFO, 1>,如果以“WARN”開頭,鍵值對為<WARN, 1>,如果以“ERROR”開頭,鍵值對為<ERROR, 1>。在reduce階段,對每個map階段生成的唯一鍵值“INFO”“WARN”和“ERROR”進行計數。

圖3-8 MapReduce分別統計‘ERROR'、‘WARN'、‘INFO’的個數

通過上面簡單的示例我們已經初步理解了MapReduce編程模式是如何工作的,現在看一下MapReduce是怎么實現的。如圖3-9所示,HadoopMapReduce的實現分為split、map、shuffle和reduce 4步。開發者只需要在Mappers和Reducers的Java類中編碼map和reduce階段的邏輯,框架完成其余的工作。

圖3-9 MapReduce執行步驟

MapReduce的處理流程如下:

● HDFS分布數據。

● 向YARN請求資源以建立mapper實例。

● 在可用的節點上建立mapper實例。

● 對mappers的輸出進行混洗,確保一個鍵對應的所有值都分配給相同的reducer。

● 向YARN請求資源以建立reducer實例。

● 在可用的節點上建立reducer實例。

表面上看,似乎MapReduce能處理的情況十分有限,但實際結果卻是,正如前面統計平均社會關系人數的例子所示,大多數SQL操作都可以被表達成一連串的MapReduce操作,并且Hadoop生態圈的工具可以自動把SQL轉化成MapReduce程序處理,所以對于熟悉SQL的開發者來說,不必再自己實現Mapper或者Reducer。

3.3.3 YARN

YARN是一種集群管理技術,全稱是Yet Another Resource Negotiator。從圖3-10可以看到,YARN是第二代Hadoop的一個關鍵特性。Apache開始對YARN的描述是,為MapReduce重新設計的一個資源管理器,經過不斷地發展和改進,現在的YARN更像是一個支持大數據應用的分布式操作系統。

圖3-10 Hadoop1.0與Hadoop2.0

2012年,YARN成為ApacheHadoop的子項目,有時也叫MapReduce2.0。它對老的MapReduce進行重構,將資源管理和調度功能與MapReduce的數據處理組件解耦,以使Hadoop可以支持更多的數據處理方法和更廣泛的應用。例如,現在的Hadoop集群可以同時執行MapReduce批處理作業、交互式查詢和流數據應用。最初的Hadoop1.x中,HDFS和MapReduce被緊密聯系在一起,MapReduce并行執行Hadoop系統上的資源管理、作業調度和數據處理。YARN使用一個中心資源管理器給應用分配Hadoop系統資源,多個節點管理器監控集群中各個節點的操作處理情況。

1.第一代Hadoop的問題

第一代Hadoop是共享HDFS實例的MapReduce集群模型。這種共享計算架構的主要組件是JobTracker和TaskTracker。JobTracker是一個中央守護進程,負責運行集群上的所有作業。用戶程序(JobClient)提交的作業信息會發送到JobTracker中,JobTracker與集群中的其他節點通過心跳定時通信,管理哪些任務應該運行在哪些節點上,還負責所有任務的失敗重啟操作。TaskTracker是系統里的從進程,它監視自己所在機器的資源情況,并根據JobTracker的指令來執行任務。TaskTracker同時監控當前機器的任務運行狀況。TaskTracker需要把這些信息通過心跳發送給JobTracker, JobTracker會搜集這些信息以給新提交的作業分配運行資源。

第一代MapReduce的架構簡單明了,剛推出時也有很多成功案例,但隨著分布式系統的集群規模和工作負荷不斷增長,使用原框架顯露出以下問題:

● 可擴展性問題。JobTracker完成了太多的任務,造成了過多的資源消耗,當MapReduce作業非常多的時候,會產生很大的內存開銷,同時也增加了JobTracker失敗的風險。內存管理以及JobTracker中各特性的粗粒度鎖問題成為可擴展性的顯著瓶頸。將JobTracker擴展到4000個節點規模的集群被證明是極端困難的。

● 內存溢出問題。在TaskTracker端,以MapReduce任務的數目作為資源的表示過于簡單,沒有考慮到任務中CPU、內存的占用情況,如果幾個大內存消耗的任務被調度到了一起,很容易出現內存溢出問題。

● 可靠性與可用性問題。JobTracker失敗所引發的中斷,不僅僅是丟失單獨的一個作業,而是會丟失集群中所有的運行作業,并且要求用戶手動重新提交并恢復他們的作業。從操作的角度來看,MapReduce框架在發生任何變化時(如修復缺陷、性能提升或增加特性),都會強制進行系統級別的升級更新。操作員必須協調好集群停機時間,關掉集群,部署新的二進制文件,驗證升級,然后才允許提交新的作業。任何停機都會導致處理的積壓,當作業被重新提交時,它們會給JobTracker造成明顯的壓力。更糟的是,升級強制讓分布式集群系統的每一個客戶端同時更新。這些更新會讓用戶為了驗證他們之前的應用程序是否適用于新的Hadoop版本而浪費大量時間。

● 資源模型問題。在TaskTracker端,把資源強制劃分為map任務槽位和reduce任務槽位,map和reduce的槽位數量是配置的固定值,因此閑置的map資源無法啟動reduce任務,反之亦然。當系統中只有map任務或只有reduce任務的時候,也會造成資源的浪費。

2. YARN架構

為了解決第一代Hadoop的可擴展性、內存消耗、線程模型、可靠性和性能上的問題,Hadoop開發出新一代的MapReduce框架,命名為MapReduce V2或者叫YARN,其架構如圖3-11所示。

圖3-11 YARN架構

YARN的基本思想是將資源管理和調度及監控功能從MapReduce分離出來,用獨立的后臺進程實現。這個想法需要有一個全局的資源管理器(ResourceManager),每個應用還要有一個應用主管(ApplicationMaster)。應用可以是一個單獨MapReduce作業,或者是一個作業的有向無環圖(DAG)。

資源管理器和節點管理器(NodeManager)構成了分布式數據計算框架。資源管理器是系統中所有應用資源分配的最終仲裁者。節點管理器是框架中每個工作節點的代理,監控節點CPU、內存、磁盤、網絡等資源的使用,并且報告給資源管理器。

每個應用對應的ApplicationMaster實際上是框架中一組特定的庫,負責從資源管理器協調資源,并和節點管理器一起工作,共同執行和監控任務。

資源管理器有兩個主要組件:調度器和應用管理器。調度器負責給多個正在運行的應用分配資源,比如對每個應用所能使用的資源做限制,按一定規則排隊等。調度器只負責資源分配,它不監控或跟蹤應用的狀態。而且,當任務因為應用的錯誤或硬件問題而失敗后,調度器不保證能重啟它們。調度器根據應用對資源的需求執行其調度功能,這基于一個叫做資源容器的抽象概念。資源容器由內存、CPU、磁盤、網絡等元素構成。調度器使用一個可插拔的調度策略,將集群資源分配給多個應用。當前支持的調度器如CapacityScheduler和FairScheduler就是可插拔調度器的例子。

應用管理器負責接收應用提交的作業,協調執行特定應用所需的資源容器,并在ApplicationMaster容器失敗時提供重啟服務。每個應用對應一個ApplicationMaster,它向調度器請求適當的資源容器,并跟蹤應用的狀態和資源使用情況。

Hadoop-2.x的MapReduce API保持與之前的穩定版本(Hadoop-1.x)兼容。這意味著老的MapReduce作業不需要做任何修改,只需要重新編譯就可以在YARN上執行。

3. Capacity調度器

Capacity調度器以一種操作友好的方式,把Hadoop應用作為一個共享的、多租戶集群來運行,并把集群利用率和吞吐量最大化。Capacity調度器允許多用戶安全地共享一個大規模Hadoop集群,并保證它們的性能。其核心思想是,Hadoop集群中的可用資源為多個用戶所共享,資源的多少是由他們的計算需求決定的。基于這種思想帶來的一個好處是,只要資源沒有被其他用戶使用,一個用戶就可以使用它,從而以一種具有成本效益的方式提供資源的彈性使用。

多個用戶共享集群,必須要實現所謂的多租戶(multi-tenancy)技術,這是因為集群中的每個用戶任務都必須保證高性能和安全性。特別是集群中出現了某個用戶或應用試圖占用大量資源時,共享的集群必須做到不影響其他用戶的使用。Capacity調度器提供了一套嚴格的限制機制,確保單一應用或用戶不能消耗集群中不成比例的資源數量。并且,Capacity調度器可能限制或掛起一個異常應用,以保證整個集群的穩定。

Capacity調度器一個主要的抽象概念是隊列(queues)。隊列是Capacity的基礎調度單元,管理員可以通過配置隊列來影響共享集群的使用。為了提供更多的控制和可預測性,Capacity調度器支持層次隊列,保證資源在允許其他隊列使用之前,被一個用戶的子隊列優先共享,以此為特定應用提供資源親和性。

4. Fair調度器

Fair調度是將資源公平分配給應用的方法,使得所有應用在平均情況下隨著時間得到相等的份額。新一代Hadoop有能力調度多種資源類型。默認時Fair調度器只在內存上采用公平調度。

在Fair調度模型中,每個應用都屬于某一個隊列。YARNContainer的分配是選擇使用了最少資源的隊列,在這個隊列中,再選擇使用了最少資源的應用程序。默認情況下,所有的用戶共享一個稱為“default”的隊列。如果一個應用程序在Container資源請求中指定了隊列,則將請求提交到該隊列中。另外,還可以將Fair調度器配置成根據請求中包含的用戶名來分配隊列。Fair調度器還支持許多功能,如隊列的權重(權重大的隊列獲得更多的Container),最小份額,最大份額,以及隊列內的FIFO策略,但基本思想是盡可能平均地共享資源。

在Fair調度器下,如果單個應用程序正在運行,該應用程序可以請求整個集群資源。若有其他程序提交,空閑的資源可以被公平地分配給新的應用程序,使每個應用程序最終可以獲得大致相當的資源。Fair調度器也支持搶占的概念,從而可以從ApplicationMaster要回Container。根據配置和應用程序的設計,搶占和隨后的資源分配可以是友好的或者強制的。

除了提供公平共享,Fair調度器還允許保證隊列的最小份額,這是確保某些用戶、組,或者應用程序總能得到的資源。當隊列中有等待的應用程序,它至少可以獲取其最小份額的資源。與此相反,當隊列并不需要所有的保證份額,超出的部分可以分配給其他運行的應用程序。為了避免擁有數百個作業的單個用戶充斥整個集群,Fair調度器可以通過配置文件限制用戶和每個隊列中運行應用程序的數量。若達到了該限制,用戶應用程序將在隊列中等待,直到前面提交的作業完成。

5. Container

在最基本的層面,Container是單個節點上內存、CPU核和磁盤等物理資源的集合。單個節點上可以有多個Container。系統中的每個節點可以認為是由內存和CPU最小容量的多個Container組成。ApplicationMaster可以請求任何Container來占據最小容量的整數倍的資源。因此Container代表了集群中單個節點上的一組資源(內存、CPU等),由節點管理器監控,由資源管理器調度。

每個應用程序從ApplicationMaster開始,它本身就是一個Container。一旦啟動,ApplicationMaster就與資源管理器協商更多的Container。在運行過程中,可以動態地請求或者釋放Container。例如,一個MapReduce作業可以請求一定數量的Map Container,當Map任務結束時,它可以釋放這些Map Container,并請求更多的Reduce Container。

6. NodeManager

NodeManager是DataNode節點上的“工作進程”代理,管理Hadoop集群中獨立的計算節點。其職責包括與ResourceManager保持通信、管理Container的生命周期、監控每個Container的資源使用情況、跟蹤節點健康狀況、管理日志和不同應用程序的附屬服務(auxiliaryservices)等。

在啟動時,NodeManager向ResourceManager注冊,然后發送包含了自身狀態的心跳,并等待來自ResourceManager的指令。它的主要目的是管理ResourceManager分配給它的應用程序Container。

7. ApplicationMaster

不同于YARN的其他組件,Hadoop 1.x中沒有組件和ApplicationMaster相對應。本質上講,ApplicationMaster所做的工作,就是原來JobTracker為每個應用所做的,但實現卻是完全不同的。

運行在Hadoop集群上的每個應用程序,都有自己專用的Application Master實例,它實際上運行在每個從節點的一個Container進程中。而JobTracker是運行在主節點上的單個后臺進程,跟蹤所有應用的進行情況。

ApplicationMaster會周期性地向ResourceManager發送心跳消息,報告自身的狀態和應用的資源使用情況。ResourceManager根據調度的結果,給特定從節點上的ApplicationMaster分配一個預留的Container的資源租約。ApplicationMaster監控一個應用的整個生命周期,從Container請求所需的資源開始,到ResourceManager將租約請求分配給NodeManager。

為Hadoop編寫的每個應用框架都有自己的ApplicationMaster實現。在YARN的設計中,MapReduce只是一種應用程序框架,這種設計允許使用其他框架建立和部署分布式應用程序。例如,YARN附帶了一個Distributed-Shell應用程序,它允許在YARN集群中的多個節點運行一個shell腳本。

主站蜘蛛池模板: 乌兰浩特市| 靖边县| 凤凰县| 大邑县| 乳山市| 长岭县| 唐山市| 麻栗坡县| 双桥区| 大庆市| 黔东| 曲阜市| 嘉荫县| 遂宁市| 龙口市| 安仁县| 穆棱市| 菏泽市| 盐亭县| 白朗县| 阳新县| 达州市| 包头市| 垫江县| 阿勒泰市| 临邑县| 惠东县| 颍上县| 阳信县| 来安县| 南康市| 开阳县| 道孚县| 衡东县| 永修县| 丹江口市| 义马市| 济源市| 文水县| 西宁市| 麻栗坡县|