- 大數(shù)據(jù):從基礎(chǔ)理論到最佳實(shí)踐
- 祁偉
- 2179字
- 2021-01-07 18:48:01
3.2 操作實(shí)踐
前面主要介紹了HDFS系統(tǒng)接口和編程方式,本節(jié)介紹HDFS中Java編程的操作實(shí)例。
3.2.1 文件操作
使用命令行編寫HDFS程序,通常有三個(gè)步驟。
首先,編寫HDFS程序源碼,并通過java編譯器編譯成字節(jié)碼。
然后,將字節(jié)碼打包成JAR文件。
最后,通過Hadoop加載JAR文件,并運(yùn)行。
下面,我們以一個(gè)完整的文件操作為例來說明。程序的主要功能如下。
(1)在HDFS文件系統(tǒng)中創(chuàng)建一個(gè)名為“hdtest”的目錄。
(2)將本地名為“hfile.txt”的文件上傳到HDFS中的hdtest目錄下面。
(3)遍歷hdtest目錄。
(4)將HDFS中的hdtest/hfile.txt文件下載到本地,并另存為“hfile2.txt”。
程序的源代碼如下:
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HdfsTest { private static final String HADOOP_URL = "hdfs://test.hadoop:9000"; private Configuration conf; /** * 構(gòu)造函數(shù) */ public HdfsTest() { this.conf = new Configuration(); } /** * 測(cè)試入口函數(shù) */ public static void main(String[]args) throws IOException { HdfsTest hdfs = new HdfsTest(); hdfs.createDir("/hdtest"); //創(chuàng)建目錄 hdfs.copyFile("file/hfile.txt", "/hdtest/hfile.txt"); //拷貝文件 hdfs.ls("/hdtest"); //遍歷目錄 hdfs.cat("/hdtest/hfile.txt"); //查看文件內(nèi)容 //下載文件并另存 hdfs.download("/hdtest/hfile.txt", "file/hfile2.txt"); } /** * 創(chuàng)建目錄 * @param folder * @throws IOException */ public void createDir(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(conf); if (! fs.exists(path)) { fs.mkdirs(path); System.out.println("Create: " + folder); } fs.close(); } /** * 上傳文件到HDFS * @param local * @param remote * @throws IOException */ public void copyFile(String local, String remote) throws IOException { FileSystem fs = FileSystem.get(conf); fs.copyFromLocalFile(new Path(local), new Path(remote)); System.out.println("copy from: " + local + " to " + remote); fs.close(); } /** * 遍歷文件 * @param folder * @throws IOException */ public void ls(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(conf); FileStatus[]list = fs.listStatus(path); System.out.println("ls: " + folder); System.out.println("**********list begin*************"); for (FileStatus f : list) { System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen()); } System.out.println("**********list end*************"); fs.close(); } /** * 查看文件中的內(nèi)容 * @param remoteFile * @return * @throws IOException */ public String cat(String remoteFile) throws IOException { Path path = new Path(remoteFile); FileSystem fs = FileSystem.get(conf); FSDataInputStream fsdis = null; System.out.println("Content: " + remoteFile); OutputStream baos = new ByteArrayOutputStream(); String str = null; try { fsdis = fs.open(path); IOUtils.copyBytes(fsdis, baos, 4096, false); str = baos.toString(); } finally { IOUtils.closeStream(fsdis); fs.close(); } System.out.println(str); return str; } /** * 從HDFS中下載文件到本地 * @param remote * @param local * @throws IOException */ public void download(String remote, String local) throws IOException { Path path = new Path(remote); FileSystem fs = FileSystem.get(conf); fs.copyToLocalFile(path, new Path(local)); System.out.println( "download file from'" + remote + "' to '" + local + "'"); fs.close(); } /** * 重命名文件 * @param src * @param dst * @throws IOException */ public void rename(String src, String dst) throws IOException { FileSystem fs = FileSystem.get(conf); fs.rename(new Path(src), new Path(dst)); System.out.println("Rename: " + src + " to " + dst); fs.close(); } /** * 刪除文件或目錄 * @param folder * @throws IOException */ public void delete(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(conf); fs.deleteOnExit(path); System.out.println("Delete: " + folder); fs.close(); } }
編譯HdfsTest.java源文件。Hadoop 2.x版本中JAR不再集中在一個(gè)hadoop-core*.jar中,而是分成多個(gè)JAR(如$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar、$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar、$HADOO P_HOME/share/hadoop/common/lib/commons-cli-1.2.jar等),通過“hadoop classpath”命令,可以得到運(yùn)行Hadoop程序所需的全部classpath信息。
我們將Hadoop的classhpath信息添加到CLASSPATH變量中,然后直接編譯:
$ javac HdfsTest.java
編譯時(shí)會(huì)有警告,可以忽略。編譯后,可以看到生成的.class文件,如圖3-10所示。

圖3-10 編譯并查看生成的.class文件
打包.class文件,如圖3-11所示。

圖3-11 打包.class文件并查看
運(yùn)行測(cè)試,結(jié)果如圖3-12所示。

圖3-12 運(yùn)行測(cè)試結(jié)果
由上面的運(yùn)行結(jié)果可以看到,我們?cè)贖DFS文件系統(tǒng)中成功地創(chuàng)建了目錄并上傳/下載了一個(gè)文件。通過Fs Shell命令,可以驗(yàn)證查看已上傳的文件,如圖3-13所示。

圖3-13 驗(yàn)證查看已上傳的文件
此外,上述實(shí)例代碼中,還提供了重命名(rename)和刪除(delete)函數(shù),感興趣的讀者可以自己測(cè)試一下。
使用命令行編譯運(yùn)行Java程序有些麻煩,每修改一次就需要手動(dòng)編譯、打包一次。對(duì)于較大規(guī)模的應(yīng)用,可以使用Eclipse等集成環(huán)境進(jìn)行開發(fā),以提高開發(fā)效率。
3.2.2 壓縮與解壓縮
我們?cè)贖DFS中對(duì)數(shù)據(jù)進(jìn)行壓縮處理來優(yōu)化磁盤使用率,提高數(shù)據(jù)在磁盤和網(wǎng)絡(luò)中的傳輸速度,從而提高系統(tǒng)處理數(shù)據(jù)的效率。
Hadoop應(yīng)對(duì)壓縮格式的技術(shù)是自動(dòng)識(shí)別。如果我們壓縮的文件有相應(yīng)壓縮格式的擴(kuò)展名(比如lzo、gz、bzip2等),Hadoop會(huì)根據(jù)壓縮格式的擴(kuò)展名,自動(dòng)選擇相對(duì)應(yīng)的解碼器來解壓數(shù)據(jù),此過程完全是Hadoop自動(dòng)處理的,我們只須確保輸入的壓縮文件有擴(kuò)展名。
Hadoop在Codec類(org.apache.hadoop.io.compress)中,實(shí)現(xiàn)了壓縮和解壓縮的接口CompressionCodec。可用的Codec實(shí)現(xiàn)類見表3-12。
表3-12 可用的Codec實(shí)現(xiàn)類

CompressionCodec有兩個(gè)方法,可以幫助我們方便地壓縮或解壓數(shù)據(jù)。壓縮數(shù)據(jù)時(shí)使用createOutputStream(OutputStream out)獲取壓縮輸出流對(duì)象CompressionOutputStream,我們將未壓縮的數(shù)據(jù)寫入該流,它會(huì)幫我們壓縮數(shù)據(jù)后,寫出至底層的數(shù)據(jù)流out。
相反地,在解析數(shù)據(jù)的時(shí)候,使用createInputStream(InputStream in)獲取解壓縮輸入流對(duì)象CompressionInputstream,通過它,我們可以從底層的數(shù)據(jù)流中讀取解壓后的數(shù)據(jù)。
CompressionOutputStream、CompressionInputStream與java.util.zip.DeflaterOutputStream、java.util.zip.DeflaterInputStream類似,但是,前者支持重置內(nèi)部的壓縮器(Compressor)與解壓縮器(Decompressor)狀態(tài)。
CompressionCodecFactory是Hadoop壓縮框架中的另一個(gè)類,主要功能是負(fù)責(zé)根據(jù)不同的文件擴(kuò)展名,來自動(dòng)地獲取相對(duì)應(yīng)的壓縮解壓器,使用者可以通過它提供的方法,獲得CompressionCodec,極大地增強(qiáng)了應(yīng)用程序在處理壓縮文件時(shí)的通用性。
除了前面介紹的createInputStream()和createInputStream()方法外,Hadoop中還有其他兩種壓縮模式。
一是壓縮機(jī)Compressor和解壓機(jī)Decompressor。在Hadoop的實(shí)現(xiàn)中,數(shù)據(jù)編碼器和解碼器被抽象成了兩個(gè)接口:org.apache.hadoop.io.compress.Compressor和org.apache.hadoop.io.compress.Decompressor。它們規(guī)定了一系列的方法,所以,在Hadoop內(nèi)部的編碼/解碼算法實(shí)現(xiàn)中都需要實(shí)現(xiàn)對(duì)應(yīng)的接口。在實(shí)際的數(shù)據(jù)壓縮與解壓縮過程中,Hadoop為用戶提供了統(tǒng)一的I/O流處理模式。
二是壓縮流CompressionOutputStream和解壓縮流CompressionInputStream。這兩個(gè)類分別繼承自java.io.OutputStream和java.io.InputStream,作用也類似。
下面,我們編碼實(shí)現(xiàn)文件的壓縮和解壓縮操作。源程序如下:
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.util.ReflectionUtils; public class CompressTest { /** * 壓縮文件 * @param codecClassName * @param filein, fileout * @throws IOException */ public static void compress(String codecClassName, String filein, String fileout) throws Exception { Class<? > codecClass = Class.forName(codecClassName); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); CompressionCodec codec = (CompressionCodec)ReflectionUtils .newInstance(codecClass, conf); //指定壓縮文件路徑 FSDataOutputStream outputStream = fs.create(new Path(fileout)); //指定要被壓縮的文件路徑 FSDataInputStream in = fs.open(new Path(filein)); //創(chuàng)建壓縮輸出流 CompressionOutputStream out = codec.createOutputStream(outputStream); IOUtils.copyBytes(in, out, conf); IOUtils.closeStream(in); IOUtils.closeStream(out); } /** * 解壓縮:使用文件擴(kuò)展名來推斷codec * @param fileuri * @throws IOException */ public static void uncompress(String fileuri) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(fileuri), conf); Path inputPath = new Path(fileuri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if(codec == null) { System.out.println("no codec for " + fileuri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix( fileuri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); } } public static void main(String[]args) throws Exception { String filein = "/hdtest/bigdata.pdf"; String fileout = "/hdtest/bigdatacom.gz"; compress("org.apache.hadoop.io.compress.GzipCodec", filein, fileout); //uncompress(fileout); } }
編譯并打包運(yùn)行。壓縮操作運(yùn)行的結(jié)果如圖3-14所示。

圖3-14 進(jìn)行壓縮操作并查看結(jié)果
解壓縮操作的運(yùn)行結(jié)果如圖3-15所示。

圖3-15 解壓縮的運(yùn)行結(jié)果
- 虛擬儀器設(shè)計(jì)測(cè)控應(yīng)用典型實(shí)例
- 離散事件系統(tǒng)建模與仿真
- 計(jì)算機(jī)網(wǎng)絡(luò)應(yīng)用基礎(chǔ)
- 網(wǎng)絡(luò)綜合布線技術(shù)
- Maya 2012從入門到精通
- 傳感器與物聯(lián)網(wǎng)技術(shù)
- 工業(yè)機(jī)器人維護(hù)與保養(yǎng)
- The DevOps 2.1 Toolkit:Docker Swarm
- Mastering Ansible(Second Edition)
- 未來學(xué)徒:讀懂人工智能飛馳時(shí)代
- 步步驚“芯”
- 項(xiàng)目實(shí)踐精解:C#核心技術(shù)應(yīng)用開發(fā)
- Windows 7來了
- 中老年人學(xué)電腦與上網(wǎng)
- 分布式Java應(yīng)用