官术网_书友最值得收藏!

Join patterns

Data is all over the place, and while it's very valuable on its own, we can discover interesting relationships when we start analyzing these sets together. This is where join patterns come into play. Joins can be used to enrich data with a smaller reference set, or they can be used to filter out or select records that are in some type of special list.

To understand these patterns and their implementations, you should refer to the MultipleMappersReducer job earlier in this chapter.

The abbreviated code is shown as follows, with its two mapper classes and one reducer class:

/**
 * Reduce-side join of a city reference file with a temperature measurement file.
 *
 * <p>Two mappers read two differently shaped CSV inputs and both emit the city id
 * as the key. Each mapper prefixes its value with a source tag so that the reducer
 * can tell a city name from a temperature reading without guessing from the
 * value's content. The reducer writes one line per city id: the city name (or
 * {@code city-<id>} when the id has no entry in the city file) and the integer
 * average of its temperature readings (0 when there are none).
 *
 * <p>Arguments: {@code <cities input> <temperatures input> <output dir>}.
 */
public class MultipleMappersReducer {

    /** Prefix put on values emitted by {@link CityMapper}. */
    private static final String CITY_TAG = "C:";

    /** Prefix put on values emitted by {@link TemperatureMapper}. */
    private static final String TEMPERATURE_TAG = "T:";

    /** Counter group under which skipped input rows are reported. */
    private static final String COUNTER_GROUP = "MultipleMappersReducer";

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} city file, {@code args[1]} temperature file,
     *             {@code args[2]} output directory
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println(
                "Usage: MultipleMappersReducer <cities input> <temperatures input> <output dir>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "City Temperature Job");
        job.setJarByClass(MultipleMappersReducer.class);

        // MultipleInputs binds one mapper to each input path, so no call to
        // job.setMapperClass is needed (it would be overridden anyway).
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CityMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TemperatureMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /*
    Id,City
    1,Boston
    2,New York
    */
    /**
     * Maps a city row {@code Id,City} to {@code (id, CITY_TAG + name)}.
     * The header row and rows with a missing id or name are skipped.
     */
    private static class CityMapper extends Mapper<Object, Text, Text, Text> {

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if (tokens.length < 2) {
                // Blank or truncated line: count it instead of failing the task.
                context.getCounter(COUNTER_GROUP, "MalformedCityRecords").increment(1);
                return;
            }

            String id = tokens[0].trim();
            String name = tokens[1].trim();
            if (name.equals("City")) {
                return; // header row
            }
            if (id.isEmpty() || name.isEmpty()) {
                context.getCounter(COUNTER_GROUP, "MalformedCityRecords").increment(1);
                return;
            }

            outKey.set(id);
            outValue.set(CITY_TAG + name);
            context.write(outKey, outValue);
        }
    }

    /*
    Date,Id,Temperature
    2018-01-01,1,21
    2018-01-01,2,22
    */
    /**
     * Maps a measurement row {@code Date,Id,Temperature} to
     * {@code (id, TEMPERATURE_TAG + temperature)}. The date column is not used.
     * The header row and rows whose temperature is not an integer are skipped.
     */
    private static class TemperatureMapper extends Mapper<Object, Text, Text, Text> {

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if (tokens.length < 3) {
                // Blank or truncated line: count it instead of failing the task.
                context.getCounter(COUNTER_GROUP, "MalformedTemperatureRecords").increment(1);
                return;
            }

            String id = tokens[1].trim();
            String temperature = tokens[2].trim();
            if (temperature.equals("Temperature")) {
                return; // header row
            }

            // Validate here so the reducer can parse every tagged reading safely.
            try {
                Integer.parseInt(temperature);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "MalformedTemperatureRecords").increment(1);
                return;
            }
            if (id.isEmpty()) {
                context.getCounter(COUNTER_GROUP, "MalformedTemperatureRecords").increment(1);
                return;
            }

            outKey.set(id);
            outValue.set(TEMPERATURE_TAG + temperature);
            context.write(outKey, outValue);
        }
    }

    /**
     * Joins the tagged values of one city id and writes
     * {@code (city name, average temperature)}.
     *
     * <p>The name defaults to {@code city-<id>} when no city-tagged value arrives.
     * The average uses integer division and is 0 when the id has no readings.
     */
    private static class TemperatureReducer extends Reducer<Text, Text, Text, IntWritable> {

        private final IntWritable result = new IntWritable();
        private final Text cityName = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0; // long so that many readings cannot overflow the running total
            int count = 0;
            String name = "city-" + key.toString();

            for (Text val : values) {
                String tagged = val.toString();
                if (tagged.startsWith(TEMPERATURE_TAG)) {
                    sum += Integer.parseInt(tagged.substring(TEMPERATURE_TAG.length()));
                    count++;
                } else if (tagged.startsWith(CITY_TAG)) {
                    name = tagged.substring(CITY_TAG.length());
                }
            }

            int average = (count == 0) ? 0 : (int) (sum / count);
            cityName.set(name);
            result.set(average);
            context.write(cityName, result);
        }
    }
}

The output of this job is shown in the following code:

Boston 22
New York 23
Chicago 23
Philadelphia 23
San Francisco 22
city-6 22 // city ID 6 has no name in cities.csv, only temperature measurements
Las Vegas 0 // the city of Las Vegas has no temperature measurements in temperature.csv
主站蜘蛛池模板: 托克逊县| 微博| 抚顺市| 海伦市| 应城市| 华亭县| 女性| 奉化市| 旌德县| 吉林省| 会昌县| 桐乡市| 当涂县| 长顺县| 申扎县| 龙山县| 岱山县| 三都| 台南县| 曲沃县| 长治县| 鹤山市| 漳州市| 通辽市| 朝阳区| 泰宁县| 蓬安县| 庆元县| 综艺| 建始县| 永泰县| 察哈| 潞西市| 东丰县| 曲靖市| 辽阳市| 荥阳市| 涿州市| 黄山市| 沙洋县| 遂溪县|