官术网_书友最值得收藏!

3.1 HDFS接口與編程

HDFS提供了多種用戶操作和編程接口,既通過Shell命令管理文件與目錄、管理作業(yè)調(diào)度、控制與優(yōu)化集群性能等,也提供了Java、C語言等的編程接口,用戶可以通過編寫程序?qū)DFS進行擴展。

3.1.1 Shell命令

HDFS資源URI的格式如下:

        scheme://authority/path

其中scheme是協(xié)議名,一般是file或hdfs; authority是授權(quán)訪問的主機名或IP; path是訪問路徑。例如:

        hdfs://localhost:9000/user/chunk/test.txt

如果已經(jīng)在core-site.xml里配置了fs.default.name=hdfs://localhost:9000,則僅使用/user/chunk/test.txt即可。

在HDFS的所有接口中,Shell命令行接口最簡單,也是開發(fā)者比較熟悉的方式。我們通過使用“hdfs -help”命令,可以看到HDFS支持的文件系統(tǒng)命令,如圖3-1所示。

圖3-1 HDFS支持的文件系統(tǒng)命令

HDFS支持的文件系統(tǒng)命令主要有兩類。

(1)用戶命令:用于管理HDFS日常操作,如dfs、fsck、fetchdt等。

(2)系統(tǒng)管理命令:主要用于控制和管理HDFS集群,如balancer、namenode、datanode、dfsadmin、secondarynamenode等。限于篇幅,這里只介紹幾種常用的命令模塊。

1. hdfs dfs [GENERIC_OPTIONS][COMMAND_OPTIONS]

“hdfs dfs”提供了類似于Linux Shell一樣的命令集,其用法與Linux Shell基本一致。下面詳細介紹各個命令。

(1)appendToFile。

說明:將一個或者多個本地文件追加到目的文件。成功返回0,錯誤返回1。

格式:hdfs dfs -appendToFile <localsrc> ... <dst>

示例:

        hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile
        hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile
        hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile

(2)cat。

說明:將路徑指定文件的內(nèi)容輸出到stdout。成功返回0,錯誤返回-1。

格式:hdfs dfs -cat URI [URI ...]

示例:

        hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
        hdfs dfs -cat file:///file3 /user/hadoop/file4

(3)chgrp。

說明:改變文件所屬的用戶組。如果使用-R選項,則這一操作對整個目錄結(jié)構(gòu)遞歸執(zhí)行。使用這一命令的用戶必須是文件的所屬用戶,或者是超級用戶。

格式:hdfs dfs -chgrp [-R]GROUP URI [URI ...]

(4)chmod。

說明:改變文件的權(quán)限。使用-R將使改變在目錄結(jié)構(gòu)下遞歸進行。命令的使用者必須是文件的所有者或者超級用戶。

格式:hdfs dfs -chmod [-R]<MODE[, MODE]... | OCTALMODE> URI [URI ...]

(5)chown。

說明:改變文件的所屬用戶。如果使用-R選項,則這一操作對整個目錄結(jié)構(gòu)遞歸執(zhí)行。使用這一命令的用戶必須是文件在命令變更之前的所屬用戶,或者是超級用戶。

格式:hdfs dfs -chown [-R][OWNER][:[GROUP]]URI [URI ]

(6)copyFromLocal。

說明:從本地復(fù)制,與put命令相似,但限定源路徑是本地的。

格式:hdfs dfs -copyFromLocal <localsrc> URI

(7)copyToLocal。

說明:復(fù)制到本地,與get命令相似,但限定目的路徑是本地的。

格式:hdfs dfs -copyToLocal [-ignorecrc][-crc]URI <localdst>

(8)count。

說明:計算文件、目錄的數(shù)量。成功返回0,錯誤返回-1。

格式:hdfs dfs -count [-q][-h]<paths>

示例:

        hdfs dfs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
        hdfs dfs -count -q hdfs://nn1.example.com/file1
        hdfs dfs -count -q -h hdfs://nn1.example.com/file1

(9)cp。

說明:將文件從源路徑復(fù)制到目標路徑。這個命令允許有多個源路徑,但同時,目標路徑必須是一個目錄。成功返回0,錯誤返回-1。

格式:hdfs dfs -cp [-f][-p | -p[topax]]URI [URI ...]<dest>

示例:

        hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2
        hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

(10)du。

說明:顯示目錄中所有文件的大小,或者當(dāng)只指定一個文件時,顯示此文件的大小。成功返回0,錯誤返回-1。

格式:hdfs dfs -du [-s][-h]URI [URI ...]

示例:

        hdfs dfs -du /user/hadoop/dir1 /user/hadoop/file1
          hdfs://nn.example.com/user/hadoop/dir1

(11)dus。

說明:顯示文件的大小。此命令可以用“du -s”替代。

格式:hdfs dfs -dus <args>

(12)expunge。

作用:清空回收站。

格式:hdfs dfs -expunge

(13)get。

說明:復(fù)制文件到本地文件系統(tǒng)。可用“-ignorecrc”選項復(fù)制CRC校驗失敗的文件。使用“-crc”選項復(fù)制文件以及CRC信息。成功返回0,錯誤返回-1。

格式:hdfs dfs -get [-ignorecrc][-crc]<src> <localdst>

示例:

        hdfs dfs -get /user/hadoop/file localfile
        hdfs dfs -get hdfs://nn.example.com/user/hadoop/file localfile

(14)getfacl。

說明:顯示文件或者目錄的權(quán)限控制列表。成功返回0,錯誤返回非零值。

格式:hdfs dfs -getfacl [-R]<path>

示例:

        hdfs dfs -getfacl /file
        hdfs dfs -getfacl -R /dir

(15)getfattr。

說明:顯示文件或者目錄的擴展屬性。成功返回0,錯誤返回非零值。

格式:hdfs dfs -getfattr [-R]-n name | -d [-e en]<path>

示例:

        hdfs dfs -getfattr -d /file
        hdfs dfs -getfattr -R -n user.myAttr /dir

(16)getmerge。

說明:接受一個源目錄和一個目標文件作為輸入,并且將源目錄中所有的文件連接成本地目標文件。addnl是可選的,用于指定在每個文件結(jié)尾添加一個換行符。

格式:hdfs dfs -getmerge <src> <localdst> [addnl]

(17)ls。

說明:與Linux中一樣,返回子目錄或子文件列表。成功返回0,錯誤返回-1。

格式:hdfs dfs -ls [-R]<args>

示例:

        hdfs dfs -ls /user/hadoop/file1

(18)lsr。

說明:ls命令的遞歸版本,一般使用“l(fā)s -R”代替。

格式:hdfs dfs -lsr <args>

(19)mkdir。

說明:創(chuàng)建目錄,加-p選項創(chuàng)建多層目錄。成功返回0,錯誤返回-1。

格式:hdfs dfs -mkdir [-p]<paths>

示例:

        hdfs dfs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
        hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir
          hdfs://nn2.example.com/user/hadoop/dir

(20)moveFromLocal。

說明:類似put,區(qū)別在于put操作完成后刪除。

格式:hdfs dfs -moveFromLocal <localsrc> <dst>

(21)mv。

說明:將文件從源路徑移動到目標路徑。這個命令允許有多個源路徑,此時,目標路徑必須是一個目錄。不允許在不同的文件系統(tǒng)間移動文件。成功返回0,錯誤返回-1。

格式:hdfs dfs -mv URI [URI ...]<dest>

示例:

        hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2
        hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2
          hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

(22)put。

說明:從本地文件系統(tǒng)中復(fù)制單個或多個源路徑到目標文件系統(tǒng)。也支持從標準輸入設(shè)備中讀取輸入,寫入目標文件系統(tǒng)。成功返回0,錯誤返回-1。

格式:hdfs dfs -put <localsrc> ... <dst>

示例:

        hdfs dfs -put localfile /user/hadoop/hadoopfile
        hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir
        hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile

(23)rm。

說明:刪除指定的文件或目錄。成功返回0,錯誤返回-1。

格式:hdfs dfs -rm [-f][-r|-R][-skipTrash]URI [URI ...]

示例:

        hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

(24)rmr。

說明:rm的遞歸版本,已過時,一般使用“rm -r”代替。

格式:hdfs dfs -rmr [-skipTrash]URI [URI ...]

(25)setfacl。

說明:設(shè)置文件或者目錄的權(quán)限控制列表。成功返回0,錯誤返回非零值。

格式:hdfs dfs -setfacl [-R][-b|-k -m|-x <acl_spec> <path>]|[--set <acl_spec> <path>]

示例:

        hdfs dfs -setfacl -m user:hadoop:rw- /file
        hdfs dfs -setfacl -x user:hadoop /file
        hdfs dfs -setfacl -b /file
        hdfs dfs -setfacl -k /dir
        hdfs dfs -setfacl --set user::rw-, user:hadoop:rw-, group::r--, other::r--
          /file
        hdfs dfs -setfacl -R -m user:hadoop:r-x /dir
        hdfs dfs -setfacl -m default:user:hadoop:r-x /dir

(26)setfattr。

說明:設(shè)置文件或者目錄的擴展屬性。成功返回0,錯誤返回非零值。

格式:hdfs dfs -setfattr -n name [-v value]| -x name <path>

示例:

        hdfs dfs -setfattr -n user.myAttr -v myValue /file
        hdfs dfs -setfattr -n user.noValue /file
        hdfs dfs -setfattr -x user.myAttr /file

(27)setrep。

說明:改變文件和目錄的復(fù)制因子。成功返回0,錯誤返回-1。

格式:hdfs dfs -setrep [-R][-w]<numReplicas> <path>

示例:

        hdfs dfs -setrep -w 3 /user/hadoop/dir1

(28)stat。

說明:返回指定路徑的統(tǒng)計信息。成功返回0,錯誤返回-1。

格式:hdfs dfs -stat URI [URI ...]

示例:

        hdfs dfs -stat path

(29)tail。

說明:將文件尾部1KB的內(nèi)容輸出到stdout。成功返回0,錯誤返回-1。

格式:hdfs dfs -tail [-f]URI

示例:

        hdfs dfs -tail pathname

(30)test

說明:檢查文件。選項“-e”檢查文件是否存在,如果存在則返回0;選項“-z”檢查文件是否為0字節(jié),如果是則返回0;選項“-d”檢查路徑是否為目錄,如果是則返回1,否則返回0。

格式:hdfs dfs -test -[ezd]URI

示例:

        hdfs dfs -test -e filename

(31)text。

說明:將源文件輸出為文本格式。允許的格式是zip和TextRecordInputStream。

格式:hdfs dfs -text <src>

(32)touchz。

說明:創(chuàng)建一個空文件。成功返回0,錯誤返回-1。

格式:hdfs dfs -touchz URI [URI ...]

示例:

        hdfs dfs -touchz pathname

小提示

“hadoop dfs”與“hdfs dfs”都是操作HDFS文件系統(tǒng)的命令,“hadoop dfs”屬于早期版本的格式,已經(jīng)過時,一般使用“hdfs dfs”。

“hadoop fs”也是文件系統(tǒng)操作命令,但使用范圍更廣,能夠操作其他格式文件系統(tǒng),如local、HDFS等,可以在本地與Hadoop分布式文件系統(tǒng)的交互操作中使用。

2. hdfs fsck [GENERIC_OPTIONS]<path> [-list-corruptfileblocks | [-move | -delete| -openforwrite][-files [-blocks [-locations | -racks]]]][-includeSnapshots]

fsck是一個文件系統(tǒng)健康狀況檢查工具,用來檢查各類問題,比如,文件塊丟失等(如圖3-2所示)。但是,注意它不會主動恢復(fù)備份缺失的block,這個是由NameNode單獨的線程異步處理的。

圖3-2 fsck命令的運行結(jié)果

fsck命令的參數(shù)說明見表3-1。

表3-1 fsck參數(shù)的說明

3. hdfs datanode [-regular | -rollback | -rollingupgrade rollback]

運行一個HDFS集群的數(shù)據(jù)節(jié)點。參數(shù)說明見表3-2。

表3-2 hdfs datanode命令參數(shù)的說明

4. hdfs namenode [GENERIC_OPTIONS]

“hdfs namenode”是運行NameNode的命令,是一個比較核心的工具。該命令的主要參數(shù)說明見表3-3。

表3-3 hdfs namenode命令參數(shù)的說明

5. hdfs dfsadmin [GENERIC_OPTIONS]

dfsadmin是一個多任務(wù)的工具,我們可以使用它來獲取HDFS的狀態(tài)信息,以及在HDFS上執(zhí)行的一系列管理操作。該命令的主要參數(shù)說明見表3-4。

表3-4 hdfs dfsadmin命令參數(shù)的說明

續(xù)表

dfsadmin命令的使用示例如圖3-3所示。

圖3-3 dfsadmin命令的使用示例

6. hdfs cacheadmin

管理員和用戶通過“hdfs cacheadmin”命令管理緩存資源。

緩存指令由一個唯一的無重復(fù)的64位整數(shù)ID來標識。即使緩存指令后來被刪除了,ID也不會重復(fù)使用。緩存池由一個唯一的字符串名稱來標識。

(1)增加緩存:addDirective。

用法:hdfs cacheadmin -addDirective -path <path> -pool <pool-name> [-force][-replication<replication>][-ttl <time-to-live>]

參數(shù)說明見表3-5。

表3-5 addDirective的參數(shù)說明

(2)刪除一個緩存:removeDirective。

用法:hdfs cacheadmin -removeDirective <id>

參數(shù)id指定要刪除的緩存指令的ID。刪除時,必須對該指令的緩存池擁有寫權(quán)限。

(3)刪除指定路徑下的每一個緩存:removeDirectives。

用法:hdfs cacheadmin -removeDirectives <path>

參數(shù)path中設(shè)置要刪除的緩存指令的路徑。刪除時必須對該指令的緩存池擁有寫權(quán)限。

(4)緩存列表:listDirectives。

用法:hdfs cacheadmin -listDirectives [-stats][-path <path>][-pool <pool>]

參數(shù)說明見表3-6。

表3-6 listDirectives的參數(shù)說明

(5)新增緩存池:addPool。

用法:hdfs cacheadmin -addPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>

參數(shù)說明見表3-7。

表3-7 addPool的參數(shù)說明

(6)修改緩存池:modifyPool。

用法:hdfs cacheadmin -modifyPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>]

參數(shù)說明見表3-8。

表3-8 modifyPool的參數(shù)說明

(7)刪除緩存池:removePool。

用法:hdfs cacheadmin -removePool <name>

參數(shù)name指定要刪除的緩存池的名稱。

(8)緩存池列表:listPools。

用法:hdfs cacheadmin -listPools [-stats][<name>]

參數(shù)說明見表3-9。

表3-9 addPool的參數(shù)說明

7. hdfs balancer [-threshold <threshold>][-policy <policy>]

HDFS集群非常容易出現(xiàn)機器與機器之間磁盤利用率不平衡的情況,尤其是增加新的數(shù)據(jù)節(jié)點時。保證HDFS中的數(shù)據(jù)平衡非常重要。HDFS出現(xiàn)不平衡的狀況將引發(fā)很多問題,比如MapReduce程序無法很好地利用本地計算的優(yōu)勢、機器之間無法達到更好的網(wǎng)絡(luò)帶寬使用率等。

在Hadoop中,包含一個Balancer程序,可以調(diào)節(jié)HDFS集群平衡的狀態(tài)。啟動Balancer服務(wù)時,界面如圖3-4所示。

圖3-4 啟動Balancer服務(wù)

服務(wù)啟動后,集群管理人員可用balancer命令進行分析和再平衡數(shù)據(jù),如圖3-5所示。

圖3-5 可用balancer命令進行分析和再平衡數(shù)據(jù)

參數(shù)threshold是判斷集群是否平衡的目標參數(shù),表示HDFS達到平衡狀態(tài)的磁盤使用率偏差值。默認設(shè)置為10,參數(shù)取值范圍是0~100。如果機器之間磁盤使用率偏差小于10%,我們就認為HDFS集群已經(jīng)達到了平衡的狀態(tài)。

8. hdfs version

hdfs version命令用于查看當(dāng)前系統(tǒng)的版本,運行示例如圖3-6所示。

圖3-6 使用hdfs version命令查看當(dāng)前系統(tǒng)的版本

3.1.2 Java接口操作

由于Hadoop本身就是使用Java語言編寫的,理論上,通過Java API能夠調(diào)用所有的Hadoop文件系統(tǒng)的操作接口。

Hadoop有一個抽象的文件系統(tǒng)概念,在Java抽象類org.apache.hadoop.fs中定義了接口。只要某個文件系統(tǒng)實現(xiàn)了這個接口,那么,它就可以作為Hadoop支持的文件系統(tǒng)。目前Hadoop能夠支持的文件系統(tǒng)如表3-10所示。

表3-10 Hadoop文件類的實現(xiàn)

在Hadoop中,主要是定義了一組分布式文件系統(tǒng)和通用的I/O組件和接口,Hadoop的文件系統(tǒng)準確地應(yīng)該稱作Hadoop I/O。而HDFS是實現(xiàn)該文件接口的Hadoop自帶的分布式文件項目,是對Hadoop I/O接口的實現(xiàn)。在處理大數(shù)據(jù)集時,為實現(xiàn)最優(yōu)性能,通常使用HDFS存儲。

org.apache.hadoop.fs包由接口類(FsConstants、Syncable等)、Java類(AbstractFileSystem、BlockLocation、FileSystem、FileUtil、FSDataInputStream等)、枚舉類型(如CreateFlag)、異常類(ChecksumException、InvalidPathException等)和錯誤類(如FSError)組成。每個子對象中都定義了相應(yīng)的方法,通過對org.apache.hadoop.fs包的封裝與調(diào)用,可以拓展HDFS應(yīng)用,更好地幫助用戶使用集群海量存儲。

在介紹Java接口操作之前,先介紹幾個常用的Java類。

(1)FileSystem。

org.apache.hadoop.fs.FileSystem,通用文件系統(tǒng)基類,用于與HDFS文件系統(tǒng)交互,編寫的HDFS程序都需要重寫FileSystem類。通過FileSystem,可以非常方便地像操作本地文件系統(tǒng)一樣操作HDFS集群文件。

FileSystem提供了get方法,一個是通過配置文件獲取與HDFS的連接;一個是通過URL指定配置文件,獲取與HDFS的連接,URL的格式為hdfs://namenode/xxx.xml。

方法的原型如下:

        public static FileSystem get(Configuration conf) throws IOException;
        public static FileSystem get(URI uri, Configuration conf) throws IOException;
        public static FileSystem get(final URI uri, final Configuration conf, final
          String user) throws IOException, InterruptedException;

其中,Configuration(org.apache.hadoop.conf.Configuration)類對象封裝了客戶端或服務(wù)器的配置;URI是指文件在HDFS里存放的路徑。

(2)FSDataInputStream。

org.apache.hadoop.fs.FSDataInputStream,文件輸入流,用于讀取HDFS文件,它是Java中DataInputStream的派生類,支持從任意位置讀取流式數(shù)據(jù)。

常用的讀取方法是從指定的位置,讀取指定大小的數(shù)據(jù)至緩存區(qū)。方法如下所示:

        int read(long position, byte[]buffer, int offset, int length)

還有用于隨時定位的方法,可以定位到指定的讀取點,如下所示:

        void seek(long desired)

通過long getPos()方法,還可以獲取當(dāng)前的讀取點。

(3)FSDataOutputStream。

org.apache.hadoop.fs.FSDataOutputStream,文件輸出流,是DataOutputStream的派生類,通過這個類,能夠向HDFS順序?qū)懭霐?shù)據(jù)流。

通常的寫入方法為write,如下所示:

        public void write(int b)

獲取當(dāng)前寫入點的函數(shù)為long getPos()。

(4)Path。

org.apache.hadoop.fs.Path,文件與目錄定位類,用于定義HDFS集群中指定的目錄與文件絕對或相對路徑。

可以通過多種方式構(gòu)造Path,如通過URL的模式,通常編寫方式為:

        hdfs://ip:port/directory/filename

Path可以與FileSystem的open函數(shù)相關(guān)聯(lián),通過Path構(gòu)造訪問路徑,用FileSystem進行訪問。

(5)FileStatus。

org.apache.hadoop.fs.FileStatus,文件狀態(tài)顯示類,可以獲取文件與目錄的元數(shù)據(jù)、長度、塊大小、所屬用戶、編輯時間等信息,同時,可以設(shè)置文件用戶、權(quán)限等內(nèi)容。

FileStatus有很多get與set方法,如獲取文件長度的long getLen()方法、設(shè)置文件權(quán)限的setPermission(FsPermission permission)方法等。

下面,我們開始Hadoop的Java操作之旅。

1.創(chuàng)建文件

FileSystem類里提供了很多API,用來創(chuàng)建文件,其中,最簡單的一個是:

        public FSDataOutputStream create(Path f) throws IOException;

它創(chuàng)建一個Path類代表的文件,并返回一個輸出流。這個方法有多個重載方法,可以用來設(shè)置是否覆蓋已有文件、該文件復(fù)制的份數(shù)、寫入時的緩沖區(qū)大小、文件塊大小(block)、權(quán)限等。默認情況下,如果Path中文件的父目錄(或者更上一級目錄)不存在,這些目錄會被自動創(chuàng)建。

2.讀取數(shù)據(jù)

通過調(diào)用FileSystem實例的open方法打開文件,得到一個輸入流。下面是使用FileSystem類讀取HDFS中文件內(nèi)容的完整程序:

        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        public class FileSystemCat {
            public static void main(String[]args) throws Exception {
              String uri = args[0];
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(URI.create(uri), conf);
              FSDataInputStream in = null;
              try {
                  in = fs.open(new Path(uri));
                  IOUtils.copyBytes(in, System.out, 4096, false);
              } finally {
                  IOUtils.closeStream(in);
              }
            }
        }

此外,F(xiàn)SDataInputStream類同時也實現(xiàn)了PositionedReadable(org.apache.hadoop.fs. PositionedReadable)接口,接口中定義的三個方法允許在任意位置讀取文件的內(nèi)容:

        public int read(long position, byte[]buffer, int offset, int length) throws IOException;
        public void readFully(long position, byte[]buffer, int offset, int length)
          throws IOException;
        public void readFully(long position, byte[]buffer) throws IOException;

結(jié)合第2章內(nèi)容,下面我們結(jié)合程序?qū)崿F(xiàn)深入剖析HDFS讀文件時的數(shù)據(jù)流向過程。

(1)客戶端通過調(diào)用FileSystem.open()方法打開一個文件,對于HDFS來講,其實是調(diào)用DistributedFileSystem實例的open方法。

(2)DistributedFileSystem通過遠程方法調(diào)用訪問NameNode,獲取該文件的前幾個blocks所在的位置信息;針對每個block, NameNode都會返回有該block數(shù)據(jù)信息的所有DataNodes節(jié)點,比如配置的dfs.replication為3,就會每個block返回3個DataNodes節(jié)點信息,這些節(jié)點是按距離客戶端的遠近排序的,如果發(fā)起讀文件的客戶端就在包含該block的DataNode上,那么這個DataNode就排第一位(這種情況在做Map任務(wù)時常見),客戶端就會從本機讀取數(shù)據(jù)。

DistributedFileSystem的open方法返回一個FSDataInputStream, FSDataInputStream里包裝著一個DFSInputStream, DFSInputStream真正管理DataNodes和NameNode的I/O。

(3)客戶端調(diào)用FSDataInputStream.read()方法,F(xiàn)SDataInputStream里已經(jīng)緩存了該文件前幾個block所在的DataNode的地址,于是,從第一個block的第一個地址(也就是最近的DataNode)開始連接讀取。

(4)反復(fù)調(diào)用read()方法,數(shù)據(jù)不斷地從DataNode流向客戶端。

(5)當(dāng)一個block的數(shù)據(jù)讀完時,DFSInputStream會關(guān)閉當(dāng)前DataNode的連接,打開下一個block所在的最優(yōu)DataNode的連接繼續(xù)讀取;這些對客戶端是透明的,在客戶端看來,就是在讀一個連續(xù)的流。

(6)這樣,一個block一個block地讀下去,當(dāng)需要使用更多block的存儲信息時,DFSInputStream會再次調(diào)用NameNode,獲取下一批block的存儲位置信息,直到客戶端停止讀取,調(diào)用FSDataInputStream.close()方法,整個讀取過程結(jié)束。

小提示

文件操作還可以使用Hadoop URL的方式,示例代碼如下:

        import java.io.InputStream;
        import java.net.URL;
        import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
        import org.apache.hadoop.io.IOUtils;
        public class URLCat {
            static {
              URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
            }
            public static void main(String[]args) throws Exception {
              InputStream in = null;
              try {
                  in = new URL(args[0]).openStream();
                  IOUtils.copyBytes(in, System.out, 4096, false);
              } finally {
                  IOUtils.closeStream(in);
              }
            }
        }

在上面的程序中,先設(shè)置URLStreamHandlerFactory,然后通過URL打開一個流,讀取流,就得到了文件的內(nèi)容,通過IOUtils.copyBytes()把讀到的內(nèi)容寫出到標準輸出流里,也就是控制臺上,從而實現(xiàn)了類似于Linux里的cat命令的功能。最后關(guān)閉輸入流。

3.寫入數(shù)據(jù)

與讀操作類似,Hadoop對于寫操作也提供了一個類:FSDataOutputStream,這個類重載了很多java.io.DataOutputStream的write方法,用于寫入很多類型的數(shù)據(jù),比如int、char、字節(jié)數(shù)組等。

HDFS寫文件的示例代碼如下:

        FileSystem hdfs = FileSystem.get(new Configuration());
        Path path = new Path("/testfile");
        FSDataOutputStream dos = hdfs.create(path);
        byte[]readBuf = "Hello World".getBytes("UTF-8");
        dos.write(readBuf, 0, readBuf.length);
        dos.close();
        hdfs.close();

如果希望向已有文件追加內(nèi)容,可以調(diào)用:

        public FSDataOutputStream append(Path f) throws IOException;

如果文件不存在時,append方法也可以用來新建一個文件。

下面,我們結(jié)合以上的程序,深入剖析HDFS寫文件時的數(shù)據(jù)流向過程。

(1)客戶端調(diào)用DistributedFileSystem.create()方法創(chuàng)建一個文件。

(2)DistributedFileSystem向NameNode發(fā)起遠程方法調(diào)用,創(chuàng)建一個文件,但是,NameNode沒有把它關(guān)聯(lián)到任何block上去;NameNode在這一步做了很多檢查工作,保證該文件當(dāng)前不存在,客戶端有創(chuàng)建該文件的權(quán)限等。如果這些檢查都通過了,NameNode創(chuàng)建一條新文件記錄;否則,創(chuàng)建失敗,客戶端返回IOException。DistributedFileSystem返回一個FSDataOutputStream,像讀文件時一樣,這個FSDataOutputStream里包裝著一個DFSOutputStream,由它來實際處理與DataNodes和NameNode的通信。

(3)客戶端向DFSOutputStream里寫數(shù)據(jù),DFSOutputStream把數(shù)據(jù)分成包,丟進一個稱為data queue的隊列中。DataStreamer負責(zé)向NameNode申請新的block,新的block被分配在了一個或多個(默認為3個)節(jié)點上,這些節(jié)點就形成一個管道。

(4)DataStreamer把data queue里的包拿出來,通過管道輸送給第1個節(jié)點,第1個節(jié)點再通過管道輸送給第2個節(jié)點,第2個再輸送給第3個。以此類推。

(5)DFSOutputStream同時還在內(nèi)部維護一個通知隊列,名叫ack queue,里面保存發(fā)過的數(shù)據(jù)包。一個包只有被所有管道上的DataNodes通知收到了,才會被移除。如果任意一個DataNode接收失敗了,首先,管道關(guān)閉,然后把ack queue里的包都放回到data queue的頭部,以便使失敗節(jié)點的下游節(jié)點不會丟失這些數(shù)據(jù)。打開管道,把壞節(jié)點移除,數(shù)據(jù)會繼續(xù)向其他好節(jié)點輸送,直到管道上的節(jié)點都完成了。如果少復(fù)制了一個節(jié)點,向NameNode報告一下,說現(xiàn)在這個block沒有達到設(shè)定的副本數(shù),然后就返回成功了,后期,NameNode會組織一個異步的任務(wù),把副本數(shù)恢復(fù)到設(shè)定值。然后,接下來的數(shù)據(jù)包和數(shù)據(jù)塊正常寫入。

如果多個DataNodes都失敗了,會檢測hdfs-site.xml里的dfs.replication.min參數(shù),默認值是1,意思是只要有1個DataNode接收成功,就認為數(shù)據(jù)寫入成功了。客戶端就會收到寫入成功的返回。后期,Hadoop會發(fā)起異步任務(wù)把副本數(shù)恢復(fù)到dfs.replication設(shè)置的值。

以上操作對客戶端都是透明的,客戶端不知道發(fā)生了這些事情,只知道寫文件成功了。

(6)當(dāng)客戶端完成數(shù)據(jù)寫入后,調(diào)用流的close()方法,這個操作把data queue里的所有剩余的包都發(fā)給管道。

(7)等所有包都收到了寫成功的反饋后,客戶端通知NameNode寫文件完成了。因為DataStream寫文件前就先向NameNode申請block的位置信息了,所以寫文件完成時,NameNode已知道每個block的位置信息,它只需等最小的副本數(shù)寫成功,就可以返回成功。

4.文件讀寫位置

讀取文件時(FSDataInputStream),允許使用seek()方法在文件中定位。支持隨機訪問,理論上,可以從流的任何位置讀取數(shù)據(jù),但調(diào)用seek()方法的開銷是相當(dāng)巨大的,應(yīng)該盡量少調(diào)用,盡可能地使程序做到順序讀。

由于HDFS只允許對一個打開的文件順序?qū)懭耄蛳蛞粋€已有文件的尾部追加,不允許在任意位置寫,F(xiàn)SDataOutputStream沒有seek方法。但FSDataOutputStream類提供了一個getPos()方法,可以查詢當(dāng)前在往文件的哪個位置寫的寫入偏移量:

        public long getPos() throws IOException;

5.重命名

通過FileSystem.rename()方法,可為指定的HDFS文件重命名:

        protected void rename(Path src, Path dst, Options) throws IOException;

示例代碼實現(xiàn)如下:

        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        Path frpath = new Path("/test");   //舊的文件名
        Path topath = new Path("/testNew");   //新的文件名
        boolean isRename = hdfs.rename(frpath, topath);
        String result = isRename? "成功" : "失敗";

6.刪除操作

通過FileSystem.delete()方法刪除指定的HDFS文件或目錄(永久刪除):

        public boolean delete(Path f, boolean recursive) throws IOException;

其中,f為需要刪除文件的完整路徑,recursive用來確定是否進行遞歸刪除。如果f是一個文件或空目錄,則不論recursive是何值,都刪除。如果f是一個非空目錄,則recursive為true時,目錄下內(nèi)容全部刪除;如果recursive為false,不刪除,并拋出IOException。

示例代碼實現(xiàn)如下:

        Path f = new Path(fileName);
        boolean isExists = hdfs.exists(f);
        if (isExists) { //if exists, delete
            boolean isDel = hdfs.delete(f, true);
            System.out.println(fileName + "  delete? \t" + isDel);
        } else {
            System.out.println(fileName + "  exist? \t" + isExists);
        }

7.文件夾操作

FileSystem中創(chuàng)建文件夾的方法如下:

        public boolean mkdirs(Path f) throws IOException;

與java.io.File.mkdirs方法一樣,創(chuàng)建目錄的同時,默認地創(chuàng)建缺失的父目錄。我們一般不需要創(chuàng)建目錄,一般在創(chuàng)建文件時,默認地就把所需的目錄都創(chuàng)建好了。

目錄創(chuàng)建的示例代碼實現(xiàn)如下:

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(path);
        boolean isok = fs.mkdirs(srcPath);
        if(isok) {
            System.out.println("create dir ok! ");
        } else {
            System.out.println("create dir failure");
        }
        fs.close();

使用FileSystem的listStatus()方法能夠列出某個目錄中的所有文件:

        public FileStatus[]listStatus(Path f) throws IOException
        public FileStatus[]listStatus(Path f, PathFilter filter) throws IOException
        public FileStatus[]listStatus(Path[]files) throws IOException
        public FileStatus[]listStatus(Path[]files, PathFilter filter)
          throws IOException

這一組方法都接收Path參數(shù),如果Path是一個文件,返回值是一個數(shù)組,數(shù)組里只有一個元素,是這個Path代表的文件的FileStatus對象;如果Path是一個目錄,返回值數(shù)組是該目錄下的所有文件和目錄的FileStatus組成的數(shù)組,有可能是一個0長數(shù)組;如果參數(shù)是Path[],則返回值相當(dāng)于多次調(diào)用單Path,然后把返回值整合到一個數(shù)組里;如果參數(shù)中包含PathFilter,則PathFilter會對返回的文件或目錄進行過濾,返回滿足條件的文件或目錄,條件由開發(fā)者自行定義。

FileSystem的globStatus方法利用通配符來列出文件和目錄:

        public FileStatus[]globStatus(Path pathPattern) throws IOException;
        public FileStatus[]globStatus(Path pathPattern, PathFilter filter)
          throws IOException;

文件夾刪除操作與文件刪除類似。

其他關(guān)于文件夾的操作方法還有FileSystem.getWorkingDirectory(返回當(dāng)前工作目錄)、FileSystem.setWorkingDirectory(更改當(dāng)前工作目錄)等。

8.屬性操作

FileSystem類中的getFileStatus()方法返回一個FileStatus實例,該FileStatus實例中,包含了該Path(文件或目錄)的元數(shù)據(jù)信息:文件大小、block大小、復(fù)制的份數(shù)、最后修改時間、所有者、權(quán)限等。示例代碼實現(xiàn)如下:

        FileStatus status = fs.getFileStatus(path);
        System.out.println("path = " + status.getPath());
        System.out.println("owner = " + status.getOwner());        System.out.println("block size = " + status.getBlockSize());
        System.out.println("permission = " + status.getPermission());
        System.out.println("replication = " + status.getReplication());

3.1.3 WebHDFS

Hadoop提供的Java Native API支持對文件或目錄的操作,為開發(fā)者提供了極大的便利。為滿足許多外部應(yīng)用程序操作HDFS文件系統(tǒng)的需求,Hadoop提供了兩種基于HTTP方式的接口:一是用于瀏覽文件系統(tǒng)的Web界面;另一個是WebHDFS REST API接口。

啟動HDFS時,NameNode和DataNode各自啟動了一個內(nèi)置的Web服務(wù)器,顯示了集群當(dāng)前的基本狀態(tài)和信息。默認配置下NameNode的首頁地址是http://namenode-name:50070/。這個頁面列出了集群里的所有DataNode和集群的基本狀態(tài)。

這個Web界面也可以用來瀏覽整個文件系統(tǒng)。使用NameNode首頁上的Browse the file system鏈接,輸入需要查看的目錄地址,即可看到,如圖3-7所示。

圖3-7 Web界面

WebHDFS基于HTTP,通過GET、PUT、POST和DELETE等操作,支持FileSystem/FileContext的全部API。具體操作類型見表3-11。

表3-11 WebHDFS的操作

在使用WebHDFS REST API接口前,要先對Hadoop進行配置和授權(quán)認證。編輯hdfs-site.xml文件,添加啟用WebHDFS(dfs.webhdfs.enabled)、kerberos驗證(dfs.web. authentication.kerberos.principal、dfs.web.authentication.kerberos.keytab)等屬性配置。配置完成后,啟動WebHDFS服務(wù)即可,如圖3-8所示。

圖3-8 啟動WebHDFS服務(wù)

WebHDFS默認的HTTP服務(wù)端口是14000。需要說明的是,WebHDFS的FileSystem模式是“webhdfs://”, URI的格式如下:

        webhdfs://<HOST>:<HTTP_PORT>/<PATH>

與之對應(yīng)的HDFS URI格式如下:

        hdfs://<HOST>:<RPC_PORT>/<PATH>

在REST API接口中,在path之前插入前綴“/webhdfs/v1”,操作語句被追加到最后,相應(yīng)的HTTP URL格式如下:

        http://<HOST>:<HTTP_PORT>/webhdfs/v1/<PATH>? op=...

下面我們以具體實例,來測試一下WebHDFS的功能。使用curl命令工具在HDFS根目錄下創(chuàng)建一個名為“webdir”的目錄,如圖3-9所示。

圖3-9 WebHDFS創(chuàng)建目錄的運行結(jié)果

3.1.4 其他接口

HDFS支持的使用接口除了前面介紹過的Java等以外,還有C、Thrift、HttpFS、HFTP、NFS等。下面簡單介紹幾種。

1. C接口

HDFS基于Java編寫,并沒有提供原生的C語言訪問接口,但HDFS提供了基于JNI(Java Native Interface)的C調(diào)用接口libhdfs,使C語言訪問HDFS成為可能。

libhdfs接口的頭文件和庫文件已包含在Hadoop發(fā)行版本中,可以直接使用。它的頭文件hdfs.h一般位于{HADOOP_HOME}/include目錄中,而其庫文件libhdfs.so通常則位于{HADOOP_HOME}/lib/native目錄中。不同的版本,庫文件所在位置稍有不同。

通過libhdfs訪問HDFS文件系統(tǒng)與使用C語言API訪問普通操作系統(tǒng)的文件系統(tǒng)類似。C++訪問HDFS的方式也與C語言類似。接口主要如下。

(1)建立、關(guān)閉與HDFS連接:hdfsConnect()、hdfsConnectAsUser()、hdfsDisconnect()。

(2)打開、關(guān)閉HDFS文件:hdfsOpenFile()、hdfsCloseFile()。當(dāng)用hdfsOpenFile()創(chuàng)建文件時,可以指定replication和blocksize參數(shù)。

(3)讀HDFS文件:hdfsRead()、hdfsPread()。

(4)寫HDFS文件:hdfsWrite()。HDFS不支持隨機寫,只能是從文件頭順序?qū)懭搿?/p>

(5)查詢HDFS文件信息:hdfsGetPathInfo()。

(6)查詢數(shù)據(jù)塊所在節(jié)點信息:hdfsGetHosts()。返回一個或多個數(shù)據(jù)塊所在數(shù)據(jù)節(jié)點的信息,一個數(shù)據(jù)塊可能存在于多個數(shù)據(jù)節(jié)點上。

libhdfs中的函數(shù)是通過JNI調(diào)用Java虛擬機的,在虛擬機中構(gòu)造對應(yīng)的HDFS的Java類,然后反射調(diào)用該類的功能函數(shù),占用內(nèi)存較多,不適合對虛擬要求較高的場景。

下面是一個簡單的例子:

        #include "hdfs.h"
        int main(int argc, char **argv) {
            hdfsFS fs = hdfsConnect("default", 0);
            const char *writePath = "/tmp/testfile.txt";
            hdfsFile writeFile =
              hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
            if(! writeFile) {
              fprintf(stderr, "Failed to open %s for writing! \n", writePath);
              exit(-1);
            }
            char *buffer = "Hello, World! ";
            tSize num_written_bytes =
              hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
            if (hdfsFlush(fs, writeFile)) {
              fprintf(stderr, "Failed to 'flush' %s\n", writePath);
              exit(-1);
            }
            hdfsCloseFile(fs, writeFile);
        }

2. HFTP

HFTP是一個可以實現(xiàn)從遠程HDFS集群讀取Hadoop文件系統(tǒng)數(shù)據(jù)的接口。HFTP默認是打開的,數(shù)據(jù)讀取通過HTTP協(xié)議,允許以瀏覽器的方式訪問和下載所有文件。這種方式帶來便利的同時,也存在一定的安全隱患。

HFTP是一個只讀的文件系統(tǒng),如果試圖用它寫或者修改文件系統(tǒng)的狀態(tài),將會拋出一個錯誤。如果使用多個不同版本的HDFS集群時,需要在集群之間移動數(shù)據(jù),HFTP是非常有用的。HFTP在不同HDFS版本之間都是兼容的,通常與distcp結(jié)合使用實現(xiàn)并行復(fù)制。

HSFTP是HFTP的一個擴展,默認使用HTTPS在傳輸時加密數(shù)據(jù)。

3. HttpFS

HttpFS是Cloudera公司提供的一個Web應(yīng)用,一般部署在內(nèi)嵌的Web服務(wù)器中,但獨立于Hadoop的NameNode。

HttpFS是提供REST HTTP接口的服務(wù)器,可以支持全部HDFS文件系統(tǒng)操作。通過WebHDFS REST API,可以對HDFS進行讀寫等訪問操作。與WebHDFS的區(qū)別是,不需要客戶端,就可以訪問Hadoop集群的每一個節(jié)點。

通過HttpFS,可以訪問放置在防火墻后面的Hadoop集群數(shù)據(jù)。HttpFS可以作為一個網(wǎng)關(guān)角色,是唯一可以穿過防火墻訪問內(nèi)部集群數(shù)據(jù)的系統(tǒng)。

HttpFS的內(nèi)置安全特性支持Hadoop偽身份驗證和HTTP SPNEGO Kerberos及其他插件式(pluggable)驗證機制。它還提供了對Hadoop代理用戶的支持。

主站蜘蛛池模板: 阿拉善左旗| 尼玛县| 曲阜市| 渭南市| 邵武市| 体育| 崇阳县| 互助| 晋江市| 金乡县| 炎陵县| 临武县| 邵阳县| 孙吴县| 青阳县| 常州市| 香河县| 同心县| 诏安县| 额尔古纳市| 三原县| 柏乡县| 彰化县| 从江县| 綦江县| 博乐市| 民勤县| 顺义区| 嘉禾县| 合江县| 宣化县| 即墨市| 平原县| 铁岭县| 乐亭县| 黔西| 山西省| 五华县| 西平县| 筠连县| 济阳县|