
Filtering inputs

Filtering inputs to a job based on certain attributes is often required. Data-level filtering can be done within the Maps, but it is more efficient to filter at the file level, before the Map tasks are spawned. Filtering ensures that only interesting files are processed by Map tasks, which can shorten the Map phase by eliminating unnecessary file fetches. For example, only files generated within a certain time period might be required for analysis.

Let's use the 441-file grant proposal corpus subset to illustrate filtering. We will process only those files whose names match a particular regular expression and that have a minimum file size. Both criteria are specified as job parameters: filter.name and filter.min.size, respectively. The implementation entails extending the Configured class and implementing the PathFilter interface. The Configured class is the base class for objects that can be configured using a Configuration. The PathFilter interface declares a single accept() method, which takes a Path parameter and returns true or false depending on whether the file should be included in the input. The outline of the class is shown in the following snippet:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public static class MasteringHadoopPathAndSizeFilter extends Configured implements PathFilter {
    private Configuration configuration;
    private Pattern filePattern;
    private long filterSize;
    private FileSystem fileSystem;

    @Override
    public boolean accept(Path path){
        //Your accept override implementation goes here
    }

    @Override
    public void setConf(Configuration conf){
        //Your setConf override implementation goes here
    }
}

An important change is to override the setConf() method. This method is used to set the private Configuration variable and read the filter properties from it. In the driver class, the job has to be informed about the presence of a filter using the following line:

FileInputFormat.setInputPathFilter(job, MasteringHadoopPathAndSizeFilter.class);
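
The following snippet is a minimal driver sketch showing where this call fits. The main() body, job name, and positional arguments here are assumptions for illustration; only the setInputPathFilter() call is taken from this example:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // GenericOptionsParser moves generic options such as -D key=value
    // into the Configuration and returns the remaining arguments.
    String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    Job job = Job.getInstance(conf, "MasteringHadoopPathAndSizeFilter");
    job.setJarByClass(MasteringHadoopPathAndSizeFilter.class);
    // The Mapper, Reducer, and output key/value classes would be set here.

    FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
    FileInputFormat.setInputPathFilter(job, MasteringHadoopPathAndSizeFilter.class);
    job.setInputFormatClass(TextInputFormat.class);

    TextOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
    job.setOutputFormatClass(TextOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}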

The implementation of the setConf() method is as follows:

    @Override
    public void setConf(Configuration conf){
        this.configuration = conf;

        if(this.configuration != null){
            // Compile the name filter if the filter.name property is set.
            String filterRegex = this.configuration.get("filter.name");
            if(filterRegex != null){
                this.filePattern = Pattern.compile(filterRegex);
            }

            // Read the minimum size filter; filterSize keeps its default
            // value of 0 when filter.min.size is not set, which disables
            // the size check in accept().
            String filterSizeString = this.configuration.get("filter.min.size");
            if(filterSizeString != null){
                this.filterSize = Long.parseLong(filterSizeString);
            }

            try{
                this.fileSystem = FileSystem.get(this.configuration);
            }
            catch(IOException ioException){
                //Error handling
            }
        }
    }

In the following code, the accept() method returns true for all directories. This is necessary because the path of the input directory itself is one of the paths provided to the accept() method. The method uses the Java regular expression classes Pattern and Matcher to determine whether the file path matches the expression, and sets a Boolean variable accordingly. A second check compares the file size against the size filter; because filterSize defaults to 0 when filter.min.size is not set, this check is skipped in that case. The FileSystem object exposes a getFileStatus() method that returns a FileStatus object, whose file attributes can be examined via getters.

    @Override
    public boolean accept(Path path){
        boolean isFileAcceptable = true;

        try{
            // Always accept directories so that their contents are
            // listed; the input directory itself is passed to this method.
            if(fileSystem.isDirectory(path)){
                return true;
            }

            // Name check: the full path must match the configured pattern.
            if(filePattern != null){
                Matcher m = filePattern.matcher(path.toString());
                isFileAcceptable = m.matches();
            }

            // Size check: a filterSize of 0 means no size filter was set.
            if(filterSize > 0){
                long actualFileSize = fileSystem.getFileStatus(path).getLen();
                isFileAcceptable = isFileAcceptable && (actualFileSize > this.filterSize);
            }
        }
        catch(IOException ioException){
            //Error handling goes here.
        }

        return isFileAcceptable;
    }
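
To try the filter outside a full job, it can be instantiated the same way the framework does. The following is a hypothetical standalone check (the PathFilterCheck class name and the test path are assumptions, and the filter class is assumed to be importable); it requires one additional import, org.apache.hadoop.util.ReflectionUtils:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.util.ReflectionUtils;

public class PathFilterCheck {
    public static void main(String[] args) throws Exception {
        // Set the same properties the job would receive via -D options.
        Configuration conf = new Configuration();
        conf.set("filter.name", ".*a999645.*");
        conf.set("filter.min.size", "2500");

        // ReflectionUtils.newInstance() calls setConf() on Configurable
        // objects, which is also how FileInputFormat instantiates the filter.
        PathFilter filter = ReflectionUtils.newInstance(
                MasteringHadoopPathAndSizeFilter.class, conf);

        // The path is hypothetical; the file must exist for the size
        // check to read its length from the FileSystem.
        System.out.println(filter.accept(new Path("grant-subset/a999645")));
    }
}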

The following command line accepts files that have a999645 in their names and sizes greater than 2,500 bytes. The -D options are generic Hadoop options; GenericOptionsParser extracts them into the job's Configuration, from where the setConf() method reads them. If either parameter is omitted, no filter is applied for that attribute.

hadoop jar MasteringHadoop-1.0-SNAPSHOT-jar-with-dependencies.jar -D filter.name=.*a999645.* -D filter.min.size=2500 grant-subset grant-subset-filter

Three files pass the filter and the output is shown as follows. Note that the filtering happens before the input splits are decided.

14/04/10 21:34:38 INFO input.FileInputFormat: Total input paths to process : 3
14/04/10 21:34:39 INFO mapreduce.JobSubmitter: number of splits:3