
MapReduce job types

MapReduce jobs can be written in multiple ways, depending on the desired outcome. The fundamental structure of a MapReduce job is as follows:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class EnglishWordCounter {

  public static class WordMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    ...
  }

  public static class CountReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    ...
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Parse generic Hadoop options and keep the remaining arguments
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: EnglishWordCounter <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "English Word Counter");
    job.setJarByClass(EnglishWordCounter.class);
    job.setMapperClass(WordMapper.class);
    job.setCombinerClass(CountReducer.class);
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The purpose of the driver is to orchestrate the job. The first few lines of main are all about parsing the command-line arguments. Then, we set up the job object by telling it which classes to use for the computation and which input and output paths to use.
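Once this class is compiled and packaged into a JAR, the job can be submitted with the hadoop jar command. The JAR name and the input and output paths below are placeholders for illustration:

hadoop jar wordcount.jar EnglishWordCounter /user/hadoop/input /user/hadoop/output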

Let's look at the Mapper code, which simply tokenizes the input string and writes each word as an output of the mapper:

public static class WordMapper
    extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // Grab the "Text" field, since that is what we are counting over
    String txt = value.toString();
    StringTokenizer itr = new StringTokenizer(txt);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
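For example, given an input line such as to be or not to be, the mapper emits the pairs (to, 1), (be, 1), (or, 1), (not, 1), (to, 1), and (be, 1). The framework then groups these pairs by key before handing them to the reducer.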

Finally, there is the reducer code, which is relatively simple. The reduce function gets called once per key grouping; in this case, once per word. We iterate through the values, which are all numbers, and keep a running sum. The final value of this running sum is the total of the ones emitted for that word:

public static class CountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
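Continuing the same example, the reducer receives the grouping (be, [1, 1]) and writes (be, 2) to the output. Note also that the driver registers CountReducer as the combiner, so the same summing logic runs locally on each mapper's output before the shuffle, reducing the amount of data sent across the network; this is safe here because integer addition is associative and commutative.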

There are a few basic types of MapReduce jobs, as described in the following points.
