
The Reduce task

The Reduce task is the aggregation step. If the number of Reduce tasks is not specified, it defaults to one. Running a single Reduce task risks overloading the node it is scheduled on, while running too many Reduce tasks increases shuffle complexity and produces a proliferation of output files, which puts pressure on the NameNode. Understanding the data distribution and the partitioning function is important for deciding the optimal number of Reduce tasks.

Tip

Ideally, each Reduce task should process between 1 GB and 5 GB of data.

The number of Reduce tasks can be set using the mapreduce.job.reduces parameter, or programmatically by calling the setNumReduceTasks() method on the Job object. There is also a cap on the number of Reduce tasks that a single node can execute concurrently, given by the mapreduce.tasktracker.reduce.tasks.maximum property.
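Both routes can be combined in a driver, as in this minimal sketch (the job name and the count of 20 are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = new Configuration();
    // Declarative route: same effect as passing -D mapreduce.job.reduces=20
    conf.setInt("mapreduce.job.reduces", 20);

    Job job = Job.getInstance(conf, "reduce-count-demo");
    // Programmatic route: overrides the configuration value above
    job.setNumReduceTasks(20);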

Note

The heuristic to determine the right number of reducers is as follows:

0.95 * (nodes * mapreduce.tasktracker.reduce.tasks.maximum)

Alternatively, you can use the following:

1.75 * (nodes * mapreduce.tasktracker.reduce.tasks.maximum)

At 0.95, all the reducers can launch immediately and start fetching map outputs as the Map tasks finish. At 1.75, the faster nodes finish their first round of Reduce tasks and move on to a second wave, which gives better load balancing.
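For example, on a hypothetical 10-node cluster with mapreduce.tasktracker.reduce.tasks.maximum set to 4, the first heuristic yields 0.95 * 10 * 4 = 38 Reduce tasks, while the second yields 1.75 * 10 * 4 = 70.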

Fetching intermediate outputs – Reduce-side

The Reduce task fetches the relevant partitions of Map task outputs as and when the Map tasks finish. This is called the Copy phase. The number of Map tasks from which a Reduce task can fetch data in parallel is determined by the mapreduce.reduce.shuffle.parallelcopies parameter. The lower this value, the more queuing there is on the Reduce side; a Reduce task might have to wait for an available slot to fetch data from a Map task.

In situations where a Reduce task cannot reach a Map task's output because of network connectivity issues, it retries the fetch with exponential backoff. The retries continue until the time limit specified by the mapred.reduce.copy.backoff property is reached; after that, the fetch is declared as failed.
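Both knobs live on the job configuration, as in this sketch (the values are illustrative, not recommendations; mapred.reduce.copy.backoff is specified in seconds in older releases, so verify the unit against your distribution):

    Configuration conf = new Configuration();
    // Fetch from up to 10 Map task outputs in parallel
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
    // Keep retrying a failed fetch for up to 300 seconds before declaring it failed
    conf.setInt("mapred.reduce.copy.backoff", 300);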

Merge and spill of intermediate outputs

Similar to the Map task's sort and spill, the Reduce task needs to merge the outputs of multiple Map tasks before invoking the reduce function. The next diagram illustrates this process. Depending on their size, fetched Map task outputs are copied either to a memory buffer or to disk. The mapreduce.reduce.shuffle.input.buffer.percent property configures the size of this buffer as a percentage of the heap size allocated to the task.

The value of the mapreduce.reduce.shuffle.merge.percent property determines the threshold beyond which this buffer is spilled to disk; the default is 0.66. The mapreduce.reduce.merge.inmem.threshold property sets a threshold on the number of map outputs that can reside in memory before a disk spill happens; its default is 1000. When either threshold is reached, the map outputs are written to disk.

Figure: Merge and spill of intermediate outputs
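A minimal sketch of setting these thresholds on the job configuration follows; the values shown are the documented defaults, included only to make the units explicit:

    Configuration conf = new Configuration();
    // Fraction of the task heap used to buffer fetched map outputs (default 0.70)
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
    // Spill the buffer to disk once it is 66 percent full
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
    // ...or once 1000 map outputs have accumulated in memory
    conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);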

A background thread continuously merges the on-disk files. After all the outputs have been received, the Reduce task moves into the merge (or sort) phase. As with the Map task merge, the number of file streams merged simultaneously is determined by the mapreduce.task.io.sort.factor property. These parameters can be tuned in a fashion similar to the Map-side spill and merge parameters; the key is to process as much as possible in memory.

In later versions of Hadoop, two properties, mapreduce.reduce.merge.memtomem.enabled and mapreduce.reduce.merge.memtomem.threshold, enable memory-to-memory merging of map outputs.
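As a hedged sketch covering both the merge factor and the memory-to-memory merge settings, assuming a Hadoop release that ships the memtomem properties (the threshold value here is a placeholder, not a recommended default):

    Configuration conf = new Configuration();
    // Merge up to 100 file streams at a time in the sort/merge phase (default 10)
    conf.setInt("mapreduce.task.io.sort.factor", 100);
    // Turn on in-memory merging of map outputs
    conf.setBoolean("mapreduce.reduce.merge.memtomem.enabled", true);
    // Start a memory-to-memory merge once this many map outputs are buffered
    conf.setInt("mapreduce.reduce.merge.memtomem.threshold", 1000);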

Map task outputs that were compressed are decompressed in memory during merging.
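For context, this is the compression being undone: map output compression is typically enabled as in the following sketch (SnappyCodec is one common choice; any codec installed on the cluster works):

    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.SnappyCodec;

    Configuration conf = new Configuration();
    // Compress map outputs before the shuffle; reducers decompress them in memory while merging
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);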
