- 大數(shù)據(jù):從基礎(chǔ)理論到最佳實踐
- 祁偉
- 9146字
- 2021-01-07 18:48:00
3.1 HDFS接口與編程
HDFS提供了多種用戶操作和編程接口,既通過Shell命令管理文件與目錄、管理作業(yè)調(diào)度、控制與優(yōu)化集群性能等,也提供了Java、C語言等的編程接口,用戶可以通過編寫程序?qū)DFS進行擴展。
3.1.1 Shell命令
HDFS資源URI的格式如下:
scheme://authority/path
其中scheme是協(xié)議名,一般是file或hdfs; authority是授權(quán)訪問的主機名或IP; path是訪問路徑。例如:
hdfs://localhost:9000/user/chunk/test.txt
如果已經(jīng)在core-site.xml里配置了fs.default.name=hdfs://localhost:9000,則僅使用/user/chunk/test.txt即可。
在HDFS的所有接口中,Shell命令行接口最簡單,也是開發(fā)者比較熟悉的方式。我們通過使用“hdfs -help”命令,可以看到HDFS支持的文件系統(tǒng)命令,如圖3-1所示。

圖3-1 HDFS支持的文件系統(tǒng)命令
HDFS支持的文件系統(tǒng)命令主要有兩類。
(1)用戶命令:用于管理HDFS日常操作,如dfs、fsck、fetchdt等。
(2)系統(tǒng)管理命令:主要用于控制和管理HDFS集群,如balancer、namenode、datanode、dfsadmin、secondarynamenode等。限于篇幅,這里只介紹幾種常用的命令模塊。
1. hdfs dfs [GENERIC_OPTIONS][COMMAND_OPTIONS]
“hdfs dfs”提供了類似于Linux Shell一樣的命令集,其用法與Linux Shell基本一致。下面詳細介紹各個命令。
(1)appendToFile。
說明:將一個或者多個本地文件追加到目的文件。成功返回0,錯誤返回1。
格式:hdfs dfs -appendToFile <localsrc> ... <dst>
示例:
hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile
(2)cat。
說明:將路徑指定文件的內(nèi)容輸出到stdout。成功返回0,錯誤返回-1。
格式:hdfs dfs -cat URI [URI ...]
示例:
hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2 hdfs dfs -cat file:///file3 /user/hadoop/file4
(3)chgrp。
說明:改變文件所屬的用戶組。如果使用-R選項,則這一操作對整個目錄結(jié)構(gòu)遞歸執(zhí)行。使用這一命令的用戶必須是文件的所屬用戶,或者是超級用戶。
格式:hdfs dfs -chgrp [-R]GROUP URI [URI ...]
(4)chmod。
說明:改變文件的權(quán)限。使用-R將使改變在目錄結(jié)構(gòu)下遞歸進行。命令的使用者必須是文件的所有者或者超級用戶。
格式:hdfs dfs -chmod [-R]<MODE[, MODE]... | OCTALMODE> URI [URI ...]
(5)chown。
說明:改變文件的所屬用戶。如果使用-R選項,則這一操作對整個目錄結(jié)構(gòu)遞歸執(zhí)行。使用這一命令的用戶必須是文件在命令變更之前的所屬用戶,或者是超級用戶。
格式:hdfs dfs -chown [-R][OWNER][:[GROUP]]URI [URI ]
(6)copyFromLocal。
說明:從本地復(fù)制,與put命令相似,但限定源路徑是本地的。
格式:hdfs dfs -copyFromLocal <localsrc> URI
(7)copyToLocal。
說明:復(fù)制到本地,與get命令相似,但限定目的路徑是本地的。
格式:hdfs dfs -copyToLocal [-ignorecrc][-crc]URI <localdst>
(8)count。
說明:計算文件、目錄的數(shù)量。成功返回0,錯誤返回-1。
格式:hdfs dfs -count [-q][-h]<paths>
示例:
hdfs dfs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2 hdfs dfs -count -q hdfs://nn1.example.com/file1 hdfs dfs -count -q -h hdfs://nn1.example.com/file1
(9)cp。
說明:將文件從源路徑復(fù)制到目標路徑。這個命令允許有多個源路徑,但同時,目標路徑必須是一個目錄。成功返回0,錯誤返回-1。
格式:hdfs dfs -cp [-f][-p | -p[topax]]URI [URI ...]<dest>
示例:
hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir
(10)du。
說明:顯示目錄中所有文件的大小,或者當(dāng)只指定一個文件時,顯示此文件的大小。成功返回0,錯誤返回-1。
格式:hdfs dfs -du [-s][-h]URI [URI ...]
示例:
hdfs dfs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://nn.example.com/user/hadoop/dir1
(11)dus。
說明:顯示文件的大小。此命令可以用“du -s”替代。
格式:hdfs dfs -dus <args>
(12)expunge。
作用:清空回收站。
格式:hdfs dfs -expunge
(13)get。
說明:復(fù)制文件到本地文件系統(tǒng)。可用“-ignorecrc”選項復(fù)制CRC校驗失敗的文件。使用“-crc”選項復(fù)制文件以及CRC信息。成功返回0,錯誤返回-1。
格式:hdfs dfs -get [-ignorecrc][-crc]<src> <localdst>
示例:
hdfs dfs -get /user/hadoop/file localfile hdfs dfs -get hdfs://nn.example.com/user/hadoop/file localfile
(14)getfacl。
說明:顯示文件或者目錄的權(quán)限控制列表。成功返回0,錯誤返回非零值。
格式:hdfs dfs -getfacl [-R]<path>
示例:
hdfs dfs -getfacl /file hdfs dfs -getfacl -R /dir
(15)getfattr。
說明:顯示文件或者目錄的擴展屬性。成功返回0,錯誤返回非零值。
格式:hdfs dfs -getfattr [-R]-n name | -d [-e en]<path>
示例:
hdfs dfs -getfattr -d /file hdfs dfs -getfattr -R -n user.myAttr /dir
(16)getmerge。
說明:接受一個源目錄和一個目標文件作為輸入,并且將源目錄中所有的文件連接成本地目標文件。addnl是可選的,用于指定在每個文件結(jié)尾添加一個換行符。
格式:hdfs dfs -getmerge <src> <localdst> [addnl]
(17)ls。
說明:與Linux中一樣,返回子目錄或子文件列表。成功返回0,錯誤返回-1。
格式:hdfs dfs -ls [-R]<args>
示例:
hdfs dfs -ls /user/hadoop/file1
(18)lsr。
說明:ls命令的遞歸版本,一般使用“l(fā)s -R”代替。
格式:hdfs dfs -lsr <args>
(19)mkdir。
說明:創(chuàng)建目錄,加-p選項創(chuàng)建多層目錄。成功返回0,錯誤返回-1。
格式:hdfs dfs -mkdir [-p]<paths>
示例:
hdfs dfs -mkdir /user/hadoop/dir1 /user/hadoop/dir2 hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir
(20)moveFromLocal。
說明:類似put,區(qū)別在于put操作完成后刪除。
格式:hdfs dfs -moveFromLocal <localsrc> <dst>
(21)mv。
說明:將文件從源路徑移動到目標路徑。這個命令允許有多個源路徑,此時,目標路徑必須是一個目錄。不允許在不同的文件系統(tǒng)間移動文件。成功返回0,錯誤返回-1。
格式:hdfs dfs -mv URI [URI ...]<dest>
示例:
hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2 hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1
(22)put。
說明:從本地文件系統(tǒng)中復(fù)制單個或多個源路徑到目標文件系統(tǒng)。也支持從標準輸入設(shè)備中讀取輸入,寫入目標文件系統(tǒng)。成功返回0,錯誤返回-1。
格式:hdfs dfs -put <localsrc> ... <dst>
示例:
hdfs dfs -put localfile /user/hadoop/hadoopfile hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
(23)rm。
說明:刪除指定的文件或目錄。成功返回0,錯誤返回-1。
格式:hdfs dfs -rm [-f][-r|-R][-skipTrash]URI [URI ...]
示例:
hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir
(24)rmr。
說明:rm的遞歸版本,已過時,一般使用“rm -r”代替。
格式:hdfs dfs -rmr [-skipTrash]URI [URI ...]
(25)setfacl。
說明:設(shè)置文件或者目錄的權(quán)限控制列表。成功返回0,錯誤返回非零值。
格式:hdfs dfs -setfacl [-R][-b|-k -m|-x <acl_spec> <path>]|[--set <acl_spec> <path>]
示例:
hdfs dfs -setfacl -m user:hadoop:rw- /file hdfs dfs -setfacl -x user:hadoop /file hdfs dfs -setfacl -b /file hdfs dfs -setfacl -k /dir hdfs dfs -setfacl --set user::rw-, user:hadoop:rw-, group::r--, other::r-- /file hdfs dfs -setfacl -R -m user:hadoop:r-x /dir hdfs dfs -setfacl -m default:user:hadoop:r-x /dir
(26)setfattr。
說明:設(shè)置文件或者目錄的擴展屬性。成功返回0,錯誤返回非零值。
格式:hdfs dfs -setfattr -n name [-v value]| -x name <path>
示例:
hdfs dfs -setfattr -n user.myAttr -v myValue /file hdfs dfs -setfattr -n user.noValue /file hdfs dfs -setfattr -x user.myAttr /file
(27)setrep。
說明:改變文件和目錄的復(fù)制因子。成功返回0,錯誤返回-1。
格式:hdfs dfs -setrep [-R][-w]<numReplicas> <path>
示例:
hdfs dfs -setrep -w 3 /user/hadoop/dir1
(28)stat。
說明:返回指定路徑的統(tǒng)計信息。成功返回0,錯誤返回-1。
格式:hdfs dfs -stat URI [URI ...]
示例:
hdfs dfs -stat path
(29)tail。
說明:將文件尾部1KB的內(nèi)容輸出到stdout。成功返回0,錯誤返回-1。
格式:hdfs dfs -tail [-f]URI
示例:
hdfs dfs -tail pathname
(30)test
說明:檢查文件。選項“-e”檢查文件是否存在,如果存在則返回0;選項“-z”檢查文件是否為0字節(jié),如果是則返回0;選項“-d”檢查路徑是否為目錄,如果是則返回1,否則返回0。
格式:hdfs dfs -test -[ezd]URI
示例:
hdfs dfs -test -e filename
(31)text。
說明:將源文件輸出為文本格式。允許的格式是zip和TextRecordInputStream。
格式:hdfs dfs -text <src>
(32)touchz。
說明:創(chuàng)建一個空文件。成功返回0,錯誤返回-1。
格式:hdfs dfs -touchz URI [URI ...]
示例:
hdfs dfs -touchz pathname
小提示
“hadoop dfs”與“hdfs dfs”都是操作HDFS文件系統(tǒng)的命令,“hadoop dfs”屬于早期版本的格式,已經(jīng)過時,一般使用“hdfs dfs”。
“hadoop fs”也是文件系統(tǒng)操作命令,但使用范圍更廣,能夠操作其他格式文件系統(tǒng),如local、HDFS等,可以在本地與Hadoop分布式文件系統(tǒng)的交互操作中使用。
2. hdfs fsck [GENERIC_OPTIONS]<path> [-list-corruptfileblocks | [-move | -delete| -openforwrite][-files [-blocks [-locations | -racks]]]][-includeSnapshots]
fsck是一個文件系統(tǒng)健康狀況檢查工具,用來檢查各類問題,比如,文件塊丟失等(如圖3-2所示)。但是,注意它不會主動恢復(fù)備份缺失的block,這個是由NameNode單獨的線程異步處理的。

圖3-2 fsck命令的運行結(jié)果
fsck命令的參數(shù)說明見表3-1。
表3-1 fsck參數(shù)的說明

3. hdfs datanode [-regular | -rollback | -rollingupgrade rollback]
運行一個HDFS集群的數(shù)據(jù)節(jié)點。參數(shù)說明見表3-2。
表3-2 hdfs datanode命令參數(shù)的說明

4. hdfs namenode [GENERIC_OPTIONS]
“hdfs namenode”是運行NameNode的命令,是一個比較核心的工具。該命令的主要參數(shù)說明見表3-3。
表3-3 hdfs namenode命令參數(shù)的說明

5. hdfs dfsadmin [GENERIC_OPTIONS]
dfsadmin是一個多任務(wù)的工具,我們可以使用它來獲取HDFS的狀態(tài)信息,以及在HDFS上執(zhí)行的一系列管理操作。該命令的主要參數(shù)說明見表3-4。
表3-4 hdfs dfsadmin命令參數(shù)的說明

續(xù)表

dfsadmin命令的使用示例如圖3-3所示。

圖3-3 dfsadmin命令的使用示例
6. hdfs cacheadmin
管理員和用戶通過“hdfs cacheadmin”命令管理緩存資源。
緩存指令由一個唯一的無重復(fù)的64位整數(shù)ID來標識。即使緩存指令后來被刪除了,ID也不會重復(fù)使用。緩存池由一個唯一的字符串名稱來標識。
(1)增加緩存:addDirective。
用法:hdfs cacheadmin -addDirective -path <path> -pool <pool-name> [-force][-replication<replication>][-ttl <time-to-live>]
參數(shù)說明見表3-5。
表3-5 addDirective的參數(shù)說明

(2)刪除一個緩存:removeDirective。
用法:hdfs cacheadmin -removeDirective <id>
參數(shù)id指定要刪除的緩存指令的ID。刪除時,必須對該指令的緩存池擁有寫權(quán)限。
(3)刪除指定路徑下的每一個緩存:removeDirectives。
用法:hdfs cacheadmin -removeDirectives <path>
參數(shù)path中設(shè)置要刪除的緩存指令的路徑。刪除時必須對該指令的緩存池擁有寫權(quán)限。
(4)緩存列表:listDirectives。
用法:hdfs cacheadmin -listDirectives [-stats][-path <path>][-pool <pool>]
參數(shù)說明見表3-6。
表3-6 listDirectives的參數(shù)說明

(5)新增緩存池:addPool。
用法:hdfs cacheadmin -addPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>
參數(shù)說明見表3-7。
表3-7 addPool的參數(shù)說明

(6)修改緩存池:modifyPool。
用法:hdfs cacheadmin -modifyPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>]
參數(shù)說明見表3-8。
表3-8 modifyPool的參數(shù)說明

(7)刪除緩存池:removePool。
用法:hdfs cacheadmin -removePool <name>
參數(shù)name指定要刪除的緩存池的名稱。
(8)緩存池列表:listPools。
用法:hdfs cacheadmin -listPools [-stats][<name>]
參數(shù)說明見表3-9。
表3-9 addPool的參數(shù)說明

7. hdfs balancer [-threshold <threshold>][-policy <policy>]
HDFS集群非常容易出現(xiàn)機器與機器之間磁盤利用率不平衡的情況,尤其是增加新的數(shù)據(jù)節(jié)點時。保證HDFS中的數(shù)據(jù)平衡非常重要。HDFS出現(xiàn)不平衡的狀況將引發(fā)很多問題,比如MapReduce程序無法很好地利用本地計算的優(yōu)勢、機器之間無法達到更好的網(wǎng)絡(luò)帶寬使用率等。
在Hadoop中,包含一個Balancer程序,可以調(diào)節(jié)HDFS集群平衡的狀態(tài)。啟動Balancer服務(wù)時,界面如圖3-4所示。

圖3-4 啟動Balancer服務(wù)
服務(wù)啟動后,集群管理人員可用balancer命令進行分析和再平衡數(shù)據(jù),如圖3-5所示。

圖3-5 可用balancer命令進行分析和再平衡數(shù)據(jù)
參數(shù)threshold是判斷集群是否平衡的目標參數(shù),表示HDFS達到平衡狀態(tài)的磁盤使用率偏差值。默認設(shè)置為10,參數(shù)取值范圍是0~100。如果機器之間磁盤使用率偏差小于10%,我們就認為HDFS集群已經(jīng)達到了平衡的狀態(tài)。
8. hdfs version
hdfs version命令用于查看當(dāng)前系統(tǒng)的版本,運行示例如圖3-6所示。

圖3-6 使用hdfs version命令查看當(dāng)前系統(tǒng)的版本
3.1.2 Java接口操作
由于Hadoop本身就是使用Java語言編寫的,理論上,通過Java API能夠調(diào)用所有的Hadoop文件系統(tǒng)的操作接口。
Hadoop有一個抽象的文件系統(tǒng)概念,在Java抽象類org.apache.hadoop.fs中定義了接口。只要某個文件系統(tǒng)實現(xiàn)了這個接口,那么,它就可以作為Hadoop支持的文件系統(tǒng)。目前Hadoop能夠支持的文件系統(tǒng)如表3-10所示。
表3-10 Hadoop文件類的實現(xiàn)

在Hadoop中,主要是定義了一組分布式文件系統(tǒng)和通用的I/O組件和接口,Hadoop的文件系統(tǒng)準確地應(yīng)該稱作Hadoop I/O。而HDFS是實現(xiàn)該文件接口的Hadoop自帶的分布式文件項目,是對Hadoop I/O接口的實現(xiàn)。在處理大數(shù)據(jù)集時,為實現(xiàn)最優(yōu)性能,通常使用HDFS存儲。
org.apache.hadoop.fs包由接口類(FsConstants、Syncable等)、Java類(AbstractFileSystem、BlockLocation、FileSystem、FileUtil、FSDataInputStream等)、枚舉類型(如CreateFlag)、異常類(ChecksumException、InvalidPathException等)和錯誤類(如FSError)組成。每個子對象中都定義了相應(yīng)的方法,通過對org.apache.hadoop.fs包的封裝與調(diào)用,可以拓展HDFS應(yīng)用,更好地幫助用戶使用集群海量存儲。
在介紹Java接口操作之前,先介紹幾個常用的Java類。
(1)FileSystem。
org.apache.hadoop.fs.FileSystem,通用文件系統(tǒng)基類,用于與HDFS文件系統(tǒng)交互,編寫的HDFS程序都需要重寫FileSystem類。通過FileSystem,可以非常方便地像操作本地文件系統(tǒng)一樣操作HDFS集群文件。
FileSystem提供了get方法,一個是通過配置文件獲取與HDFS的連接;一個是通過URL指定配置文件,獲取與HDFS的連接,URL的格式為hdfs://namenode/xxx.xml。
方法的原型如下:
public static FileSystem get(Configuration conf) throws IOException; public static FileSystem get(URI uri, Configuration conf) throws IOException; public static FileSystem get(final URI uri, final Configuration conf, final String user) throws IOException, InterruptedException;
其中,Configuration(org.apache.hadoop.conf.Configuration)類對象封裝了客戶端或服務(wù)器的配置;URI是指文件在HDFS里存放的路徑。
(2)FSDataInputStream。
org.apache.hadoop.fs.FSDataInputStream,文件輸入流,用于讀取HDFS文件,它是Java中DataInputStream的派生類,支持從任意位置讀取流式數(shù)據(jù)。
常用的讀取方法是從指定的位置,讀取指定大小的數(shù)據(jù)至緩存區(qū)。方法如下所示:
int read(long position, byte[]buffer, int offset, int length)
還有用于隨時定位的方法,可以定位到指定的讀取點,如下所示:
void seek(long desired)
通過long getPos()方法,還可以獲取當(dāng)前的讀取點。
(3)FSDataOutputStream。
org.apache.hadoop.fs.FSDataOutputStream,文件輸出流,是DataOutputStream的派生類,通過這個類,能夠向HDFS順序?qū)懭霐?shù)據(jù)流。
通常的寫入方法為write,如下所示:
public void write(int b)
獲取當(dāng)前寫入點的函數(shù)為long getPos()。
(4)Path。
org.apache.hadoop.fs.Path,文件與目錄定位類,用于定義HDFS集群中指定的目錄與文件絕對或相對路徑。
可以通過多種方式構(gòu)造Path,如通過URL的模式,通常編寫方式為:
hdfs://ip:port/directory/filename
Path可以與FileSystem的open函數(shù)相關(guān)聯(lián),通過Path構(gòu)造訪問路徑,用FileSystem進行訪問。
(5)FileStatus。
org.apache.hadoop.fs.FileStatus,文件狀態(tài)顯示類,可以獲取文件與目錄的元數(shù)據(jù)、長度、塊大小、所屬用戶、編輯時間等信息,同時,可以設(shè)置文件用戶、權(quán)限等內(nèi)容。
FileStatus有很多get與set方法,如獲取文件長度的long getLen()方法、設(shè)置文件權(quán)限的setPermission(FsPermission permission)方法等。
下面,我們開始Hadoop的Java操作之旅。
1.創(chuàng)建文件
FileSystem類里提供了很多API,用來創(chuàng)建文件,其中,最簡單的一個是:
public FSDataOutputStream create(Path f) throws IOException;
它創(chuàng)建一個Path類代表的文件,并返回一個輸出流。這個方法有多個重載方法,可以用來設(shè)置是否覆蓋已有文件、該文件復(fù)制的份數(shù)、寫入時的緩沖區(qū)大小、文件塊大小(block)、權(quán)限等。默認情況下,如果Path中文件的父目錄(或者更上一級目錄)不存在,這些目錄會被自動創(chuàng)建。
2.讀取數(shù)據(jù)
通過調(diào)用FileSystem實例的open方法打開文件,得到一個輸入流。下面是使用FileSystem類讀取HDFS中文件內(nèi)容的完整程序:
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class FileSystemCat { public static void main(String[]args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); FSDataInputStream in = null; try { in = fs.open(new Path(uri)); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
此外,F(xiàn)SDataInputStream類同時也實現(xiàn)了PositionedReadable(org.apache.hadoop.fs. PositionedReadable)接口,接口中定義的三個方法允許在任意位置讀取文件的內(nèi)容:
public int read(long position, byte[]buffer, int offset, int length) throws IOException; public void readFully(long position, byte[]buffer, int offset, int length) throws IOException; public void readFully(long position, byte[]buffer) throws IOException;
結(jié)合第2章內(nèi)容,下面我們結(jié)合程序?qū)崿F(xiàn)深入剖析HDFS讀文件時的數(shù)據(jù)流向過程。
(1)客戶端通過調(diào)用FileSystem.open()方法打開一個文件,對于HDFS來講,其實是調(diào)用DistributedFileSystem實例的open方法。
(2)DistributedFileSystem通過遠程方法調(diào)用訪問NameNode,獲取該文件的前幾個blocks所在的位置信息;針對每個block, NameNode都會返回有該block數(shù)據(jù)信息的所有DataNodes節(jié)點,比如配置的dfs.replication為3,就會每個block返回3個DataNodes節(jié)點信息,這些節(jié)點是按距離客戶端的遠近排序的,如果發(fā)起讀文件的客戶端就在包含該block的DataNode上,那么這個DataNode就排第一位(這種情況在做Map任務(wù)時常見),客戶端就會從本機讀取數(shù)據(jù)。
DistributedFileSystem的open方法返回一個FSDataInputStream, FSDataInputStream里包裝著一個DFSInputStream, DFSInputStream真正管理DataNodes和NameNode的I/O。
(3)客戶端調(diào)用FSDataInputStream.read()方法,F(xiàn)SDataInputStream里已經(jīng)緩存了該文件前幾個block所在的DataNode的地址,于是,從第一個block的第一個地址(也就是最近的DataNode)開始連接讀取。
(4)反復(fù)調(diào)用read()方法,數(shù)據(jù)不斷地從DataNode流向客戶端。
(5)當(dāng)一個block的數(shù)據(jù)讀完時,DFSInputStream會關(guān)閉當(dāng)前DataNode的連接,打開下一個block所在的最優(yōu)DataNode的連接繼續(xù)讀取;這些對客戶端是透明的,在客戶端看來,就是在讀一個連續(xù)的流。
(6)這樣,一個block一個block地讀下去,當(dāng)需要使用更多block的存儲信息時,DFSInputStream會再次調(diào)用NameNode,獲取下一批block的存儲位置信息,直到客戶端停止讀取,調(diào)用FSDataInputStream.close()方法,整個讀取過程結(jié)束。
小提示
文件操作還可以使用Hadoop URL的方式,示例代碼如下:
import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class URLCat { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[]args) throws Exception { InputStream in = null; try { in = new URL(args[0]).openStream(); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
在上面的程序中,先設(shè)置URLStreamHandlerFactory,然后通過URL打開一個流,讀取流,就得到了文件的內(nèi)容,通過IOUtils.copyBytes()把讀到的內(nèi)容寫出到標準輸出流里,也就是控制臺上,從而實現(xiàn)了類似于Linux里的cat命令的功能。最后關(guān)閉輸入流。
3.寫入數(shù)據(jù)
與讀操作類似,Hadoop對于寫操作也提供了一個類:FSDataOutputStream,這個類重載了很多java.io.DataOutputStream的write方法,用于寫入很多類型的數(shù)據(jù),比如int、char、字節(jié)數(shù)組等。
HDFS寫文件的示例代碼如下:
FileSystem hdfs = FileSystem.get(new Configuration()); Path path = new Path("/testfile"); FSDataOutputStream dos = hdfs.create(path); byte[]readBuf = "Hello World".getBytes("UTF-8"); dos.write(readBuf, 0, readBuf.length); dos.close(); hdfs.close();
如果希望向已有文件追加內(nèi)容,可以調(diào)用:
public FSDataOutputStream append(Path f) throws IOException;
如果文件不存在時,append方法也可以用來新建一個文件。
下面,我們結(jié)合以上的程序,深入剖析HDFS寫文件時的數(shù)據(jù)流向過程。
(1)客戶端調(diào)用DistributedFileSystem.create()方法創(chuàng)建一個文件。
(2)DistributedFileSystem向NameNode發(fā)起遠程方法調(diào)用,創(chuàng)建一個文件,但是,NameNode沒有把它關(guān)聯(lián)到任何block上去;NameNode在這一步做了很多檢查工作,保證該文件當(dāng)前不存在,客戶端有創(chuàng)建該文件的權(quán)限等。如果這些檢查都通過了,NameNode創(chuàng)建一條新文件記錄;否則,創(chuàng)建失敗,客戶端返回IOException。DistributedFileSystem返回一個FSDataOutputStream,像讀文件時一樣,這個FSDataOutputStream里包裝著一個DFSOutputStream,由它來實際處理與DataNodes和NameNode的通信。
(3)客戶端向DFSOutputStream里寫數(shù)據(jù),DFSOutputStream把數(shù)據(jù)分成包,丟進一個稱為data queue的隊列中。DataStreamer負責(zé)向NameNode申請新的block,新的block被分配在了一個或多個(默認為3個)節(jié)點上,這些節(jié)點就形成一個管道。
(4)DataStreamer把data queue里的包拿出來,通過管道輸送給第1個節(jié)點,第1個節(jié)點再通過管道輸送給第2個節(jié)點,第2個再輸送給第3個。以此類推。
(5)DFSOutputStream同時還在內(nèi)部維護一個通知隊列,名叫ack queue,里面保存發(fā)過的數(shù)據(jù)包。一個包只有被所有管道上的DataNodes通知收到了,才會被移除。如果任意一個DataNode接收失敗了,首先,管道關(guān)閉,然后把ack queue里的包都放回到data queue的頭部,以便使失敗節(jié)點的下游節(jié)點不會丟失這些數(shù)據(jù)。打開管道,把壞節(jié)點移除,數(shù)據(jù)會繼續(xù)向其他好節(jié)點輸送,直到管道上的節(jié)點都完成了。如果少復(fù)制了一個節(jié)點,向NameNode報告一下,說現(xiàn)在這個block沒有達到設(shè)定的副本數(shù),然后就返回成功了,后期,NameNode會組織一個異步的任務(wù),把副本數(shù)恢復(fù)到設(shè)定值。然后,接下來的數(shù)據(jù)包和數(shù)據(jù)塊正常寫入。
如果多個DataNodes都失敗了,會檢測hdfs-site.xml里的dfs.replication.min參數(shù),默認值是1,意思是只要有1個DataNode接收成功,就認為數(shù)據(jù)寫入成功了。客戶端就會收到寫入成功的返回。后期,Hadoop會發(fā)起異步任務(wù)把副本數(shù)恢復(fù)到dfs.replication設(shè)置的值。
以上操作對客戶端都是透明的,客戶端不知道發(fā)生了這些事情,只知道寫文件成功了。
(6)當(dāng)客戶端完成數(shù)據(jù)寫入后,調(diào)用流的close()方法,這個操作把data queue里的所有剩余的包都發(fā)給管道。
(7)等所有包都收到了寫成功的反饋后,客戶端通知NameNode寫文件完成了。因為DataStream寫文件前就先向NameNode申請block的位置信息了,所以寫文件完成時,NameNode已知道每個block的位置信息,它只需等最小的副本數(shù)寫成功,就可以返回成功。
4.文件讀寫位置
讀取文件時(FSDataInputStream),允許使用seek()方法在文件中定位。支持隨機訪問,理論上,可以從流的任何位置讀取數(shù)據(jù),但調(diào)用seek()方法的開銷是相當(dāng)巨大的,應(yīng)該盡量少調(diào)用,盡可能地使程序做到順序讀。
由于HDFS只允許對一個打開的文件順序?qū)懭耄蛳蛞粋€已有文件的尾部追加,不允許在任意位置寫,F(xiàn)SDataOutputStream沒有seek方法。但FSDataOutputStream類提供了一個getPos()方法,可以查詢當(dāng)前在往文件的哪個位置寫的寫入偏移量:
public long getPos() throws IOException;
5.重命名
通過FileSystem.rename()方法,可為指定的HDFS文件重命名:
protected void rename(Path src, Path dst, Options) throws IOException;
示例代碼實現(xiàn)如下:
Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); Path frpath = new Path("/test"); //舊的文件名 Path topath = new Path("/testNew"); //新的文件名 boolean isRename = hdfs.rename(frpath, topath); String result = isRename? "成功" : "失敗";
6.刪除操作
通過FileSystem.delete()方法刪除指定的HDFS文件或目錄(永久刪除):
public boolean delete(Path f, boolean recursive) throws IOException;
其中,f為需要刪除文件的完整路徑,recursive用來確定是否進行遞歸刪除。如果f是一個文件或空目錄,則不論recursive是何值,都刪除。如果f是一個非空目錄,則recursive為true時,目錄下內(nèi)容全部刪除;如果recursive為false,不刪除,并拋出IOException。
示例代碼實現(xiàn)如下:
Path f = new Path(fileName); boolean isExists = hdfs.exists(f); if (isExists) { //if exists, delete boolean isDel = hdfs.delete(f, true); System.out.println(fileName + " delete? \t" + isDel); } else { System.out.println(fileName + " exist? \t" + isExists); }
7.文件夾操作
FileSystem中創(chuàng)建文件夾的方法如下:
public boolean mkdirs(Path f) throws IOException;
與java.io.File.mkdirs方法一樣,創(chuàng)建目錄的同時,默認地創(chuàng)建缺失的父目錄。我們一般不需要創(chuàng)建目錄,一般在創(chuàng)建文件時,默認地就把所需的目錄都創(chuàng)建好了。
目錄創(chuàng)建的示例代碼實現(xiàn)如下:
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path srcPath = new Path(path); boolean isok = fs.mkdirs(srcPath); if(isok) { System.out.println("create dir ok! "); } else { System.out.println("create dir failure"); } fs.close();
使用FileSystem的listStatus()方法能夠列出某個目錄中的所有文件:
public FileStatus[]listStatus(Path f) throws IOException public FileStatus[]listStatus(Path f, PathFilter filter) throws IOException public FileStatus[]listStatus(Path[]files) throws IOException public FileStatus[]listStatus(Path[]files, PathFilter filter) throws IOException
這一組方法都接收Path參數(shù),如果Path是一個文件,返回值是一個數(shù)組,數(shù)組里只有一個元素,是這個Path代表的文件的FileStatus對象;如果Path是一個目錄,返回值數(shù)組是該目錄下的所有文件和目錄的FileStatus組成的數(shù)組,有可能是一個0長數(shù)組;如果參數(shù)是Path[],則返回值相當(dāng)于多次調(diào)用單Path,然后把返回值整合到一個數(shù)組里;如果參數(shù)中包含PathFilter,則PathFilter會對返回的文件或目錄進行過濾,返回滿足條件的文件或目錄,條件由開發(fā)者自行定義。
FileSystem的globStatus方法利用通配符來列出文件和目錄:
public FileStatus[]globStatus(Path pathPattern) throws IOException; public FileStatus[]globStatus(Path pathPattern, PathFilter filter) throws IOException;
文件夾刪除操作與文件刪除類似。
其他關(guān)于文件夾的操作方法還有FileSystem.getWorkingDirectory(返回當(dāng)前工作目錄)、FileSystem.setWorkingDirectory(更改當(dāng)前工作目錄)等。
8.屬性操作
FileSystem類中的getFileStatus()方法返回一個FileStatus實例,該FileStatus實例中,包含了該Path(文件或目錄)的元數(shù)據(jù)信息:文件大小、block大小、復(fù)制的份數(shù)、最后修改時間、所有者、權(quán)限等。示例代碼實現(xiàn)如下:
FileStatus status = fs.getFileStatus(path); System.out.println("path = " + status.getPath()); System.out.println("owner = " + status.getOwner()); System.out.println("block size = " + status.getBlockSize()); System.out.println("permission = " + status.getPermission()); System.out.println("replication = " + status.getReplication());
3.1.3 WebHDFS
Hadoop提供的Java Native API支持對文件或目錄的操作,為開發(fā)者提供了極大的便利。為滿足許多外部應(yīng)用程序操作HDFS文件系統(tǒng)的需求,Hadoop提供了兩種基于HTTP方式的接口:一是用于瀏覽文件系統(tǒng)的Web界面;另一個是WebHDFS REST API接口。
啟動HDFS時,NameNode和DataNode各自啟動了一個內(nèi)置的Web服務(wù)器,顯示了集群當(dāng)前的基本狀態(tài)和信息。默認配置下NameNode的首頁地址是http://namenode-name:50070/。這個頁面列出了集群里的所有DataNode和集群的基本狀態(tài)。
這個Web界面也可以用來瀏覽整個文件系統(tǒng)。使用NameNode首頁上的Browse the file system鏈接,輸入需要查看的目錄地址,即可看到,如圖3-7所示。

圖3-7 Web界面
WebHDFS基于HTTP,通過GET、PUT、POST和DELETE等操作,支持FileSystem/FileContext的全部API。具體操作類型見表3-11。
表3-11 WebHDFS的操作

在使用WebHDFS REST API接口前,要先對Hadoop進行配置和授權(quán)認證。編輯hdfs-site.xml文件,添加啟用WebHDFS(dfs.webhdfs.enabled)、kerberos驗證(dfs.web. authentication.kerberos.principal、dfs.web.authentication.kerberos.keytab)等屬性配置。配置完成后,啟動WebHDFS服務(wù)即可,如圖3-8所示。

圖3-8 啟動WebHDFS服務(wù)
WebHDFS默認的HTTP服務(wù)端口是14000。需要說明的是,WebHDFS的FileSystem模式是“webhdfs://”, URI的格式如下:
webhdfs://<HOST>:<HTTP_PORT>/<PATH>
與之對應(yīng)的HDFS URI格式如下:
hdfs://<HOST>:<RPC_PORT>/<PATH>
在REST API接口中,在path之前插入前綴“/webhdfs/v1”,操作語句被追加到最后,相應(yīng)的HTTP URL格式如下:
http://<HOST>:<HTTP_PORT>/webhdfs/v1/<PATH>? op=...
下面我們以具體實例,來測試一下WebHDFS的功能。使用curl命令工具在HDFS根目錄下創(chuàng)建一個名為“webdir”的目錄,如圖3-9所示。

圖3-9 WebHDFS創(chuàng)建目錄的運行結(jié)果
3.1.4 其他接口
HDFS支持的使用接口除了前面介紹過的Java等以外,還有C、Thrift、HttpFS、HFTP、NFS等。下面簡單介紹幾種。
1. C接口
HDFS基于Java編寫,并沒有提供原生的C語言訪問接口,但HDFS提供了基于JNI(Java Native Interface)的C調(diào)用接口libhdfs,使C語言訪問HDFS成為可能。
libhdfs接口的頭文件和庫文件已包含在Hadoop發(fā)行版本中,可以直接使用。它的頭文件hdfs.h一般位于{HADOOP_HOME}/include目錄中,而其庫文件libhdfs.so通常則位于{HADOOP_HOME}/lib/native目錄中。不同的版本,庫文件所在位置稍有不同。
通過libhdfs訪問HDFS文件系統(tǒng)與使用C語言API訪問普通操作系統(tǒng)的文件系統(tǒng)類似。C++訪問HDFS的方式也與C語言類似。接口主要如下。
(1)建立、關(guān)閉與HDFS連接:hdfsConnect()、hdfsConnectAsUser()、hdfsDisconnect()。
(2)打開、關(guān)閉HDFS文件:hdfsOpenFile()、hdfsCloseFile()。當(dāng)用hdfsOpenFile()創(chuàng)建文件時,可以指定replication和blocksize參數(shù)。
(3)讀HDFS文件:hdfsRead()、hdfsPread()。
(4)寫HDFS文件:hdfsWrite()。HDFS不支持隨機寫,只能是從文件頭順序?qū)懭搿?/p>
(5)查詢HDFS文件信息:hdfsGetPathInfo()。
(6)查詢數(shù)據(jù)塊所在節(jié)點信息:hdfsGetHosts()。返回一個或多個數(shù)據(jù)塊所在數(shù)據(jù)節(jié)點的信息,一個數(shù)據(jù)塊可能存在于多個數(shù)據(jù)節(jié)點上。
libhdfs中的函數(shù)是通過JNI調(diào)用Java虛擬機的,在虛擬機中構(gòu)造對應(yīng)的HDFS的Java類,然后反射調(diào)用該類的功能函數(shù),占用內(nèi)存較多,不適合對虛擬要求較高的場景。
下面是一個簡單的例子:
#include "hdfs.h" int main(int argc, char **argv) { hdfsFS fs = hdfsConnect("default", 0); const char *writePath = "/tmp/testfile.txt"; hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); if(! writeFile) { fprintf(stderr, "Failed to open %s for writing! \n", writePath); exit(-1); } char *buffer = "Hello, World! "; tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); if (hdfsFlush(fs, writeFile)) { fprintf(stderr, "Failed to 'flush' %s\n", writePath); exit(-1); } hdfsCloseFile(fs, writeFile); }
2. HFTP
HFTP是一個可以實現(xiàn)從遠程HDFS集群讀取Hadoop文件系統(tǒng)數(shù)據(jù)的接口。HFTP默認是打開的,數(shù)據(jù)讀取通過HTTP協(xié)議,允許以瀏覽器的方式訪問和下載所有文件。這種方式帶來便利的同時,也存在一定的安全隱患。
HFTP是一個只讀的文件系統(tǒng),如果試圖用它寫或者修改文件系統(tǒng)的狀態(tài),將會拋出一個錯誤。如果使用多個不同版本的HDFS集群時,需要在集群之間移動數(shù)據(jù),HFTP是非常有用的。HFTP在不同HDFS版本之間都是兼容的,通常與distcp結(jié)合使用實現(xiàn)并行復(fù)制。
HSFTP是HFTP的一個擴展,默認使用HTTPS在傳輸時加密數(shù)據(jù)。
3. HttpFS
HttpFS是Cloudera公司提供的一個Web應(yīng)用,一般部署在內(nèi)嵌的Web服務(wù)器中,但獨立于Hadoop的NameNode。
HttpFS是提供REST HTTP接口的服務(wù)器,可以支持全部HDFS文件系統(tǒng)操作。通過WebHDFS REST API,可以對HDFS進行讀寫等訪問操作。與WebHDFS的區(qū)別是,不需要客戶端,就可以訪問Hadoop集群的每一個節(jié)點。
通過HttpFS,可以訪問放置在防火墻后面的Hadoop集群數(shù)據(jù)。HttpFS可以作為一個網(wǎng)關(guān)角色,是唯一可以穿過防火墻訪問內(nèi)部集群數(shù)據(jù)的系統(tǒng)。
HttpFS的內(nèi)置安全特性支持Hadoop偽身份驗證和HTTP SPNEGO Kerberos及其他插件式(pluggable)驗證機制。它還提供了對Hadoop代理用戶的支持。
- 我的J2EE成功之路
- 人工智能超越人類
- Python Artificial Intelligence Projects for Beginners
- ROS機器人編程與SLAM算法解析指南
- Apache Spark Deep Learning Cookbook
- 網(wǎng)絡(luò)安全管理實踐
- 突破,Objective-C開發(fā)速學(xué)手冊
- TensorFlow Reinforcement Learning Quick Start Guide
- 邊緣智能:關(guān)鍵技術(shù)與落地實踐
- 激光選區(qū)熔化3D打印技術(shù)
- 人工智能技術(shù)入門
- 學(xué)練一本通:51單片機應(yīng)用技術(shù)
- Linux系統(tǒng)管理員工具集
- Learning Cassandra for Administrators
- 機器學(xué)習(xí)案例分析(基于Python語言)