官术网_书友最值得收藏!

5.6 案例分析:二次排序

MapReduce在傳遞<key,value>對(duì)時(shí)默認(rèn)按照key進(jìn)行排序,而有時(shí)候除了key以外,還需要根據(jù)value或value中的某一個(gè)字段進(jìn)行排序,基于這種需求進(jìn)行的自定義排序稱為“二次排序”。

例如有以下數(shù)據(jù):

現(xiàn)需要對(duì)上述數(shù)據(jù)先按照第一字段進(jìn)行升序排列,若第一字段相同,則按照第二字段進(jìn)行降序排列,期望的輸出結(jié)果如下:

1. 設(shè)計(jì)思路

由于MapReduce中主要是對(duì)key的比較和排序,因此可以將需要排序的兩個(gè)字段組合成一個(gè)復(fù)合key,而value值不變,則組合后的<key,value>對(duì)形如<(key,value),value>。

在編程時(shí)可以自定義一個(gè)類MyKeyPair,該類中包含要排序的兩個(gè)字段,然后將該類作為<key,value>對(duì)中的key(Hadoop中的任何類型都可以作為key),形如<MyKeyPair,value>,相當(dāng)于自定義key的類型。由于所有的key是可序列化并且可比較的,因此自定義的key需要實(shí)現(xiàn)接口WritableComparable。

與按照一個(gè)字段排序相比,本次二次排序需要自定義的地方如下:

  •  自定義組合key類,需要實(shí)現(xiàn)WritableComparable接口。
  •  自定義分區(qū)類,按照第一個(gè)字段進(jìn)行分區(qū),需要繼承Partitioner類。
  •  自定義分組類,按照第一個(gè)字段進(jìn)行分組,需要繼承WritableComparator類。
2. 編寫(xiě)程序

(1)自定義組合key類。

新建自定義組合key類MyKeyPair.java,該類需要實(shí)現(xiàn)Hadoop提供的org.apache.hadoop.io.WritableComparable接口,該接口繼承了org.apache.hadoop.io.Writable接口和java.lang.Comparable接口,定義源碼如下:

然后需要實(shí)現(xiàn)WritableComparable接口中的序列化方法write()、反序列化方法readFields()、比較方法compareTo()。write()方法用于將數(shù)據(jù)寫(xiě)入輸出流;readFields()方法用于從輸入流讀取數(shù)據(jù);compareTo()方法用于將兩個(gè)對(duì)象進(jìn)行比較,以便能夠進(jìn)行排序。

自定義組合key類MyKeyPair.java的源碼如下:

(2)自定義分區(qū)類。

新建自定義分區(qū)類MyPartitioner.java,該類需要繼承Hadoop提供的org.apache.hadoop.mapreduce.Partitioner類,并實(shí)現(xiàn)其中的抽象方法getPartition()。Partitioner類是一個(gè)抽象泛型類,用于控制對(duì)Map任務(wù)輸出結(jié)果的分區(qū),泛型的兩個(gè)參數(shù)分別表示<key,value>對(duì)中key的類型和value的類型。Partitioner類的源碼如下:

關(guān)于MapReduce的分區(qū)規(guī)則可參考本章5.1.3節(jié)的MapReduce工作原理,此處不再贅述。

自定義分區(qū)類MyPartitioner.java的源碼如下:

上述代碼繼承Partitioner類的同時(shí)指定了<key,value>對(duì)中key的類型為MyKeyPair,value的類型為IntWritable。

(3)自定義分組類。

新建自定義分組類MyGroupComparator.java,該類需要繼承Hadoop提供的org.apache.hadoop.io.WritableComparator類,并重寫(xiě)其中的compare()方法,以實(shí)現(xiàn)按照指定的字段進(jìn)行分組。

自定義分組類MyGroupComparator.java的源碼如下:

上述代碼首先通過(guò)構(gòu)造方法指定了<key,value>對(duì)中key的類型為MyKeyPair,由于MapReduce默認(rèn)以<key,value>對(duì)中的key值進(jìn)行分組,因此接下來(lái)重寫(xiě)了compare()方法,實(shí)現(xiàn)了按照MyKeyPair對(duì)象中的first字段進(jìn)行對(duì)比,若值相等則會(huì)將當(dāng)前<key,value>對(duì)分為一組。

(4)定義Mapper類。

新建Mapper類MyMapper.java,實(shí)現(xiàn)將輸入的數(shù)據(jù)封裝為<MyKeyPair, IntWritable>形式的<key,value>對(duì)進(jìn)行輸出,即輸出的key的類型為MyKeyPair,輸出的value的類型為IntWritable。

Mapper類MyMapper.java的源碼如下:

(5)定義Reducer類。

新建Reducer類MyReducer.java,將接收到的分組后的<key,value-list>對(duì)循環(huán)進(jìn)行輸出。

Reducer類MyReducer.java的源碼如下:

上述代碼將MyKeyPair類型的key中的first字段值作為輸出的key,輸出的value從集合values中進(jìn)行遍歷。

(6)定義應(yīng)用程序主類。

新建應(yīng)用程序主類MySecondSortApp.java,在該類中需要指定自定義的分區(qū)類和分組類,同時(shí)需要顯式設(shè)置Map任務(wù)輸出的key和value的類型。

應(yīng)用程序主類MySecondSortApp.java的源碼如下:

上述代碼解析如下:

? 設(shè)置map()方法輸出的key和value的類型。若將此省略,則默認(rèn)采用?中設(shè)置的輸出類型。也就是說(shuō),若map()方法和reduce()方法的輸出類型一致,可以省略對(duì)map()方法輸出類型的設(shè)置。若map()方法和reduce()方法實(shí)際的輸出類型與此處的設(shè)置不匹配,則程序運(yùn)行過(guò)程中將會(huì)報(bào)錯(cuò)。

在MapReduce程序運(yùn)行的過(guò)程中會(huì)通過(guò)JobConf類獲取map()方法的輸出類型,獲取map()方法輸出key的類型的源碼如下:

從上述源碼可以看出,當(dāng)沒(méi)有設(shè)置map()方法的輸出類型時(shí),會(huì)調(diào)用getOutputKeyClass()方法使用reduce()方法的輸出類型。

? 在執(zhí)行MapReduce程序時(shí),會(huì)首先從HDFS中讀取數(shù)據(jù)塊,然后按行拆分成<key,value>對(duì),這個(gè)過(guò)程則是由TextInputFormat類完成的。TextInputFormat類繼承了抽象類FileInputFormat<K,V>,而FileInputFormat<K, V>又繼承了抽象類InputFormat<K, V>,抽象類InputFormat<K, V>中定義了兩個(gè)方法:getSplits()和createRecordReader()。getSplits()方法負(fù)責(zé)將HDFS數(shù)據(jù)解析為InputSplit集合,createRecordReader()方法負(fù)責(zé)將一個(gè)InputSplit解析為一個(gè)<key,value>對(duì)記錄。抽象類InputFormat<K, V>的源碼如下:

3. 程序運(yùn)行

程序的打包和執(zhí)行參考前面的“單詞計(jì)數(shù)”和“數(shù)據(jù)去重”案例,此處不再贅述。

執(zhí)行完成后,查看執(zhí)行結(jié)果,如圖5-11所示。

圖5-11 查看二次排序程序執(zhí)行結(jié)果

主站蜘蛛池模板: 聊城市| 沧州市| 义乌市| 绥化市| 南岸区| 普定县| 阜新| 沈阳市| 无棣县| 南城县| 望城县| 湘阴县| 开封市| 周口市| 甘洛县| 绍兴市| 井研县| 苍溪县| 博罗县| 五指山市| 涪陵区| 九台市| 临泉县| 板桥市| 荣昌县| 舞阳县| 盘锦市| 安达市| 万州区| 锡林郭勒盟| 潮安县| 玉门市| 饶平县| 辛集市| 石阡县| SHOW| 东山县| 盈江县| 呼玛县| 荆门市| 彭阳县|