- Flink原理深入與編程實戰(zhàn):Scala+Java(微課視頻版)
- 辛立偉編著
- 1869字
- 2023-07-17 18:54:42
3.1.3 流應(yīng)用程序剖析
所有的Flink應(yīng)用程序都以特定的步驟來工作,這些工作步驟如圖3-2所示。

圖3-2 Flink應(yīng)用程序工作步驟
也就是說,每個Flink程序都由相同的基本部分組成:
(1)獲取一個執(zhí)行環(huán)境。
(2)加載/創(chuàng)建初始數(shù)據(jù)。
(3)指定對該數(shù)據(jù)的轉(zhuǎn)換。
(4)指定將計算結(jié)果放在哪里。
(5)觸發(fā)流程序執(zhí)行。
1.獲取一個執(zhí)行環(huán)境
Flink應(yīng)用程序從其main()方法中生成一個或多個Flink作業(yè)(job)。這些作業(yè)可以在本地JVM(LocalEnvironment)中執(zhí)行,也可以在具有多臺機器的集群的遠程設(shè)置中執(zhí)行(RemoteEnvironment)。對于每個程序,ExecutionEnvironment提供了控制作業(yè)執(zhí)行(例如設(shè)置并行性或容錯/檢查點參數(shù))和與外部環(huán)境交互(數(shù)據(jù)訪問)的方法。
每個Flink應(yīng)用程序都需要一個執(zhí)行環(huán)境(本例中為env)。流應(yīng)用程序需要的執(zhí)行環(huán)境使用的是StreamExecutionEnvironment。為了開始編寫Flink程序,用戶首先需要獲得一個現(xiàn)有的執(zhí)行環(huán)境,如果沒有,就需要先創(chuàng)建一個。根據(jù)目的不同,F(xiàn)link支持以下幾種方式:
(1)獲得一個已經(jīng)存在的Flink環(huán)境。
(2)創(chuàng)建本地環(huán)境。
(3)創(chuàng)建遠程環(huán)境。
Flink流程序的入口是StreamExecutionEnvironment類的一個實例,它定義了程序執(zhí)行的上下文。StreamExecutionEnvironment是所有Flink程序的基礎(chǔ)。可以通過一些靜態(tài)方法獲得一個StreamExecutionEnvironment的實例,代碼如下:

要獲得執(zhí)行環(huán)境,通常只需調(diào)用getExecutionEnvironment()方法。這將根據(jù)上下文選擇正確的執(zhí)行環(huán)境。如果正在IDE中的本地環(huán)境上執(zhí)行,則它將啟動一個本地執(zhí)行環(huán)境。如果是從程序中創(chuàng)建了一個JAR文件,并通過命令行調(diào)用它,則Flink集群管理器將執(zhí)行main()方法,getExecutionEnvironment()將返回用于在集群上以分布式方式執(zhí)行程序的執(zhí)行環(huán)境。
在上面的示例程序中,使用以下語句來獲得流程序的執(zhí)行環(huán)境。
Scala代碼如下:

Java代碼如下:

StreamExecutionEnvironment包含ExecutionConfig,可使用它為運行時設(shè)置特定于作業(yè)的配置值。例如,如果要設(shè)置自動水印發(fā)送間隔,則可以像下面這樣在代碼進行配置。
Scala代碼如下:

Java代碼如下:

2.加載/創(chuàng)建初始數(shù)據(jù)
執(zhí)行環(huán)境可以從多種數(shù)據(jù)源讀取數(shù)據(jù),包括文本文件、CSV文件、Socket套接字數(shù)據(jù)等,也可以使用自定義的數(shù)據(jù)輸入格式。例如,要將文本文件讀取為行序列,代碼如下:

數(shù)據(jù)被逐行讀到內(nèi)存后,F(xiàn)link會將它們組織到DataStream中,這是Flink中用來表示流數(shù)據(jù)的特殊類。
在3.1.2節(jié)的示例程序【示例3-1】中,使用fromElements()方法讀取集合數(shù)據(jù),并將讀取的數(shù)據(jù)存儲為DataStream類型。
Scala代碼如下:

Java代碼如下:

3.對數(shù)據(jù)進行轉(zhuǎn)換
每個Flink程序都對分布式數(shù)據(jù)集合執(zhí)行轉(zhuǎn)換。Flink的DataStream API提供了多種數(shù)據(jù)轉(zhuǎn)換功能,包括過濾、映射、連接、分組和聚合。例如,下面是一個map轉(zhuǎn)換應(yīng)用,通過將原始集合中的每個字符串轉(zhuǎn)換為整數(shù)來創(chuàng)建一個新的DataStream,代碼如下:

在3.1.2節(jié)的示例程序【示例3-1】中使用了filter過濾轉(zhuǎn)換,將原始數(shù)據(jù)集轉(zhuǎn)換為只包含成年人信息的新DataStream流。
Scala代碼如下:

Java代碼如下:

這里不必了解每個轉(zhuǎn)換的具體含義,后面會詳細介紹它們。需要強調(diào)的是,F(xiàn)link中的轉(zhuǎn)換是惰性的,在調(diào)用sink操作之前不會真正執(zhí)行。
4.指定將計算結(jié)果放在哪里
一旦有了包含最終結(jié)果的DataStream,就可以通過創(chuàng)建接收器(sink)將其寫入外部系統(tǒng)。例如,將計算結(jié)果輸出到屏幕上。
Scala代碼如下:

Java代碼如下:

Flink中的接收器操作觸發(fā)流的執(zhí)行,以生成程序所需的結(jié)果,例如將結(jié)果保存到文件系統(tǒng)或?qū)⑵浯蛴〉綐藴瘦敵觥I厦娴氖纠褂胊dults.print()將結(jié)果打印到任務(wù)管理器日志中(在IDE中運行時,任務(wù)管理器日志將顯示在IDE的控制臺中)。這將對流的每個元素調(diào)用其toString()方法。
5.觸發(fā)流程序執(zhí)行
一旦寫好了程序處理邏輯,就需要通過調(diào)用StreamExecutionEnvironment上的execute()來觸發(fā)程序執(zhí)行。所有的Flink程序都是時延執(zhí)行的:當程序的主方法執(zhí)行時,數(shù)據(jù)加載和轉(zhuǎn)換不會直接發(fā)生,而是創(chuàng)建每個運算并添加到程序的執(zhí)行計劃中。當執(zhí)行環(huán)境上的execute()調(diào)用顯式觸發(fā)執(zhí)行時,這些操作才真正被執(zhí)行。程序是在本地執(zhí)行還是提交到集群中執(zhí)行取決于ExecutionEnvironment的類型。
時延計算可以讓用戶構(gòu)建復雜的程序,然后Flink將其作為一個整體計劃的單元執(zhí)行。在3.1.2節(jié)的示例程序【示例3-1】中,使用如下代碼來觸發(fā)流處理程序的執(zhí)行。
Scala代碼如下:

Java代碼如下:

在應(yīng)用程序中執(zhí)行的DataStream API調(diào)用將構(gòu)建一個附加到StreamExecutionEnvironment的作業(yè)圖(Job Graph)。調(diào)用env.execute()時,此圖被打包并發(fā)送到Flink Master,該Master并行化作業(yè)并將其片段分發(fā)給TaskManagers以供執(zhí)行。作業(yè)的每個并行片段將在一個Task Slot(任務(wù)槽)中執(zhí)行,如圖3-3所示。

圖3-3 Flink流應(yīng)用程序執(zhí)行原理
這個分布式運行時要求Flink應(yīng)用程序是可序列化的。它還要求集群中的每個節(jié)點都可以使用所有依賴項。
StreamExecutionEnvironment上的execute()方法將等待作業(yè)完成,然后返回一個JobExecutionResult,其中包含執(zhí)行時間和累加器結(jié)果。注意,如果不調(diào)用execute(),則應(yīng)用程序?qū)⒉粫\行。
如果不想等待作業(yè)完成,則可以通過調(diào)用StreamExecutionEnvironment上的executeAysnc()來觸發(fā)異步作業(yè)執(zhí)行。它將返回一個JobClient,可以使用它與剛才提交的作業(yè)進行通信。例如,下面的示例代碼演示了如何通過executeAsync()實現(xiàn)execute()的語義。
Scala代碼如下:

Java代碼如下:

- 深入核心的敏捷開發(fā):ThoughtWorks五大關(guān)鍵實踐
- GitLab Cookbook
- Java開發(fā)入行真功夫
- 區(qū)塊鏈:以太坊DApp開發(fā)實戰(zhàn)
- 零基礎(chǔ)輕松學SQL Server 2016
- Building RESTful Python Web Services
- C語言從入門到精通
- 計算機應(yīng)用基礎(chǔ)案例教程
- Programming with CodeIgniterMVC
- jQuery從入門到精通(微課精編版)
- Visual Basic語言程序設(shè)計上機指導與練習(第3版)
- Jakarta EE Cookbook
- Python Penetration Testing Essentials
- Developer,Advocate!
- Node.js Web Development