官术网_书友最值得收藏!

5.1 MapReduce簡介

MapReduce是Hadoop的一個核心組成框架,使用該框架編寫的應用程序能夠以一種可靠的、容錯的方式并行處理大型集群(數千個節點)上的大量數據(TB級別以上),也可以對大數據進行加工、挖掘和優化等處理。

一個MapReduce任務主要包括兩部分:Map任務和Reduce任務。Map任務負責對數據的獲取、分割與處理,其核心執行方法為map()方法;Reduce任務負責對Map任務的結果進行匯總,其核心執行方法為reduce()方法。MapReduce將并行計算過程高度抽象到了map()方法和reduce()方法中,程序員只需負責這兩個方法的編寫工作,而并行程序中的其他復雜問題(如分布式存儲、工作調度、負載均衡、容錯處理等)均可由MapReduce框架代為處理,程序員完全不用操心。

5.1.1 設計思想

MapReduce的設計思想是,從HDFS中獲得輸入數據,將輸入的一個大的數據集分割成多個小數據集,然后并行計算這些小數據集,最后將每個小數據集的結果進行匯總,得到最終的計算結果,并將結果輸出到HDFS中,如圖5-1所示。

圖5-1 MapReduce設計思想流程圖

在MapReduce并行計算中,對大數據集分割后的小數據集的計算,采用的是map()方法,各個map()方法對輸入的數據進行并行處理,對不同的輸入數據產生不同的輸出結果;而對小數據集最終結果的合并,采用的是reduce()方法,各個reduce()方法也各自進行并行計算,各自負責處理不同的數據集合。但是在reduce()方法處理之前,必須等到map()方法處理完畢,因此在數據進入到reduce()方法前需要有一個中間階段,負責對map()方法的輸出結果進行整理,將整理后的結果輸入到reduce()方法,這個中間階段稱為Shuffle階段。

此外,在進行MapReduce計算時,有時候需要把最終的數據輸出到不同的文件中。比如,按照省份劃分的話,需要把同一省份的數據輸出到一個文件中;按照性別劃分的話,需要把同一性別的數據輸出到一個文件中。我們知道,最終的輸出數據來自于Reduce任務,如果要得到多個文件,意味著有同樣數量的Reduce任務在運行。而Reduce任務的數據來自于Map任務,也就是說,Map任務要進行數據劃分,對于不同的數據分配給不同的Reduce任務執行。Map任務劃分數據的過程就稱作分區(Partition,本章5.1.3節的MapReduce工作原理中會詳細講解)。

從編程的角度來看,將圖5-1進一步細化,可以得到圖5-2所示的流程。

圖5-2 MapReduce設計思想流程圖(編程角度)

總結來說,MapReduce利用了分而治之的思想,將數據分布式并行處理,然后進行結果匯總。舉個例子,有一堆撲克牌,現在需要把里面的花色都分開,而且統計每一種花色的數量。一個人清點可能耗時4分鐘,如果利用MapReduce的思想,把撲克牌分成4份,每個人對自己的那一份進行清點,4個人都清點完成之后把各自的相同花色放到一起并清點每種花色的數量,那么這樣可能只會耗時1分鐘。在這個過程中,每個人就相當于一個map()方法,把各自的相同花色放到一起的過程就是Partition,最后清點每一種花色的數量的過程就是reduce()方法。

5.1.2 任務流程

MapReduce程序運行于YARN之上,使用YARN進行集群資源管理和調度。每個MapReduce應用程序會在YARN中產生一個名為“MRAppMaster”的進程,該進程是MapReduce的ApplicationMaster實現,它具有YARN中ApplicationMaster角色的所有功能,包括管理整個MapReduce應用程序的生命周期、任務資源申請、Container啟動與釋放等。

客戶端將MapReduce應用程序(jar、可執行文件等)和配置信息提交給YARN集群的ResourceManager,ResourceManager負責將應用程序和配置信息分發給NodeManager、調度和監控任務、向客戶端提供狀態和診斷信息等。

圖5-3所示為MapReduce應用程序在YARN中的執行流程。

圖5-3 MapReduce 應用程序在YARN中的執行流程

(1)客戶端提交MapReduce應用程序到ResourceManager。

(2)ResourceManager分配用于運行MRAppMaster的Container,然后與NodeManager通信,要求它在該Container中啟動MRAppMaster。MRAppMaster啟動后,它將負責此應用程序的整個生命周期。

(3)MRAppMaster向ResourceManager注冊(注冊后客戶端可以通過ResourceManager查看應用程序的運行狀態)并請求運行應用程序各個Task所需的Container(資源請求是對一些Container的請求)。如果符合條件,ResourceManager會分配給MRAppMaster所需的Container。

(4)MRAppMaster請求NodeManager使用這些Container來運行應用程序的相應Task(即將Task發布到指定的Container中運行)。

此外,各個運行中的Task會通過RPC協議向MRAppMaster匯報自己的狀態和進度,這樣一旦某個Task運行失敗時,MRAppMaster可以對其重新啟動。當應用程序運行完成時,MRAppMaster會向ResourceManager申請注銷自己。

5.1.3 工作原理

MapReduce計算模型主要由三個階段組成:Map階段、Shuffle階段、Reduce階段,如圖5-4所示。

圖5-4 MapReduce計算模型

Map階段的工作原理如下:

將輸入的多個分片(Split)由Map任務以完全并行的方式處理。每個分片由一個Map任務來處理。默認情況下,輸入分片的大小與HDFS中數據塊(Block)的大小是相同的,即文件有多少個數據塊就有多少個輸入分片,也就會有多少個Map任務,從而可以通過調整HDFS數據塊的大小來間接改變Map任務的數量。

每個Map任務對輸入分片中的記錄按照一定的規則解析成多個<key,value>對。默認將文件中的每一行文本內容解析成一個<key,value>對,key為每一行的起始位置,value為本行的文本內容,然后將解析出的所有<key,value>對分別輸入到map()方法中進行處理(map()方法一次只處理一個<key,value>對)。map()方法將處理結果仍然是以<key,value>對的形式進行輸出。

由于頻繁的磁盤I/O會降低效率,因此Map任務輸出的<key,value>對會首先存儲在Map任務所在節點(不同的Map任務可能運行在不同的節點)的內存緩沖區中,緩沖區默認大小為100 MB(可修改mapreduce.task.io.sort.mb屬性調整)。當緩沖區中的數據量達到預先設置的閾值后(mapreduce.map.sort.spill.percent屬性的值,默認0.8,即80%),便會將緩沖區中的數據溢寫(spill)到磁盤(mapreduce.cluster.local.dir屬性指定的目錄,默認為${hadoop.tmp.dir}/mapred/local)的臨時文件中。

在數據溢寫到磁盤之前,會對數據進行分區(Partition)。分區的數量與設置的Reduce任務的數量相同(默認Reduce任務的數量為1,可以在編寫MapReduce程序時對其修改)。這樣每個Reduce任務會處理一個分區的數據,可以防止有的Reduce任務分配的數據量太大,而有的Reduce任務分配的數據量太小,從而可以負載均衡,避免數據傾斜。數據分區的劃分規則為:取<key,value>對中key的hashCode值,然后除以Reduce任務數量后取余數,余數則是分區編號,分區編號一致的<key,value>對則屬于同一個分區。因此,key值相同的<key,value>對一定屬于同一個分區,但是同一個分區中可能有多個key值不同的<key,value>對。由于默認Reduce任務的數量為1,而任何數字除以1的余數總是0,因此分區編號從0開始。

MapReduce提供的默認分區類為HashPartitioner,該類的核心代碼如下:

getPartition()方法有三個參數,前兩個參數指的是<key,value>對中的key和value,第三個參數指的是Reduce任務的數量,默認值為1。由于一個Reduce任務會向HDFS中輸出一個結果文件,而有時候需要根據自身的業務,將不同key值的結果數據輸出到不同的文件中。例如,需要統計各個部門的年銷售總額,每一個部門單獨輸出一個結果文件,這個時候就可以自定義分區(關于如何自定義分區,在本章的5.6節將詳細講解)。

分區后,會對同一個分區中的<key,value>對按照key進行排序,默認升序。

Reduce階段的工作原理如下:

Reduce階段首先會對Map階段的輸出結果按照分區進行再一次合并,將同一分區的<key,value>對合并到一起,然后按照key對分區中的<key,value>對進行排序。

每個分區會將排序后的<key,value>對按照key進行分組,key相同的<key,value>對將合并為<key,value-list>對,最終每個分區形成多個<key,value-list>對。例如,key中存儲的是用戶ID,則同一個用戶的<key,value>對會合并到一起。

排序并分組后的分區數據會輸入到reduce()方法中進行處理,reduce()方法一次只能處理一個<key,value-list>對。

最后,reduce()方法將處理結果仍然以<key,value>對的形式通過context.write(key,value)進行輸出。

Shuffle階段所處的位置是Map任務輸出后,Reduce任務接收前。Shuffle階段主要是將Map任務的無規則輸出形成一定的有規則數據,以便Reduce任務進行處理。

總結來說,MapReduce的工作原理主要是:通過Map任務讀取HDFS中的數據塊,這些數據塊由Map任務以完全并行的方式處理;然后將Map任務的輸出進行排序后輸入到Reduce任務中;最后Reduce任務將計算的結果輸出到HDFS文件系統中。

Map任務中的map()方法和Reduce任務中的reduce()方法需要用戶自己實現,而其他操作MapReduce已經幫用戶實現了。

通常,MapReduce計算節點和數據存儲節點是同一個節點,即MapReduce框架和HDFS文件系統運行在同一組節點上。這樣的配置可以使MapReduce框架在有數據的節點上高效地調度任務,避免過度消耗集群的網絡帶寬。

主站蜘蛛池模板: 井冈山市| 邢台县| 航空| 凌源市| 衡山县| 阳曲县| 宁乡县| 武胜县| 瑞昌市| 文登市| 慈溪市| 金川县| 马公市| 阜新市| 德惠市| 永泰县| 茌平县| 醴陵市| 准格尔旗| 武乡县| 繁峙县| 侯马市| 宜君县| 灵寿县| 罗江县| 长治县| 岑溪市| 那曲县| 万载县| 瓮安县| 镶黄旗| 册亨县| 离岛区| 金溪县| 柘荣县| 通许县| 新蔡县| 灵宝市| 本溪市| 巩义市| 上饶市|