- Hadoop 2.X HDFS源碼剖析
- 徐鵬
- 9874字
- 2024-01-05 17:29:20
1.2.1 Hadoop RPC接口
Hadoop RPC調用使得HDFS進程能夠像本地調用一樣調用另一個進程中的方法,并且可以傳遞Java基本類型或者自定義類作為參數,同時接收返回值。如果遠程進程在調用過程中出現異常,本地進程也會收到對應的異常。目前Hadoop RPC調用是基于Protobuf實現的,我們會在第2章中介紹底層的具體實現,本節主要介紹Hadoop RPC接口的定義。Hadoop RPC接口主要定義在org.apache.hadoop.hdfs.protocol包和org.apache.hadoop.hdfs.server.protocol包中,包括以下幾個接口。
■ ClientProtocol:ClientProtocol定義了客戶端與名字節點間的接口,這個接口定義的方法非常多,客戶端對文件系統的所有操作都需要通過這個接口,同時客戶端讀、寫文件等操作也需要先通過這個接口與Namenode協商之后,再進行數據塊的讀出和寫入操作。
■ ClientDatanodeProtocol:客戶端與數據節點間的接口。ClientDatanodeProtocol中定義的方法主要是用于客戶端獲取數據節點信息時調用,而真正的數據讀寫交互則是通過流式接口進行的。
■ DatanodeProtocol:數據節點通過這個接口與名字節點通信,同時名字節點會通過這個接口中方法的返回值向數據節點下發指令。注意,這是名字節點與數據節點通信的唯一方式。這個接口非常重要,數據節點會通過這個接口向名字節點注冊、匯報數據塊的全量以及增量的存儲情況。同時,名字節點也會通過這個接口中方法的返回值,將名字節點指令帶回該數據塊,根據這些指令,數據節點會執行數據塊的復制、刪除以及恢復操作。
■ InterDatanodeProtocol:數據節點與數據節點間的接口,數據節點會通過這個接口和其他數據節點通信。這個接口主要用于數據塊的恢復操作,以及同步數據節點上存儲的數據塊副本的信息。
■ NamenodeProtocol:第二名字節點與名字節點間的接口。由于Hadoop2.X中引入了HA機制,檢查點操作也不再由第二名字節點執行了,所以NamenodeProtocol我們就不詳細介紹了。
■ 其他接口:主要包括安全相關接口(RefreshAuthorizationPolicyProtocol、RefreshUser MappingsProtocol)、HA相關接口(HAServiceProtocol)等。HA相關接口的實現和定義我們將在第3章的HA小節中介紹。
下面我們重點介紹ClientProtocol、ClientDatanodeProtocol、DatanodeProtocol、InterDatanode Protocol和NamenodeProtocol等接口的定義。
1.ClientProtocol
ClientProtocol定義了所有由客戶端發起的、由Namenode響應的操作。這個接口非常大,有80多個方法,我們把這個接口中的方法分為如下幾類。
■ HDFS文件讀相關的操作。
■ HDFS文件寫以及追加寫的相關操作。
■ 管理HDFS命名空間(namespace)的相關操作。
■ 系統問題與管理相關的操作。
■ 快照相關的操作。
■ 緩存相關的操作。
■ 其他操作。
HDFS文件讀操作、HDFS文件寫與追加寫操作,以及命名空間的管理操作,這三個部分都可以在FileSystem類中找到對應的方法,這些方法都是用來支持Hadoop文件系統實現的。對于系統問題與管理相關的操作,則是由DFSAdmin這個工具類發起的,其中的方法是用于支持管理員配置和管理HDFS的。而快照和緩存則都是Hadoop2.X中引入的新特性,ClientProtocol中也有對應的方法用于支持這兩個新特性。當然,ClientProtocol中還包括安全、XAttr等方法,這部分不是重點,我們就不再詳細介紹了。
(1)讀數據相關方法
ClientProtocol中與客戶端讀取文件相關的方法主要有兩個:getBlockLocations()和reportBadBlocks()。
客戶端會調用ClientProtocol.getBlockLocations()方法獲取HDFS文件指定范圍內所有數據塊的位置信息。這個方法的參數是HDFS文件的文件名以及讀取范圍,返回值是文件指定范圍內所有數據塊的文件名以及它們的位置信息,使用LocatedBlocks對象封裝。每個數據塊的位置信息指的是存儲這個數據塊副本的所有Datanode的信息,這些Datanode會以與當前客戶端的距離遠近排序。客戶端讀取數據時,會首先調用getBlockLocations()方法獲取HDFS文件的所有數據塊的位置信息,然后客戶端會根據這些位置信息從數據節點讀取數據塊。ClientProtocol.getBlockLocations()方法的定義如下:


客戶端會調用ClientProtocol.reportBadBlocks()方法向Namenode匯報錯誤的數據塊。當客戶端從數據節點讀取數據塊且發現數據塊的校驗和并不正確時,就會調用這個方法向Namenode匯報這個錯誤的數據塊信息。ClientProtocol.reportBadBlocks()方法的定義如下:

(2)寫/追加寫數據相關方法
在HDFS客戶端操作中最重要的一部分就是寫入一個新的HDFS文件,或者打開一個已有的HDFS文件并執行追加寫操作。ClientProtocol中定義了8個方法支持HDFS文件的寫操作:create()、append()、addBlock()、complete()、abandonBlock()、getAddtionnalDatanodes()、updateBlockForPipeline()和updatePipeline()。
create()方法用于在HDFS的文件系統目錄樹中創建一個新的空文件,創建的路徑由src參數指定。這個空文件創建后對于其他的客戶端是“可讀”的,但是這些客戶端不能刪除、重命名或者移動這個文件,直到這個文件被關閉或者租約過期。客戶端寫一個新的文件時,會首先調用create()方法在文件系統目錄樹中創建一個空文件,然后調用addBlock()方法獲取存儲文件數據的數據塊的位置信息,最后客戶端就可以根據位置信息建立數據流管道,向數據節點寫入數據了。create()方法的定義如下:

append()方法用于打開一個已有的文件,如果這個文件的最后一個數據塊沒有寫滿,則返回這個數據塊的位置信息(使用LocatedBlock對象封裝);如果這個文件的最后一個數據塊正好寫滿,則創建一個新的數據塊并添加到這個文件中,然后返回這個新添加的數據塊的位置信息。客戶端追加寫一個已有文件時,會先調用append()方法獲取最后一個可寫數據塊的位置信息,然后建立數據流管道,并向數據節點寫入追加的數據。如果客戶端將這個數據塊寫滿,與create()方法一樣,客戶端會調用addBlock()方法獲取新的數據塊。

客戶端調用addBlock()方法向指定文件添加一個新的數據塊,并獲取存儲這個數據塊副本的所有數據節點的位置信息(使用LocatedBlock對象封裝)。要特別注意的是,調用addBlock()方法時還要傳入上一個數據塊的引用。Namenode在分配新的數據塊時,會順便提交上一個數據塊,這里previous參數就是上一個數據塊的引用。excludeNodes參數則是數據節點的黑名單,保存了客戶端無法連接的一些數據節點,建議Namenode在分配保存數據塊副本的數據節點時不要考慮這些節點。favoredNodes參數則是客戶端所希望的保存數據塊副本的數據節點的列表。客戶端調用addBlock()方法獲取新的數據塊的位置信息后,會建立到這些數據節點的數據流管道,并通過數據流管道將數據寫入數據節點。addBlock()方法的定義如下:

當客戶端完成了整個文件的寫入操作后,會調用complete()方法通知Namenode。這個操作會提交新寫入HDFS文件的所有數據塊,當這些數據塊的副本數量滿足系統配置的最小副本系數(默認值為1),也就是該文件的所有數據塊至少有一個有效副本時,complete()方法會返回true,這時Namenode中文件的狀態也會從構建中狀態轉換為正常狀態;否則,complete()會返回false,客戶端就需要重復調用complete()操作,直至該方法返回true。

上面描述的5個方法都是在正常流程時,客戶端寫文件必須調用的方法。但是對于一個分布式系統來說,寫流程中涉及的任何一個節點都有可能出現故障。出現故障的情況也是需要考慮在內的,所以ClientProtocol定義了abandonBlock()、getAdditionnalDatanode()、updateBlockForPipeline()以及updatePipeline()等方法,用于在異常情況下進行恢復操作。
客戶端調用abandonBlock()方法放棄一個新申請的數據塊。考慮下面這種情況:當客戶端獲取了一個新申請的數據塊,發現無法建立到存儲這個數據塊副本的某些數據節點的連接時,會調用abandonBlock()方法通知名字節點放棄這個數據塊,之后客戶端會再次調用addBlock()方法獲取新的數據塊,并在傳入參數時將無法連接的數據節點放入excludeNodes參數列表中,以避免Namenode將數據塊的副本分配到該節點上,造成客戶端再次無法連接這個節點的情況。

abandonBlock()方法用于處理客戶端建立數據流管道時數據節點出現故障的情況。那么,如果客戶端已經成功建立了數據流管道,在客戶端寫某個數據塊時,存儲這個數據塊副本的某個數據節點出現了錯誤該如何處理呢?這個操作就比較復雜了,客戶端首先會調用getAdditionalDatanode()方法向Namenode申請一個新的Datanode來替代出現故障的Datanode。然后客戶端會調用updateBlockForPipeline()方法向Namenode申請為這個數據塊分配新的時間戳,這樣故障節點上的沒能寫完整的數據塊的時間戳就會過期,在后續的塊匯報操作中會被刪除。最后客戶端就可以使用新的時間戳建立新的數據流管道,來執行對數據塊的寫操作了。數據流管道建立成功后,客戶端還需要調用updatePipeline()方法更新Namenode中當前數據塊的數據流管道信息。至此,一個完整的恢復操作結束。
上面我們描述的都是在寫數據操作時數據節點發生故障的情況,包括了數據流管道建立時以及建立后數據節點發生故障的情況。在寫數據的過程中,Client節點也有可能在任意時刻發生故障,為了預防這種情況,對于任意一個Client打開的文件都需要Client定期調用ClientProtocol.renewLease()方法更新租約(關于租約請參考第3章中租約相關小節)。如果Namenode長時間沒有收到Client的租約更新消息,就會認為Client發生故障,這時就會觸發一次租約恢復操作,關閉文件并且同步所有數據節點上這個文件數據塊的狀態,確保HDFS系統中這個文件是正確且一致保存的。
如果在寫操作時,名字節點發生故障該如何處理呢?這就要涉及HDFS的HA架構了,請讀者參考第3章的HA部分。
(3)命名空間管理的相關方法
ClientProtocol中有很重要的一部分操作是對Namenode命名空間的修改。我們知道FileSystem類也定義了對文件系統命名空間修改操作的API(FileSystem類抽象了一個文件系統對外提供的API接口),HDFS則滿足FileSystem類抽象的所有方法。表1-1總結了FileSystem API與ClientProtocol接口的對應關系。
表1-1 FileSystem API與ClientProtocol接口的對應關系

通過表1-1我們可以看出,ClientProtocol中涉及的命名空間管理的方法都有與之對應的HDFS文件系統API,且方法的名稱和參數很相近。在這里我們以setReplication()為例,setReplication()方法在ClientProtocol中的定義是:

在FileSystem接口中的定義是:

可以看到,setReplication()方法在FileSystem和ClientProtocol中定義的方法名及參數都很接近,大部分情況下ClientProtocol接口方法定義的參數更多,可以很好地支持FileSystemAPI定義的操作。
(4)系統問題與管理操作
ClientProtocol中另一個重要的部分就是支持DFSAdmin工具的接口方法,DFSAdmin是供HDFS管理員管理HDFS集群的命令行工具。一個典型的dfsadmin命令如下所示,管理員可以添加不同的參數以觸發HDFS進行相應的操作。

表1-2給出了ClientProtocol中定義的接口方法與dfsadmin命令參數之間的對應關系,我們會重點講解幾個比較重要的方法。
表1-2 ClientProtocol中定義的接口方法與dfsadmin命令參數之間的對應關系

續表

首先看一下setSafeMode()方法,這里涉及一個非常重要的概念——安全模式。安全模式是Namenode的一種狀態,處于安全模式中的Namenode不接受客戶端對命名空間的修改操作,整個命名空間都處于只讀狀態。同時,Namenode也不會向Datanode下發任何數據塊的復制、刪除指令。管理員可以通過dfsadmin setSafemode命令觸發Namenode進入或者退出安全模式,同時還可以使用這個命令查詢安全模式的狀態。需要注意的是,剛剛啟動的Namenode會直接自動進入安全模式,當Namenode中保存的滿足最小副本系數的數據塊達到一定的比例時,Namenode會自動退出安全模式。而對于用戶通過dfsAdmin方式觸發Namenode進入安全模式的情況,則只能由管理員手動關閉安全模式,Namenode不可以自動退出。dfsadmin setSafemode命令正是通過調用ClientProtocol.setSafeMode()方法實現的。setSafeMode()方法的定義如下:

了解了安全模式之后,我們來看看必須在安全模式中才能進行的兩個操作。`-saveNamespace`用于將整個命名空間保存到新的fsimage文件中,并且重置editlog文件;而`-rollEdits`則會觸發重置editlog文件的操作,關閉當前正在寫入的editlog文件,開啟一個新的editlog文件(fsiamge與editlog文件請參考第3章的fsimage小節)。
refreshNodes()方法會觸發Namenode刷新數據節點列表。管理員可以通過include文件指定可以連接到Namenode的數據節點列表,通過exclude文件指定不能連接到Namenode的數據節點列表。每當管理員修改了這兩個配置文件后,都需要通過`-refreshNodes`選項觸發Namenode刷新數據節點列表,這個操作會造成Namenode從文件系統中移除已有的數據節點,或者添加新的數據節點(請參考第3章的數據節點管理小節)。
finalizeUpgrade()和rollingUpgrade()操作都是與Namenode升級相關的,管理員可以通過`-rollingUpgrade`選項觸發Namenode進行升級操作。當Namenode成功地執行了升級操作后,管理員可以通過`-finalizeUpgrade`提交升級操作,提交升級操作會刪除升級操作創建的一些臨時目錄,提交升級操作之后就不可以再回滾了(請參考第4章的Storage小節)。
對于其他方法,請讀者參考表1-2中的說明,這里不再詳細介紹了。
(5)快照相關操作
Hadoop 2.X添加了新的快照特性,用戶可以為HDFS的任意路徑創建快照。快照保存了一個時間點上HDFS某個路徑中所有數據的拷貝,快照可以將失效的集群回滾到之前一個正常的時間點上。用戶可以通過`hdfs dfs`命令執行創建、刪除以及重命名快照等操作,ClientProtocol也定義了對應的方法來支持快照命令。
需要特別注意的是,在創建快照之前,必須先通過`hdfs dfsadmin -allowSnapshot`命令開啟目錄的快照功能,否則不可以在該目錄上創建快照。表1-3給出了快照操作與ClientProtocol中相關方法的對應關系,請讀者參考(快照相關內容我們會在第3章的快照小節中介紹)。
表1-3 快照操作與ClientProtocol中相關方法的對應關系

(6)緩存相關操作
HDFS 2.3版本添加了集中式緩存管理(HDFS Centralized Cache Management)功能。用戶可以指定一些經常被使用的數據或者高優先級任務對應的數據,讓它們常駐內存而不被淘汰到磁盤上,這對于提升Hadoop系統和上層應用的執行效率與實時性有很大的幫助。
這里涉及兩個概念。
■ cache directive:表示要被緩存到內存的文件或者目錄。
■ cache pool:用于管理一系列的cache directive,類似于命名空間。同時使用UNIX風格的文件讀、寫、執行權限管理機制。
表1-4總結了緩存相關命令與ClientProtocol方法之間的對應關系,請讀者參考(緩存相關內容我們會在第3章的緩存管理小節中介紹)。
表1-4 緩存相關命令與ClientProtocol方法之間的對應關系

續表

(7)其他操作
安全相關以及XAttr相關命令,主要都是增加、刪除以及List操作,這里就不再詳細介紹了
2.ClientDatanodeProtocol
ClientDatanodeProtocol定義了Client與Datanode之間的接口。相比ClientProtocol,ClientDatanodeProtocol的定義簡單很多,如圖1-2所示。

圖1-2 ClientDatanodeProtocol定義
ClientDatanodeProtocol中定義的接口可以分為兩部分:一部分是支持HDFS文件讀取操作的,例如getReplicaVisibleLength()以及getBlockLocalPathInfo();另一部分是支持DFSAdmin中與數據節點管理相關的命令。下面我們就看一下圖1-2中所示9個方法的具體定義。
(1)getReplicaVisibleLength()
客戶端會調用getReplicaVisibleLength()方法從數據節點獲取某個數據塊副本真實的數據長度。當客戶端讀取一個HDFS文件時,需要獲取這個文件對應的所有數據塊的長度,用于建立數據塊的輸入流,然后讀取數據。但是Namenode元數據中文件的最后一個數據塊長度與Datanode實際存儲的可能不一致,所以客戶端在創建輸入流時就需要調用getReplicaVisibleLength()方法從Datanode獲取這個數據塊的真實長度。
(2)getBlockLocalPathInfo()
HDFS對于本地讀取,也就是Client和保存該數據塊的Datanode在同一臺物理機器上時,是有很多優化的。Client會調用ClientProtocol.getBlockLocalPathInfo()方法獲取指定數據塊文件以及數據塊校驗文件在當前節點上的本地路徑,然后利用這個本地路徑執行本地讀取操作,而不是通過流式接口執行遠程讀取,這樣也就大大優化了讀取的性能。
在HDFS 2.6版本中,客戶端會通過調用DataTransferProtocol接口從數據節點獲取數據塊文件的文件描述符,然后打開并讀取文件以實現短路讀操作,而不是通過ClientDatanodeProtoco接口。客戶端的短路讀操作請參考第5章的文件短路讀操作小節。
(3)refreshNamenodes()
在用戶管理員命令中有一個`hdfs dfsadmindatanodehost:port`命令,用于觸發指定的Datanode重新加載配置文件,停止服務那些已經從配置文件中刪除的塊池(blockPool),開始服務新添加的塊池。塊池的概念請參考第4章的Datanode邏輯結構小節。
這條命令底層就是由ClientDatanodeProtocol.refreshNamenodes()方法實現的,客戶端會通過這個接口觸發對應的Datanode執行操作。
(4)deleteBlockPool()
在用戶管理員命令中還有一個與塊池管理相關的`hdfs dfsadmin-deleteBlockPool datanode-host:port blockpoolId [force]`命令,用于從指定Datanode刪除blockpoolId對應的塊池,如果force參數被設置了,那么無論這個塊池目錄中有沒有數據都會被強制刪除;否則,只有這個塊池目錄為空的情況下才會被刪除。需要注意的是,如果Datanode還在服務這個塊池,這個命令的執行將會失敗。要停止一個數據節點服務指定的塊池,需要調用上面提到的refreshNamenodes()方法。
deleteBlockPool()方法有兩個參數,其中blockpoolId用于設置要被刪除的塊池ID;force用于設置是否強制刪除。

(5)getHdfsBlocksMetadata()
getHdfsBlocksMetadata()方法主要用于獲取數據塊是存儲在指定Datanode的哪個卷(volume)上的,這個方法主要是為了支持DistributedFileSystem.getFileBlockStorageLocations()方法。關于卷(volume)的定義請參考第4章。
(6)shutdownDatanode()
shutdownDatanode()方法用于關閉一個數據節點,這個方法主要是為了支持管理命令`hdfs dfsadmin-shutdownDatanode <datanode_host:ipc_port> [upgrade]`。
(7)getDatanodeInfo()
getDatanodeInfo()方法用于獲取指定Datanode的信息,這里的信息包括Datanode運行的HDFS版本、Datanode配置的HDFS版本,以及Datanode的啟動時間。對應于管理命令`hdfs dfsadmin-getDatanodeInfo`。
(8)startReconfiguration()
startReconfiguration()方法用于觸發Datanode異步地從磁盤重新加載配置,并且應用該配置。這個方法用于支持管理命令` hdfs dfsadmin-getDatanodeInfo-reconfigstart`。
(9)getReconfigurationStatus()
我們知道startReconfiguration()方法是異步地加載配置操作,所以HDFS提供了getReconfigurationStatus()方法用于查詢上一次觸發的重新加載配置操作的運行情況。對應于管理命令`hdfs dfsadmin-getDatanodeInfo-reconfigstartstatus`。我們可以看到getReconfigurationStatus()方法和startReconfiguration()方法對應的管理命令是一樣的,只不過參數不同,一個是start,一個是status。
3.DatanodeProtocol
ClientProtocol和DatanodeProtocol都是由客戶端發起調用的接口,下面我們介紹服務器間的接口。DatanodeProtocol是Datanode與Namenode間的接口,Datanode會使用這個接口與Namenode握手、注冊、發送心跳、進行全量以及增量的數據塊匯報。Namenode會在Datanode的心跳響應中攜帶名字節點指令,Datanode收到名字節點指令之后會執行對應的操作。要特別注意的是,Namenode向Datanode下發名字節點指令是沒有任何其他接口的,只會通過DatanodeProtocol的返回值來下發命令。
DatanodeProtocol定義的方法如圖1-3所示,我們可以將DatanodeProtocol定義的方法分為三種類型:Datanode啟動相關、心跳相關以及數據塊讀寫相關。下面將會分別介紹這三種類型的方法,以及Namenode向Datanode下發的指令。

圖1-3 DatanodeProtocol定義
(1)Datanode啟動相關方法
一個完整的Datanode啟動操作會與Namenode進行4次交互,也就是調用4次DatanodeProtocol定義的方法。首先調用versionRequest()與Namenode進行握手操作,然后調用registerDatanode()向Namenode注冊當前的Datanode,接著調用blockReport()匯報Datanode上存儲的所有數據塊,最后調用cacheReport()匯報Datanode緩存的所有數據塊。
我們首先看一下versionRequest()方法。Datanode啟動時會首先調用versionRequest()方法與Namenode進行握手。這個方法的返回值是一個NamespaceInfo對象,NamespaceInfo對象會封裝當前HDFS集群的命名空間信息,包括存儲系統的布局版本號(layoutversion)、當前的命名空間的ID(namespaceId)、集群ID(clusterId)、文件系統的創建時間(ctime)、構建時的HDFS版本號(buildVersion)、塊池ID(blockpoolId)、當前的軟件版本號(softwareVersion)等。Datanode獲取到NamespaceInfo對象后,就會比較Datanode當前的HDFS版本號和Namenode的HDFS版本號,如果Datanode版本與Namenode版本不能協同工作,則拋出異常,Datanode也就無法注冊到該Namenode上。如果當前Datanode上已經有了文件存儲的目錄,那么Datanode還會檢查Datanode存儲上的塊池ID、文件系統ID以及集群ID與Namenode返回的是否一致。

成功進行握手操作后,Datanode會調用ClientProtocol.registerDatanode()方法向Namenode注冊當前的Datanode,這個方法的參數是一個DatanodeRegistration對象,它封裝了DatanodeID、Datanode的存儲系統的布局版本號(layoutversion)、當前命名空間的ID(namespaceId)、集群ID(clusterId)、文件系統的創建時間(ctime)以及Datanode當前的軟件版本號(softwareVersion)。名字節點會判斷Datanode的軟件版本號與Namenode的軟件版本號是否兼容,如果兼容則進行注冊操作,并返回一個DatanodeRegistration對象供Datanode后續處理邏輯使用。

Datanode成功向Namenode注冊之后,Datanode會通過調用DatanodeProtocol.blockReport()方法向Namenode上報它管理的所有數據塊的信息。這個方法需要三個參數:Datanode Registration用于標識當前的Datanode;poolId用于標識數據塊所在的塊池ID;reports是一個StorageBlockReport對象的數組,每個StorageBlockReport對象都用于記錄Datanode上一個存儲空間存儲的數據塊。這里需要特別注意的是,上報的數據塊是以長整型數組保存的,每個已經提交的數據塊(finalized)以3個長整型來表示,每個構建中的數據塊(under-construction)以4個長整型來表示。之所以不使用ExtendedBlock對象保存上報的數據塊,是因為這樣可以減少blockReport()操作所使用的內存,Namenode接收到消息時,不需要創建大量的ExtendedBlock對象,只需要不斷地從長整型數組中提取數據塊即可。

Namenode接收到blockReport()請求之后,會根據Datanode上報的數據塊存儲情況建立數據塊與數據節點之間的對應關系。同時,Namenode會在blockReport()的響應中攜帶名字節點指令,通知數據節點進行重新注冊、發送心跳、備份或者刪除Datanode本地磁盤上數據塊副本的操作。這些名字節點指令都是以DatanodeCommand對象封裝的,我們會在DatanodeCommand小節詳細介紹名字節點指令以及DatanodeCommand對象。
blockReport()方法只在Datanode啟動時以及指定間隔時執行一次。在這里間隔是由dfs.blockreport.intervalMsec參數配置的,默認是6小時執行一次。cacheReport()方法與blockReport()方法是完全一致的,只不過匯報的是當前Datanode上緩存的所有數據塊。cacheReport()方法的定義如下:

(2)心跳相關方法
我們知道分布式系統的節點之間大多采用心跳來維護節點的健康狀態。HDFS也是一樣,Datanode會定期(由dfs.heartbeat.interval配置項配置,默認是3秒)向Namenode發送心跳,如果Namenode長時間沒有接到Datanode發送的心跳,則Namenode會認為該Datanode失效。
ClientProtocol.sendHeartbeat()方法就是用于心跳匯報的接口,除了攜帶標識Datanode身份的DatanodeRegistration對象外,還包括數據節點上所有存儲的狀態、緩存的狀態、正在寫文件數據的連接數、讀寫數據使用的線程數等。
sendHeartbeat()會返回一個HeartbeatResponse對象,這個對象包含了Namenode向Datanode發送的名字節點指令,以及當前Namenode的HA狀態。需要特別注意的是,在開啟了HA的HDFS集群中,Datanode是需要同時向Active Namenode以及Standby Namenode發送心跳的,不過只有ActiveNamenode才能向Datanode下發名字節點指令。

(3)數據塊讀寫相關方法
上面兩個小節我們介紹了Datanode啟動時以及發送心跳時與Namenode交互的方法。這一小節將介紹Datanode在進行數據塊讀寫操作時與Namenode交互的方法,包括DatanodeProtocol中的reportBadBlocks()、blockReceivedAndDeleted()以及commitBlockSynchronization()方法。下面我們依次介紹這三個方法的定義及使用。
reportBadBlocks()與ClientProtocol.reportBadBlocks()方法很類似,Datanode會調用這個方法向Namenode匯報損壞的數據塊。Datanode會在三種情況下調用這個方法:DataBlockScanner線程定期掃描數據節點上存儲的數據塊,發現數據塊的校驗出現錯誤時;數據流管道寫數據時,Datanode接受了一個新的數據塊,進行數據塊校驗操作出現錯誤時;進行數據塊復制操作(DataTransfer),Datanode讀取本地存儲的數據塊時,發現本地數據塊副本的長度小于Namenode記錄的長度,則認為該數據塊已經無效,會調用reportBadBlocks()方法。reportBadBlocks()方法的參數是LocatedBlock對象,這個對象描述了出現錯誤數據塊的位置,Namenode收到reportBadBlocks()請求后,會下發數據塊副本刪除指令刪除錯誤的數據塊。

Datanode會定期(默認是5分鐘,不可以配置)調用blockReceivedAndDeleted()方法向Namenode匯報Datanode新接受的數據塊或者刪除的數據塊。Datanode接受一個數據塊,可能是因為Client寫入了新的數據塊,或者從別的Datanode上復制一個數據塊到當前Datanode。Datanode刪除一個數據塊,則有可能是因為該數據塊的副本數量過多,Namenode向當前Datanode下發了刪除數據塊副本的指令。我們可以把blockReceivedAndDeleted()方法理解為blockReport()的增量匯報,這個方法的參數包括DatanodeRegistration對象、增量匯報數據塊所在的塊池ID,以及StorageReceivedDeletedBlocks對象的數組,這里的StorageReceived DeletedBlocks對象封裝了Datanode的一個數據存儲上新添加以及刪除的數據塊集合。Namenode接受了這個請求之后,會更新它內存中數據塊與數據節點的對應關系。

DatanodeProtocol中與數據塊讀寫相關的最后一個方法是commitBlockSynchronization(),這個方法用于在租約恢復操作時同步數據塊的狀態。在租約恢復操作時,主數據節點完成所有租約恢復協調操作后調用commitBlockSynchronization()方法同步Datanode和Namenode上數據塊的狀態,所以commitBlockSynchronization()方法包含了大量的參數。對于租約恢復,我們會在第3章的租約管理小節以及第4章的文件系統數據集小節中介紹,請讀者參考這兩個章節內容。

(4)其他方法
DataProtocol中最后一個方法是errorReport(),該方法用于向名字節點上報運行過程中發生的一些狀況,如磁盤不可用等,這個方法在調試時非常有用。由于這個方法并不涉及具體的邏輯,我們就不再詳細介紹了。
(5)DatanodeCommand
通過前面幾個小節的介紹我們知道,sendHeartbeat()、blockReport()以及cacheReport()方法的返回值都會攜帶Namenode向Datanode下發的名字節點指令。在HDFS中,使用DatanodeCommand類描述Namenode向Datanode發出的名字節點指令。DatanodeCommand類以及它的子類結構如圖1-4所示。

圖1-4 DatanodeCommand類結構
DatanodeCommand是所有名字節點的基類,它一共有7個子類,但在DatanodeProtocol中一共定義了10個名字節點指令,每個指令都有一個唯一的編號與之對應,代碼如下:

可以看到,DatanodeProtocol中定義的指令類型數量和DatanodeCommand子類的數量并不一致。這是因為指令編號DNA_SHUTDOWN已經廢棄不用了,Datanode接收到DNA_SHUTDOWN指令后會直接拋出UnsupportedOperationException異常。關閉Datanode是通過調用ClientDatanodeProtocol.shutdownDatanode()方法來觸發的。
這里同時要注意的是,DNA_TRANSFER、DNA_RECOVERBLOCK以及DNA_INVALIDATE都是通過BlockCommand子類來封裝的,只不過參數不同。DNA_TRANSFER指令用于觸發數據節點的數據塊復制操作,當HDFS系統中某個數據塊的副本數小于配置的副本系數時,Namenode會通過DNA_TRANSFER指令通知某個擁有這個數據塊副本的Datanode將該數據塊復制到其他數據節點上。DNA_INVALIDATE用于通知Datanode刪除數據節點上的指定數據塊,這是因為Namenode發現了某個數據塊的副本數已經超過了配置的副本系數,這時Namenode會通知某個數據節點刪除這個數據節點上多余的數據塊副本。當客戶端在寫文件時發生異常退出,會造成數據流管道中不同數據節點上數據塊狀態的不一致,這時Namenode會從數據流管道中選出一個數據節點作為主恢復節點,協調數據流管道中的其他數據節點進行租約恢復操作,以同步這個數據塊的狀態。此時Namenode就會向這個數據節點下發DNA_RECOVERBLOCK指令,通知數據節點開始租約恢復操作。
至于DatanodeCommand中其他類的作用都比較簡單,這里就不再單獨介紹了。
4.InterDatanodeProtocol
介紹完了Datanode與Namenode之間的接口,我們來介紹InterDatanodeProtocol——Datanode與Datanode之間的接口。InterDatanodeProtocol接口主要用于租約恢復操作,如圖1-5所示,InterDatanodeProtocol只有initReplicaRecovery()和updateReplicaUnderRecovery()兩個方法。

圖1-5 InterDatanodeProtocol接口
客戶端打開一個文件進行寫操作時,首先要獲取這個文件的租約,并且還需要定期更新租約。當Namenode的租約監控線程發現某個HDFS文件租約長期沒有更新時,就會認為寫這個文件的客戶端發生異常,這時Namenode就需要觸發租約恢復操作——同步數據流管道中所有Datanode上該文件數據塊的狀態,并強制關閉這個文件。
租約恢復的控制并不是由Namenode負責的,而是Namenode從數據流管道中選出一個主恢復節點,然后通過下發DatanodeCommand的恢復指令觸發這個數據節點控制租約恢復操作,也就是由這個主恢復節點協調整個租約恢復操作的過程。主恢復節點會調用InterDatanodeProtocol接口來指揮數據流管道的其他數據節點進行租約恢復。租約恢復操作其實很簡單,就是將數據流管道中所有數據節點上保存的同一個數據塊狀態(時間戳和數據塊長度)同步一致。當成功完成租約恢復后,主恢復節點會調用DatanodeProtocol.commitBlock Synchronization()方法同步名字節點上該數據塊的時間戳和數據塊長度,保持名字節點和數據節點的一致。
由于數據流管道中同一個數據塊狀態(長度和時間戳)在不同的Datanode上可能是不一致的,所以主恢復節點會首先調用InterDatanodeProtocol.initReplicaRecovery()方法獲取數據流管道中所有數據節點上保存的指定數據塊的狀態,這里的數據塊狀態使用ReplicaRecoveryInfo類封裝。主恢復節點會根據收集到的這些狀態,確定一個當前數據塊的新長度,并且使用Namenode下發的recoverId作為數據塊的新時間戳。

主恢復節點計算出數據塊的新長度后,就會調用InterDatanodeProtocol.updateReplicaUnder Recovery()方法將數據流管道中所有節點上該數據塊的長度同步為新的長度,將數據塊的時間戳同步為新的時間戳。

當完成了所有的同步操作后,主恢復節點節就可以調用DatanodeProtocol.commitBlock Synchronization()將Namenode上該數據塊的長度和時間戳同步為新的長度和時間戳,這樣Datanode和Namenode的數據也就一致了。
5.NamenodeProtocol
NamenodeProtocol定義了第二名字節點與名字節點之間的接口。由于在Hadoop 2.X架構中第二名字節點的功能已經完全被Standby節點所取代,這個接口我們就不再詳細介紹了。圖1-6給出了NamenodeProtocol定義的所有方法,讀者可以參考這個圖讀取相關的代碼。

圖1-6 NamenodeProtocol定義
- 計算機網絡
- Learn Blockchain Programming with JavaScript
- Practical Internet of Things Security
- C語言程序設計實訓教程
- Java加密與解密的藝術(第2版)
- 編譯系統透視:圖解編譯原理
- 零基礎輕松學SQL Server 2016
- RSpec Essentials
- Natural Language Processing with Python Quick Start Guide
- C#面向對象程序設計(第2版)
- 零基礎C#學習筆記
- 實戰Python網絡爬蟲
- JavaEE架構與程序設計
- Android應用開發攻略
- Visual FoxPro數據庫程序設計