
Multiple mappers reducer job

A multiple mappers reducer job is used in join use cases. In this design pattern, input is taken from multiple input files, each processed by its own mapper, to yield joined/aggregated output.

Now let's look at a complete example of a multiple mappers reducer job. For this, we will join the cities.csv and temperatures.csv files seen earlier to output the city name and average temperature for each city.

The following is the code:

package io.somethinglikethis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class MultipleMappersReducer {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "City Temperature Job");
        job.setJarByClass(MultipleMappersReducer.class);

        // Each input path gets its own mapper class, so there is no need to
        // call job.setMapperClass() here.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CityMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TemperatureMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


/*
Id,City
1,Boston
2,New York
*/
    // Reads cities.csv and emits (id, cityName), skipping the header row.
    private static class CityMapper
            extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String id = tokens[0].trim();
            String name = tokens[1].trim();
            if (!name.equals("City")) // skip the "Id,City" header line
                context.write(new Text(id), new Text(name));
        }
    }


/*
Date,Id,Temperature
2018-01-01,1,21
2018-01-01,2,22
*/
    // Reads temperatures.csv and emits (id, temperature), skipping the header
    // row; the date column is not needed for this job.
    private static class TemperatureMapper
            extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String id = tokens[1].trim();
            String temperature = tokens[2].trim();
            if (!temperature.equals("Temperature")) // skip the "Date,Id,Temperature" header line
                context.write(new Text(id), new Text(temperature));
        }
    }



    private static class TemperatureReducer
            extends Reducer<Text, Text, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            int n = 0;
            // Fallback name used when the city ID has no entry in cities.csv.
            Text cityName = new Text("city-" + key.toString());

            for (Text val : values) {
                String strVal = val.toString();
                // Heuristic: temperature values are at most three characters
                // long; anything longer must be a city name from CityMapper.
                if (strVal.length() <= 3) {
                    sum += Integer.parseInt(strVal);
                    n += 1;
                } else {
                    cityName = new Text(strVal);
                }
            }
            if (n == 0) n = 1; // no readings: avoid division by zero, average becomes 0
            result.set(sum / n);
            context.write(cityName, result);
        }
    }
}
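The reducer above tells city names apart from temperatures by string length, which breaks for short city names or long readings. A more robust reduce-side join tags each value with its source. The following is a minimal plain-Java sketch of that idea, with no Hadoop dependency; the "C:" and "T:" prefixes are an assumption of this sketch, not part of the book's code. In a real job, CityMapper would emit "C:" + name and TemperatureMapper would emit "T:" + temperature.

```java
import java.util.Arrays;
import java.util.List;

public class TaggedJoinSketch {

    // Reducer-side logic for one key: pick the "C:"-tagged city name and
    // average the "T:"-tagged temperature readings (integer division, as in
    // the book's reducer).
    static String joinOne(String id, List<String> taggedValues) {
        String cityName = "city-" + id; // fallback when no city record exists
        int sum = 0, n = 0;
        for (String v : taggedValues) {
            if (v.startsWith("C:")) {
                cityName = v.substring(2);
            } else if (v.startsWith("T:")) {
                sum += Integer.parseInt(v.substring(2));
                n++;
            }
        }
        int avg = (n == 0) ? 0 : sum / n; // no readings -> average is 0
        return cityName + "\t" + avg;
    }

    public static void main(String[] args) {
        System.out.println(joinOne("1", Arrays.asList("C:Boston", "T:21", "T:22")));
        System.out.println(joinOne("6", Arrays.asList("T:22")));
        System.out.println(joinOne("7", Arrays.asList("C:Las Vegas")));
    }
}
```

With tags, value classification no longer depends on string length, so the same reducer works for any city name or temperature format.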

Now, run the command, as shown in the following code:

hadoop jar target/uber-mapreduce-1.0-SNAPSHOT.jar io.somethinglikethis.MultipleMappersReducer /user/normal/cities.csv /user/normal/temperatures.csv /user/normal/output/MultipleMappersReducer

The job will run and you should be able to see output as shown in the following output counters:

Map-Reduce Framework -- mapper for temperatures.csv
Map input records=28
Map output records=27
Map output bytes=135
Map output materialized bytes=195
Input split bytes=286
Combine input records=0
Spilled Records=27
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
Total committed heap usage (bytes)=430964736

Map-Reduce Framework -- mapper for cities.csv
Map input records=7
Map output records=6
Map output bytes=73
Map output materialized bytes=91
Input split bytes=273
Combine input records=0
Spilled Records=6
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=10
Total committed heap usage (bytes)=657457152

Map-Reduce Framework -- output average temperature per city name
Map input records=35
Map output records=33
Map output bytes=208
Map output materialized bytes=286
Input split bytes=559
Combine input records=0
Combine output records=0
Reduce input groups=7
Reduce shuffle bytes=286
Reduce input records=33
Reduce output records=7
Spilled Records=66
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=10
Total committed heap usage (bytes)=1745879040

This shows that 27 records were output by the temperatures mapper, six by the cities mapper, and seven by the reducer. You can check this using the HDFS browser at http://localhost:9870 by jumping into the output directory shown under /user/normal/output, as shown in the following screenshot:

Figure: Check output in output directory

Now find the MultipleMappersReducer folder, go into the directory, and drill down as in the SingleMapper section; then, using the head/tail option in the preceding screenshot, you can view the content of the file, as shown in the following screenshot:

Figure: Content of the file

This shows the output of the MultipleMappersReducer job as the city name and average temperature per city. If a cityID has no corresponding temperature records in temperatures.csv, the average is shown as 0. Similarly, if a cityID has no name in cities.csv, the city name is shown as city-N.
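These two fallback behaviors come directly from the reducer: the length heuristic, the city-N default, and the division-by-zero guard. The following is a plain-Java sketch that mirrors that exact logic outside Hadoop, so both edge cases can be checked locally; the method and class names here are illustrative, not part of the book's job.

```java
import java.util.Arrays;
import java.util.List;

public class ReducerSemantics {

    // Mirrors the book's reducer for a single key: values of at most three
    // characters are treated as temperatures, anything longer as the city name.
    static String reduceOne(String id, List<String> values) {
        String cityName = "city-" + id; // default when cities.csv has no entry
        int sum = 0, n = 0;
        for (String v : values) {
            if (v.length() <= 3) {
                sum += Integer.parseInt(v);
                n++;
            } else {
                cityName = v;
            }
        }
        if (n == 0) n = 1; // avoids division by zero; the average becomes 0
        return cityName + " " + (sum / n);
    }

    public static void main(String[] args) {
        // cityID 6 has only temperature readings -> falls back to "city-6"
        System.out.println(reduceOne("6", Arrays.asList("22")));
        // Las Vegas has a name but no readings -> average printed as 0
        System.out.println(reduceOne("7", Arrays.asList("Las Vegas")));
    }
}
```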

You can also use the command line to view the contents of the output:

hdfs dfs -cat /user/normal/output/MultipleMappersReducer/part-r-00000

The output file contents are shown in the following code:

Boston 22
New York 23
Chicago 23
Philadelphia 23
San Francisco 22
city-6 22     // city ID 6 has no name in cities.csv, only temperature measurements
Las Vegas 0   // Las Vegas has no temperature measurements in temperatures.csv

This concludes the MultipleMappersReducer job execution, and the output is as expected.
