- Flink原理深入與編程實戰:Scala+Java(微課視頻版)
- 辛立偉編著
- 2188字
- 2023-07-17 18:54:32
1.4.2 Flink完全分布式集群安裝
Flink支持完全分布式安裝模式,這時它由一個master節點和多個worker節點構成。本節將演示如何搭建一個三節點的Flink集群,如圖1-30所示。

圖1-30 Flink完全分布式集群設計
1.完全分布式集群安裝
Flink完全分布式集群搭建步驟如下:
(1)配置從master(主節點)到worker(工作節點)的SSH無密登錄,并保持節點上相同的目錄結構。
注意:SSH原理:master作為客戶端,要實現無密碼公鑰認證,連接到服務器端worker上時,需要在master上生成一個密鑰對,包括一個公鑰和一個私鑰,而后將公鑰復制到worker上。當master通過SSH連接worker時,worker就會生成一個隨機數并用master的公鑰對隨機數進行加密,并發送給master。master收到加密數之后再用私鑰進行解密,并將解密數回傳給worker,worker確認解密數無誤之后就允許master進行連接了。這就是一個公鑰認證過程,其間不需要用戶手工輸入密碼。重要過程是將客戶端master公鑰復制到worker上。
①在每臺機器上,執行的命令如下:

②在master節點上,生成公私鑰,命令如下:

然后一路按Enter鍵,在.ssh目錄下生成公私鑰。
③將master上的公鑰分別加入master、worker1和worker2機器的授權文件中。
在master機器上,執行的命令如下:

④測試。在master機器上,使用SSH分別連接master、worker1和worker2,命令如下:

這時會發現不需要輸入密碼,直接就連接上了這兩臺機器。
(2)Flink要求在主節點和所有工作節點上設置Java_HOME環境變量,并指向Java安裝的目錄。檢查Java的安裝和版本信息,使用的命令如下:

(3)下載Flink安裝包。下載網址為https://flink.apache.org/downloads.html。可以選擇任何喜歡的Hadoop/Scala組合。因為Flink版本更新迭代比較快,并且每次版本升級都有許多API變動,因此建議讀者學習時也安裝與本書相同的版本。本教程選擇使用的是1.13.2版本,基于Scala 2.12。
(4)將下載的最新版本的Flink壓縮包復制到master節點的~/software/目錄下,并解壓縮到~/bigdata/目錄下,命令如下:

(5)在master節點上配置Flink。
所有的配置都在conf/flink-conf.yaml文件中。在實際應用中,以下幾個配置項是非常重要的。
①jobmanager.heap.mb:每個JobManager的可用內存量,以MB為單位。
②taskmanager.heap.mb:每個TaskManager的可用內存量,以MB為單位。
③taskmanager.numberOfTaskSlots:每臺機器上可用的CPU數量,默認為1。
④parallelism.default:集群中CPU的總數。
⑤io.tmp.dirs:臨時目錄。
首先用編輯器nano打開該配置文件(讀者也可以用任何自己喜歡的編輯器,如vim),命令如下:

編輯以下內容(注意,冒號后面一定要有一個空格):

(6)每個節點下的Flink必須保持相同的目錄內容,因此將配置好的Flink復制到集群中的另外兩個節點worker01和worker02,使用的命令如下:

(7)最后,必須提供集群中所有用作worker節點的列表,每個worker節點稍后將運行一個TaskManager。在conf/slaves文件中添加每個slave節點信息(IP或hostname均可),每個節點一行,節點信息如下:

(8)啟動Flink集群,命令如下:

這個腳本會在本地節點啟動一個JobManager并通過SSH連接到所有的worker節點(在slaves文件中列出的)以啟動每個節點上的TaskManager。注意觀察啟動過程中的輸出信息,輸出信息如下:

可以看出,Flink先在master上啟動standalonesession進程,然后依次在master、worker1和worker2上啟動taskexecutor進程。
啟動以后,可以分別在master、worker1和worker2節點上執行jps命令,查看各節點上的進程是否正常啟動了。
(9)關閉集群。執行的命令如下:

也可以分別停止JobManager和TaskManager,命令如下:

2.運行Flink自帶的實時單詞計數流程序
Flink安裝包自帶了一個以Socket作為數據源的實時統計單詞計數的流程序,位于Flink下的example/streaming/SocketWindowWordCount.jar包中。可以通過運行這個流程序來測試Flink集群的使用方法。建議按以下步驟執行。
(1)首先,啟動netcat服務器,運行在9000端口,使用的命令如下:

(2)打開另一個終端啟動Flink集群,執行的命令如下:

(3)在另一個終端,啟動Flink示例程序,監聽netcat服務器。它將從套接字中讀取文本,并每5s打印前5s內每個不同單詞出現的次數,即處理時間的滾動窗口,命令如下:

(4)回到第1個正在運行netcat的終端窗口,隨意輸入一些單詞,單詞之間用空格分隔,Flink將會處理這些單詞。例如,輸入以下文本內容:

(5)分別使用SSH登錄master、worker01和worker02節點,并執行以下命令,查看日志中的輸出:

可以看到輸出結果如下:

(6)還可以檢查Flink Web UI來查看Job是怎樣執行的。
打開瀏覽器,輸入地址http://localhost:8081,可查看調度程序的Web前端。Web前端應該報告有3個可用的TaskManager實例,以及正在執行的作業。Flink Web UI包含許多關于Flink集群及其作業(JobGraph、指標、檢查點統計、TaskManager狀態等)的有用且有趣的信息,如圖1-31所示。

圖1-31 在Flink Web UI中查看作業的執行
單擊圖中的Running Job List下正在運行的作業列表,查看某個正在運行的作業執行情況,如圖1-32所示。

圖1-32 在Flink Web UI中查看正在運行的作業執行情況
(7)最后停止Flink集群,使用的命令如下:

3.運行Flink自帶的單詞計數批處理程序
Flink安裝包自帶了一個以文本文件作為數據源的單詞計數批處理程序,位于Flink下的example/batch/目錄下的WordCount.jar包中。下面演示如何在Flink集群上執行該程序,讀取HDFS上的輸入數據文件進行處理,并將計算結果輸出到HDFS上。
建議按以下步驟執行。
(1)集成Hadoop。因為要讀取HDFS上的源數據文件,所以需要在Flink中集成Hadoop包。從Flink 1.8開始,Hadoop不再包含在Flink的安裝包中,所以需要單獨下載并復制到Flink的lib目錄下。集成過程可參見1.4.1節中的部分內容。
(2)準備環境。在終端窗口中,先啟動Flink集群,再啟動HDFS集群,執行的命令如下:

(3)準備數據文件。編輯數據文件wc.txt,內容如下:

然后將該文件上傳到HDFS的/data/flink/目錄下,使用的命令如下:

(4)執行Flink的批處理程序,命令如下:

執行過程如圖1-33所示。
上面的命令是在運行WordCount時讀寫HDFS中的文件的,其中--input參數用于指定要處理的輸入文件,--output參數用于指定將計算結果輸出到哪個文件(注:如果不加hdfs://前綴,則默認使用本地文件系統)。
(5)查詢輸出結果。在終端窗口中,執行的命令如下:


圖1-33 Flink單詞計數批處理程序執行過程
可以看到計算結果如下:

- Getting Started with Gulp(Second Edition)
- Oracle 11g從入門到精通(第2版) (軟件開發視頻大講堂)
- Learning Apex Programming
- Mastering Spring MVC 4
- Rust編程從入門到實戰
- Quarkus實踐指南:構建新一代的Kubernetes原生Java微服務
- INSTANT Passbook App Development for iOS How-to
- 軟件測試教程
- 匯編語言編程基礎:基于LoongArch
- Regression Analysis with Python
- WCF技術剖析(卷1)
- Web開發的平民英雄:PHP+MySQL
- 輕松學Scratch 3.0 少兒編程(全彩)
- HTML5程序開發范例寶典
- Android項目實戰:博學谷