官术网_书友最值得收藏!

3.2 操作實(shí)踐

前面主要介紹了HDFS系統(tǒng)接口和編程方式,本節(jié)介紹HDFS中Java編程的操作實(shí)例。

3.2.1 文件操作

使用命令行編寫HDFS程序,通常有三個(gè)步驟。

首先,編寫HDFS程序源碼,并通過java編譯器編譯成字節(jié)碼。

然后,將字節(jié)碼打包成JAR文件。

最后,通過Hadoop加載JAR文件,并運(yùn)行。

下面,我們以一個(gè)完整的文件操作為例來說明。程序的主要功能如下。

(1)在HDFS文件系統(tǒng)中創(chuàng)建一個(gè)名為“hdtest”的目錄。

(2)將本地名為“hfile.txt”的文件上傳到HDFS中的hdtest目錄下面。

(3)遍歷hdtest目錄。

(4)將HDFS中的hdtest/hfile.txt文件下載到本地,并另存為“hfile2.txt”。

程序的源代碼如下:

        import java.io.ByteArrayOutputStream;
        import java.io.IOException;
        import java.io.OutputStream;
        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        public class HdfsTest {
            private static final String HADOOP_URL = "hdfs://test.hadoop:9000";
            private Configuration conf;
            /**
             * 構(gòu)造函數(shù)
             */
            public HdfsTest() {
              this.conf = new Configuration();    }
            /**
             * 測(cè)試入口函數(shù)
             */
            public static void main(String[]args) throws IOException {
                HdfsTest hdfs = new HdfsTest();
                hdfs.createDir("/hdtest"); //創(chuàng)建目錄
                hdfs.copyFile("file/hfile.txt", "/hdtest/hfile.txt"); //拷貝文件
                hdfs.ls("/hdtest");  //遍歷目錄
                hdfs.cat("/hdtest/hfile.txt");  //查看文件內(nèi)容
                //下載文件并另存
                hdfs.download("/hdtest/hfile.txt", "file/hfile2.txt");
            }
             /**
             * 創(chuàng)建目錄
             * @param folder
             * @throws IOException
             */
            public void createDir(String folder) throws IOException {
                Path path = new Path(folder);
                FileSystem fs = FileSystem.get(conf);
                if (! fs.exists(path)) {
                  fs.mkdirs(path);
                  System.out.println("Create: " + folder);
                }
                fs.close();
            }
            /**
             * 上傳文件到HDFS
             * @param local
             * @param remote
             * @throws IOException
             */
            public void copyFile(String local, String remote) throws IOException {
                FileSystem fs = FileSystem.get(conf);
                fs.copyFromLocalFile(new Path(local), new Path(remote));
                System.out.println("copy from: " + local + " to " + remote);
                fs.close();
            }
            /**
             * 遍歷文件
             * @param folder
             * @throws IOException
             */
           public void ls(String folder) throws IOException {
                Path path = new Path(folder);
                FileSystem fs = FileSystem.get(conf);
                FileStatus[]list = fs.listStatus(path);
                System.out.println("ls: " + folder);
                System.out.println("**********list begin*************");
                for (FileStatus f : list) {
                  System.out.printf("name: %s, folder: %s, size: %d\n",
                                      f.getPath(), f.isDir(), f.getLen());
                }
                System.out.println("**********list end*************");
                fs.close();
            }
            /**
             * 查看文件中的內(nèi)容
             * @param remoteFile
             * @return    * @throws IOException
            */
           public String cat(String remoteFile) throws IOException {
              Path path = new Path(remoteFile);
              FileSystem fs = FileSystem.get(conf);
              FSDataInputStream fsdis = null;
              System.out.println("Content: " + remoteFile);
              OutputStream baos = new ByteArrayOutputStream();
              String str = null;
              try {
                  fsdis = fs.open(path);
                  IOUtils.copyBytes(fsdis, baos, 4096, false);
                  str = baos.toString();
              } finally {
                  IOUtils.closeStream(fsdis);
                  fs.close();
              }
              System.out.println(str);
              return str;
           }
           /**
            * 從HDFS中下載文件到本地
            * @param remote
            * @param local
            * @throws IOException
            */
           public void download(String remote, String local) throws IOException {
              Path path = new Path(remote);
              FileSystem fs = FileSystem.get(conf);
              fs.copyToLocalFile(path, new Path(local));
              System.out.println(
                "download file from'" + remote + "' to '" + local + "'");
              fs.close();
           }
           /**
            * 重命名文件
            * @param src
            * @param dst
            * @throws IOException
            */
           public void rename(String src, String dst) throws IOException {
              FileSystem fs = FileSystem.get(conf);
              fs.rename(new Path(src), new Path(dst));
              System.out.println("Rename:  " + src + " to " + dst);
              fs.close();
           }
           /**
            * 刪除文件或目錄
            * @param folder
            * @throws IOException
            */
           public void delete(String folder) throws IOException {
              Path path = new Path(folder);
              FileSystem fs = FileSystem.get(conf);
              fs.deleteOnExit(path);
              System.out.println("Delete: " + folder);
              fs.close();
           }
        }

編譯HdfsTest.java源文件。Hadoop 2.x版本中JAR不再集中在一個(gè)hadoop-core*.jar中,而是分成多個(gè)JAR(如$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar、$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar、$HADOO P_HOME/share/hadoop/common/lib/commons-cli-1.2.jar等),通過“hadoop classpath”命令,可以得到運(yùn)行Hadoop程序所需的全部classpath信息。

我們將Hadoop的classhpath信息添加到CLASSPATH變量中,然后直接編譯:

        $ javac HdfsTest.java

編譯時(shí)會(huì)有警告,可以忽略。編譯后,可以看到生成的.class文件,如圖3-10所示。

圖3-10 編譯并查看生成的.class文件

打包.class文件,如圖3-11所示。

圖3-11 打包.class文件并查看

運(yùn)行測(cè)試,結(jié)果如圖3-12所示。

圖3-12 運(yùn)行測(cè)試結(jié)果

由上面的運(yùn)行結(jié)果可以看到,我們?cè)贖DFS文件系統(tǒng)中成功地創(chuàng)建了目錄并上傳/下載了一個(gè)文件。通過Fs Shell命令,可以驗(yàn)證查看已上傳的文件,如圖3-13所示。

圖3-13 驗(yàn)證查看已上傳的文件

此外,上述實(shí)例代碼中,還提供了重命名(rename)和刪除(delete)函數(shù),感興趣的讀者可以自己測(cè)試一下。

使用命令行編譯運(yùn)行Java程序有些麻煩,每修改一次就需要手動(dòng)編譯、打包一次。對(duì)于較大規(guī)模的應(yīng)用,可以使用Eclipse等集成環(huán)境進(jìn)行開發(fā),以提高開發(fā)效率。

3.2.2 壓縮與解壓縮

我們?cè)贖DFS中對(duì)數(shù)據(jù)進(jìn)行壓縮處理來優(yōu)化磁盤使用率,提高數(shù)據(jù)在磁盤和網(wǎng)絡(luò)中的傳輸速度,從而提高系統(tǒng)處理數(shù)據(jù)的效率。

Hadoop應(yīng)對(duì)壓縮格式的技術(shù)是自動(dòng)識(shí)別。如果我們壓縮的文件有相應(yīng)壓縮格式的擴(kuò)展名(比如lzo、gz、bzip2等),Hadoop會(huì)根據(jù)壓縮格式的擴(kuò)展名,自動(dòng)選擇相對(duì)應(yīng)的解碼器來解壓數(shù)據(jù),此過程完全是Hadoop自動(dòng)處理的,我們只須確保輸入的壓縮文件有擴(kuò)展名。

Hadoop在Codec類(org.apache.hadoop.io.compress)中,實(shí)現(xiàn)了壓縮和解壓縮的接口CompressionCodec。可用的Codec實(shí)現(xiàn)類見表3-12。

表3-12 可用的Codec實(shí)現(xiàn)類

CompressionCodec有兩個(gè)方法,可以幫助我們方便地壓縮或解壓數(shù)據(jù)。壓縮數(shù)據(jù)時(shí)使用createOutputStream(OutputStream out)獲取壓縮輸出流對(duì)象CompressionOutputStream,我們將未壓縮的數(shù)據(jù)寫入該流,它會(huì)幫我們壓縮數(shù)據(jù)后,寫出至底層的數(shù)據(jù)流out。

相反地,在解析數(shù)據(jù)的時(shí)候,使用createInputStream(InputStream in)獲取解壓縮輸入流對(duì)象CompressionInputstream,通過它,我們可以從底層的數(shù)據(jù)流中讀取解壓后的數(shù)據(jù)。

CompressionOutputStream、CompressionInputStream與java.util.zip.DeflaterOutputStream、java.util.zip.DeflaterInputStream類似,但是,前者支持重置內(nèi)部的壓縮器(Compressor)與解壓縮器(Decompressor)狀態(tài)。

CompressionCodecFactory是Hadoop壓縮框架中的另一個(gè)類,主要功能是負(fù)責(zé)根據(jù)不同的文件擴(kuò)展名,來自動(dòng)地獲取相對(duì)應(yīng)的壓縮解壓器,使用者可以通過它提供的方法,獲得CompressionCodec,極大地增強(qiáng)了應(yīng)用程序在處理壓縮文件時(shí)的通用性。

除了前面介紹的createInputStream()和createInputStream()方法外,Hadoop中還有其他兩種壓縮模式。

一是壓縮機(jī)Compressor和解壓機(jī)Decompressor。在Hadoop的實(shí)現(xiàn)中,數(shù)據(jù)編碼器和解碼器被抽象成了兩個(gè)接口:org.apache.hadoop.io.compress.Compressor和org.apache.hadoop.io.compress.Decompressor。它們規(guī)定了一系列的方法,所以,在Hadoop內(nèi)部的編碼/解碼算法實(shí)現(xiàn)中都需要實(shí)現(xiàn)對(duì)應(yīng)的接口。在實(shí)際的數(shù)據(jù)壓縮與解壓縮過程中,Hadoop為用戶提供了統(tǒng)一的I/O流處理模式。

二是壓縮流CompressionOutputStream和解壓縮流CompressionInputStream。這兩個(gè)類分別繼承自java.io.OutputStream和java.io.InputStream,作用也類似。

下面,我們編碼實(shí)現(xiàn)文件的壓縮和解壓縮操作。源程序如下:

        import java.io.IOException;
        import java.io.InputStream;
        import java.io.OutputStream;
        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        import org.apache.hadoop.io.compress.CompressionCodec;
        import org.apache.hadoop.io.compress.CompressionCodecFactory;
        import org.apache.hadoop.io.compress.CompressionInputStream;
        import org.apache.hadoop.io.compress.CompressionOutputStream;
        import org.apache.hadoop.util.ReflectionUtils;
        public class CompressTest {
            /**
            * 壓縮文件
            * @param codecClassName
            * @param filein, fileout
            * @throws IOException
            */
            public static void compress(String codecClassName, String filein,
              String fileout) throws Exception {
              Class<? > codecClass = Class.forName(codecClassName);
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(conf);
              CompressionCodec codec = (CompressionCodec)ReflectionUtils
                                          .newInstance(codecClass, conf);
              //指定壓縮文件路徑
              FSDataOutputStream outputStream = fs.create(new Path(fileout));
              //指定要被壓縮的文件路徑
              FSDataInputStream in = fs.open(new Path(filein));
              //創(chuàng)建壓縮輸出流
              CompressionOutputStream out =
                codec.createOutputStream(outputStream);
              IOUtils.copyBytes(in, out, conf);
              IOUtils.closeStream(in);
              IOUtils.closeStream(out);
            }
            /**
            * 解壓縮:使用文件擴(kuò)展名來推斷codec
            * @param fileuri
            * @throws IOException
            */      
            public static void uncompress(String fileuri) throws IOException {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(fileuri), conf);
                Path inputPath = new Path(fileuri);
                CompressionCodecFactory factory = new CompressionCodecFactory(conf);
                CompressionCodec codec = factory.getCodec(inputPath);
                if(codec == null) {
                  System.out.println("no codec for " + fileuri);
                  System.exit(1);
                }
                String outputUri = CompressionCodecFactory.removeSuffix(
                                    fileuri, codec.getDefaultExtension());
                InputStream in = null;
                OutputStream out = null;
                try {
                  in = codec.createInputStream(fs.open(inputPath));
                  out = fs.create(new Path(outputUri));
                  IOUtils.copyBytes(in, out, conf);
                } finally {
                  IOUtils.closeStream(out);
                  IOUtils.closeStream(in);
                }
            }
            public static void main(String[]args) throws Exception {
                String filein = "/hdtest/bigdata.pdf";
                String fileout = "/hdtest/bigdatacom.gz";
                compress("org.apache.hadoop.io.compress.GzipCodec",
                          filein, fileout);
                //uncompress(fileout);
            }
        }

編譯并打包運(yùn)行。壓縮操作運(yùn)行的結(jié)果如圖3-14所示。

圖3-14 進(jìn)行壓縮操作并查看結(jié)果

解壓縮操作的運(yùn)行結(jié)果如圖3-15所示。

圖3-15 解壓縮的運(yùn)行結(jié)果

主站蜘蛛池模板: 醴陵市| 罗田县| 莫力| 天水市| 文昌市| 东港市| 从化市| 行唐县| 利川市| 冷水江市| 房山区| 疏勒县| 清水河县| 昌图县| 象山县| 蕲春县| 施甸县| 江川县| 元阳县| 陆丰市| 原阳县| 日土县| 玛沁县| 名山县| 井研县| 大田县| 灵丘县| 工布江达县| 新津县| 乌拉特前旗| 浑源县| 景宁| 富锦市| 毕节市| 北辰区| 云梦县| 榕江县| 昔阳县| 甘孜县| 怀柔区| 安义县|