官术网_书友最值得收藏!

3.1.3 流應(yīng)用程序剖析

所有的Flink應(yīng)用程序都以特定的步驟來工作,這些工作步驟如圖3-2所示。

圖3-2 Flink應(yīng)用程序工作步驟

也就是說,每個Flink程序都由相同的基本部分組成:

(1)獲取一個執(zhí)行環(huán)境。

(2)加載/創(chuàng)建初始數(shù)據(jù)。

(3)指定對該數(shù)據(jù)的轉(zhuǎn)換。

(4)指定將計算結(jié)果放在哪里。

(5)觸發(fā)流程序執(zhí)行。

1.獲取一個執(zhí)行環(huán)境

Flink應(yīng)用程序從其main()方法中生成一個或多個Flink作業(yè)(job)。這些作業(yè)可以在本地JVM(LocalEnvironment)中執(zhí)行,也可以在具有多臺機器的集群的遠程設(shè)置中執(zhí)行(RemoteEnvironment)。對于每個程序,ExecutionEnvironment提供了控制作業(yè)執(zhí)行(例如設(shè)置并行性或容錯/檢查點參數(shù))和與外部環(huán)境交互(數(shù)據(jù)訪問)的方法。

每個Flink應(yīng)用程序都需要一個執(zhí)行環(huán)境(本例中為env)。流應(yīng)用程序需要的執(zhí)行環(huán)境使用的是StreamExecutionEnvironment。為了開始編寫Flink程序,用戶首先需要獲得一個現(xiàn)有的執(zhí)行環(huán)境,如果沒有,就需要先創(chuàng)建一個。根據(jù)目的不同,F(xiàn)link支持以下幾種方式:

(1)獲得一個已經(jīng)存在的Flink環(huán)境。

(2)創(chuàng)建本地環(huán)境。

(3)創(chuàng)建遠程環(huán)境。

Flink流程序的入口是StreamExecutionEnvironment類的一個實例,它定義了程序執(zhí)行的上下文。StreamExecutionEnvironment是所有Flink程序的基礎(chǔ)。可以通過一些靜態(tài)方法獲得一個StreamExecutionEnvironment的實例,代碼如下:

要獲得執(zhí)行環(huán)境,通常只需調(diào)用getExecutionEnvironment()方法。這將根據(jù)上下文選擇正確的執(zhí)行環(huán)境。如果正在IDE中的本地環(huán)境上執(zhí)行,則它將啟動一個本地執(zhí)行環(huán)境。如果是從程序中創(chuàng)建了一個JAR文件,并通過命令行調(diào)用它,則Flink集群管理器將執(zhí)行main()方法,getExecutionEnvironment()將返回用于在集群上以分布式方式執(zhí)行程序的執(zhí)行環(huán)境。

在上面的示例程序中,使用以下語句來獲得流程序的執(zhí)行環(huán)境。

Scala代碼如下:

Java代碼如下:

StreamExecutionEnvironment包含ExecutionConfig,可使用它為運行時設(shè)置特定于作業(yè)的配置值。例如,如果要設(shè)置自動水印發(fā)送間隔,則可以像下面這樣在代碼進行配置。

Scala代碼如下:

Java代碼如下:

2.加載/創(chuàng)建初始數(shù)據(jù)

執(zhí)行環(huán)境可以從多種數(shù)據(jù)源讀取數(shù)據(jù),包括文本文件、CSV文件、Socket套接字數(shù)據(jù)等,也可以使用自定義的數(shù)據(jù)輸入格式。例如,要將文本文件讀取為行序列,代碼如下:

數(shù)據(jù)被逐行讀到內(nèi)存后,F(xiàn)link會將它們組織到DataStream中,這是Flink中用來表示流數(shù)據(jù)的特殊類。

在3.1.2節(jié)的示例程序【示例3-1】中,使用fromElements()方法讀取集合數(shù)據(jù),并將讀取的數(shù)據(jù)存儲為DataStream類型。

Scala代碼如下:

Java代碼如下:

3.對數(shù)據(jù)進行轉(zhuǎn)換

每個Flink程序都對分布式數(shù)據(jù)集合執(zhí)行轉(zhuǎn)換。Flink的DataStream API提供了多種數(shù)據(jù)轉(zhuǎn)換功能,包括過濾、映射、連接、分組和聚合。例如,下面是一個map轉(zhuǎn)換應(yīng)用,通過將原始集合中的每個字符串轉(zhuǎn)換為整數(shù)來創(chuàng)建一個新的DataStream,代碼如下:

在3.1.2節(jié)的示例程序【示例3-1】中使用了filter過濾轉(zhuǎn)換,將原始數(shù)據(jù)集轉(zhuǎn)換為只包含成年人信息的新DataStream流。

Scala代碼如下:

Java代碼如下:

這里不必了解每個轉(zhuǎn)換的具體含義,后面會詳細介紹它們。需要強調(diào)的是,F(xiàn)link中的轉(zhuǎn)換是惰性的,在調(diào)用sink操作之前不會真正執(zhí)行。

4.指定將計算結(jié)果放在哪里

一旦有了包含最終結(jié)果的DataStream,就可以通過創(chuàng)建接收器(sink)將其寫入外部系統(tǒng)。例如,將計算結(jié)果輸出到屏幕上。

Scala代碼如下:

Java代碼如下:

Flink中的接收器操作觸發(fā)流的執(zhí)行,以生成程序所需的結(jié)果,例如將結(jié)果保存到文件系統(tǒng)或?qū)⑵浯蛴〉綐藴瘦敵觥I厦娴氖纠褂胊dults.print()將結(jié)果打印到任務(wù)管理器日志中(在IDE中運行時,任務(wù)管理器日志將顯示在IDE的控制臺中)。這將對流的每個元素調(diào)用其toString()方法。

5.觸發(fā)流程序執(zhí)行

一旦寫好了程序處理邏輯,就需要通過調(diào)用StreamExecutionEnvironment上的execute()來觸發(fā)程序執(zhí)行。所有的Flink程序都是時延執(zhí)行的:當程序的主方法執(zhí)行時,數(shù)據(jù)加載和轉(zhuǎn)換不會直接發(fā)生,而是創(chuàng)建每個運算并添加到程序的執(zhí)行計劃中。當執(zhí)行環(huán)境上的execute()調(diào)用顯式觸發(fā)執(zhí)行時,這些操作才真正被執(zhí)行。程序是在本地執(zhí)行還是提交到集群中執(zhí)行取決于ExecutionEnvironment的類型。

時延計算可以讓用戶構(gòu)建復雜的程序,然后Flink將其作為一個整體計劃的單元執(zhí)行。在3.1.2節(jié)的示例程序【示例3-1】中,使用如下代碼來觸發(fā)流處理程序的執(zhí)行。

Scala代碼如下:

Java代碼如下:

在應(yīng)用程序中執(zhí)行的DataStream API調(diào)用將構(gòu)建一個附加到StreamExecutionEnvironment的作業(yè)圖(Job Graph)。調(diào)用env.execute()時,此圖被打包并發(fā)送到Flink Master,該Master并行化作業(yè)并將其片段分發(fā)給TaskManagers以供執(zhí)行。作業(yè)的每個并行片段將在一個Task Slot(任務(wù)槽)中執(zhí)行,如圖3-3所示。

圖3-3 Flink流應(yīng)用程序執(zhí)行原理

這個分布式運行時要求Flink應(yīng)用程序是可序列化的。它還要求集群中的每個節(jié)點都可以使用所有依賴項。

StreamExecutionEnvironment上的execute()方法將等待作業(yè)完成,然后返回一個JobExecutionResult,其中包含執(zhí)行時間和累加器結(jié)果。注意,如果不調(diào)用execute(),則應(yīng)用程序?qū)⒉粫\行。

如果不想等待作業(yè)完成,則可以通過調(diào)用StreamExecutionEnvironment上的executeAysnc()來觸發(fā)異步作業(yè)執(zhí)行。它將返回一個JobClient,可以使用它與剛才提交的作業(yè)進行通信。例如,下面的示例代碼演示了如何通過executeAsync()實現(xiàn)execute()的語義。

Scala代碼如下:

Java代碼如下:

主站蜘蛛池模板: 托克托县| 惠安县| 阳山县| 教育| 门源| 沾益县| 晋江市| 宽城| 南漳县| 辽宁省| 离岛区| 钟祥市| 曲靖市| 昌黎县| 镇雄县| 惠州市| 镇平县| 佳木斯市| 巴青县| 德庆县| 阳曲县| 威海市| 芮城县| 和硕县| 怀宁县| 赫章县| 宝山区| 三江| 梅河口市| 安国市| 鄱阳县| 济源市| 宜良县| 南宫市| 南开区| 唐山市| 丁青县| 和顺县| 柞水县| 彭水| 紫阳县|