- Flink內核原理與實現
- 馮飛 崔鵬云 陳冠華編著
- 4051字
- 2021-04-14 11:30:35
3.5 函數體系
函數在Flink中叫作Function,開發者編寫的函數叫作UDF(User Defined Function),當然Flink對于通用場景也內置了大量的預定義的通用UDF來簡化開發,如Join、GroupBy、Sum等SQL語義等價的UDF。UDF在Flink的DataStream開發和SQL開發中被廣泛使用。開發者使用UDF主要是實現非通用的計算邏輯,一般是業務邏輯。在本書語境中,UDF、Function、用戶自定義函數的含義是相同的。
按照輸入和輸入的不同特點分類,Flink中的UDF大概分為3類(見圖3-23)。
1. SourceFunction
無上游Function,SourceFunction直接從外部數據存儲讀取數據,所以SourceFunction所在的算子是起始,沒有上游算子。

圖3-23 Function分類與關系
2. SinkFunction
無下游Function,SinkFunction直接將數據寫入外部存儲,所以Sink函數所在的算子是作業的重點,沒有下游算子。
3. 一般Function
一般的UDF函數用在作業的中間處理步驟中,其接口定義與SourceFunction和SinkFunction不同。一般UDF所在的算子有上游算子,也有下游算子。
Flink的一般UDF有單流輸入和雙流輸入兩種,從UDF輸入、輸出的模型來說,多流輸入可以通過多個雙流輸入串聯而成,這種設計比較簡單實用,如圖3-24所示。

圖3-24 多流輸入轉換為多層雙流輸入
SourceFunction和SinkFunction主要在Flink中的連接器使用,也會在自定義讀取、寫出數據的時候使用。其余的大量實現邏輯的函數都屬于一般UDF。
3.5.1 函數層次
UDF在DataStream API層使用,Flink提供的函數體系從接口的層級來看,從高階Function到低階Function如圖3-25所示。

圖3-25 Function層次
Flink內置的DataStream上的API接口,如DataStream#map、DataStream#flatMap、DataStreamFilter#filter等,使用的都是高階函數,開發者使用高階函數的時候,無須關心定時器之類的底層概念,只需要關注業務邏輯即可。低階函數即ProcessFunction。
無狀態Function用來做無狀態計算,使用比較簡單,如MapFunction。無狀態Function和RichFunction是一一對應的,如MapFunction對應RichMapFunction,如代碼清單3-5所示。
代碼清單3-5 MapFunction代碼示例

從上邊的代碼可以看出來,使用MapFunction只需實現Map方法即可,所以無狀態Function一般都是直接繼承接口,如Map接口,或者通過匿名類實現接口。
RichFunction相比無狀態Function,有兩方面的增強:
1)增加了open和close方法來管理Function的生命周期,在作業啟動時,Function在open方法中執行初始化,在Function停止時,在close方法中執行清理,釋放占用的資源等。無狀態Function不具備此能力。
2)增加了getRuntimeContext和setRuntimeContext。通過RuntimeContext,RichFunction能夠獲取到執行時作業級別的參數信息,而無狀態Function不具備此能力。
無狀態Function天然是容錯的,作業失敗之后,重新執行即可,但是有狀態的Function(RichFunction)需要處理中間結果的保存和恢復,待有了狀態的訪問能力,也就意味著Function是可以容錯的,執行過程中,狀態會進行快照然后備份,在作業失敗,Function能夠從快照中恢復回來。
3.5.2 處理函數
處理函數(ProcessFunction)可以訪問流應用程序所有(非循環)基本構建塊:
1)事件(數據流元素)。
2)狀態(容錯和一致性)。
3)定時器(事件時間和處理時間)。
ProcessFunction根據場景不同有幾種實現,如圖3-26所示。

圖3-26 ProcessFunction類體系
1)ProcessFunction:單流輸入函數。
2)CoProcessFunction:雙流輸入函數。
3)KeyedProcessFunction:單流輸入函數。
4)KeyedCoProcessFunction:雙流輸入函數。
Kyed ProcessFunction與Non-Keyed ProcessFunction的區別是,Keyed ProcessFunction只能用在KeyedStream上。
ProcessFunction和CoProcessFunction的區別是,CoProcessFunction是雙流輸入,而ProcessFunction是單流輸入。
1.雙流Join
下面是使用CoProcessFunction實現雙流Join的例子。
(1)即時雙流Join
其邏輯如下(見圖3-27)。
1)創建1個State對象。
2)接收到輸入流1事件后更新State。
3)接收到輸入流2的事件后遍歷State,根據Join條件進行匹配,將匹配后的結果發送到下游。

圖3-27 雙流即時Join
(2)延遲雙流Join
在流式數據里,數據可能是亂序的,數據會延遲到達,并且為了提供處理效率,使用小批量計算模式,而不是每個事件觸發一次Join計算,如圖3-28所示。

圖3-28 雙流延遲Join
其邏輯如下。
1)創建2個State對象,分別緩存輸入流1和輸入流2的事件。
2)創建1個定時器,等待數據的到達,定時延遲觸發Join計算。
3)接收到輸入流1事件后更新State。
4)接收到輸入流2事件后更新State。
5)定時器遍歷State1和State2,根據Join條件進行匹配,將匹配后的結果發送到下游。
2.延遲計算
在上面的延遲Join示例中,使用了計時器來暫存一批數據之后再觸發計算,在流計算中這是非常常見的場景。在前面提到的批流合一的關鍵概念中,關鍵是Watermark和Window,在Flink中的窗口計算(WindowOperator)就是典型的延遲計算,使用Window暫存數據,使用Watermark觸發Window的計算,如圖3-29所示。在Blink Table & SQL中也大量使用了定時器。

圖3-29 延遲計算過程
觸發器在算子層面上提供支持,所有支持延遲計算的算子都繼承了Triggerable接口。Triggerable接口主要定義了基于事件時間和基于處理時間的兩種觸發行為,如代碼清單3-6所示。
代碼清單3-6 Triggerable接口

3.5.3 廣播函數
前邊介紹了單輸入Function和雙輸入Function。在Flink 1.5.0版本中引入了廣播狀態模式,將一個數據流的內容廣播到另一個流中,同時也引入了新的函數類型:廣播函數。
廣播函數的體系如圖3-30所示。
在圖3-30中可以看到,廣播函數有BroadcastProcessFunction和KeyedBroadcastProcess Function,廣播函數跟雙流輸入的處理函數類似,也有兩個數據元素處理的接口,processElement()負責處理一般的數據流,processBroadcastElement()負責處理廣播數據流。完整定義如代碼清單3-7和代碼清單3-8所示。

圖3-30 廣播函數體系
代碼清單3-7 BroadcastProcessFunction抽象類

代碼清單3-8 BroadcastProcessFunction類

上面的兩個廣播函數BroadcastProcessFunction和KeyedBroadcastProcessFunction都是抽象類,所以在實際使用中,開發者需要實現其定義的抽象方法。
processElement()方法和processBroadcastElement()方法的區別在于:processElement只能使用只讀的上下文ReadOnlyContext,而processBroadcastElement()方法則可以使用支持讀寫的上下文Context。這么設計看起來很奇怪,但是合理的。廣播狀態模式下,要求所有算子上的廣播狀態應完全一致,如果也允許processElement方法更新、刪除廣播狀態中的數據,那么會使得算子之間的廣播狀態變得不一致,導致系統行為不可預測。在后邊會介紹數據分區,數據分區會將數據流進行分流,交給下游的不同算子,那么不同算子接收的數據流就是不同的,如果開發者在processElement方法中更新了廣播狀態,必然會導致廣播狀態變得不一致。也許會有人說,在算子更新廣播狀態的時候,通知其他算子不就可以了嗎?但是Flink中的平行算子之間沒有通信接口,所以此處的設計強制要求processElement()不能更新廣播狀態。
注意,只有設計的強制要求還不夠,processBroadcastElement()必須確保行為的不可變性,即無論什么時間、在哪個物理機器、廣播數據是否亂序,都必須保證執行結果完全相同。比較典型的破壞不可變性的例子包括處理邏輯依賴于當前時間,不同的節點當前時間并不完全一致,而且還要考慮到作業恢復執行的情況,因此跟恢復之前的當前時間更是不可能相同。
3.5.4 異步函數
在介紹異步算子的時候提到了異步函數(AsyncFunction),異步函數就是對Java異步編程框架的封裝。
異步函數的類體系如圖3-31所示。

圖3-31 異步函數類體系
如圖3-31所示,異步函數的抽象類RichAsyncFunction實現AsyncFunction接口,繼承AbstractRichFunction獲得了生命周期管理和FunctionContext的訪問能力。
異步函數的接口中定義了兩種行為,異步調用行為將調用結果封裝到ResultFutrue中,同時提供了調用超時的處理,防止不釋放資源,如代碼清單3-9所示。
代碼清單3-9 AsyncFunction接口


3.5.5 數據源函數
數據源函數在Flink中叫作SourceFunction,Flink是一個計算引擎,其需要從外部讀取數據,所以在Flink中設計了SourceFunction體系,專門用來從外部存儲讀取數據。SourceFunction是Flink作業的起點,一個作業中可以有多個起點,即讀取多個數據源的數據。
SourceFunction體系如圖3-32所示。

圖3-32 SourceFunction體系
SourceFunction接口本身只定義了接口的業務邏輯相關行為,在實際使用中,一般會繼承抽象類RichSourceFunction或RichParallelSourceFunction。這兩個抽象類通過繼承AbstractRichFunction獲得了Function的生命周期管理、訪問RuntimeContext的能力。
從類的定義上來說,RichSourceFunction和RichParallelSourceFunction的代碼完全相同,甚至代碼中的注釋都基本相同,但是為什么要設計這兩個類呢?
其實這兩個類的差異在運行層面上,RichSourceFunction是不可并行的,并行度限定為1,超過1則會報錯。而RichParallelSourceFunction是可并行的,并行度可以根據需要設定,并沒有限制。差異體現在StreamExecutionEnvironment#addSource方法中,其對Function的類型進行了判斷,如果是ParallelSourceFunction類型,則是可并行的。如代碼清單3-10所示。
代碼清單3-10 構造DataStreamSource


SourceFunction有幾個比較關鍵的行為。
1)生命周期管理:在實際中,一般SourceFunction的實現類會同時繼承AbstractRichFunction,所以其生命周期包含open、close、cancle三種方法,在生命周期方法中可以包含相應的初始化、清理等。
2)讀取數據: 持續地從外部存儲讀取數據,不同的外部存儲有不同的實現,如從Kafka讀取數據依賴于Kafka Producer等。
3)向下游發送數據。
4)發送Watermark:生成Watermark并向下游發送,Watermark的生成參見“時間與窗口”章節。
5)空閑標記:如果讀取不到數據,則將該Task標記為空閑,向下游發送Status#Idle,阻止Watermark向下游傳遞。
SourceFunction接口定義如代碼清單3-11所示,SourceFunction中內嵌了SourceContext接口。
代碼清單3-11 SourceFunction接口

上邊SourceFunction定義的數據發送、Watermark發送、空閑標記實際上都定義在SourceContext中。
StreamSourceContexts中提供了生成不同類型SourceContext的實例的方法,從總體上按照帶不帶時間分為兩類SourceContext如圖3-33所示。
1. NonTimestampContext
NonTimestampContext為所有的元素賦予-1作為時間戳,也就意味著永遠不會向下游發送Watermark。
使用Processing Time時使用此Context,使用Processing Time的時候向下游發送Watermark沒有意義,在實際處理中,各個計算節點會根據本地時間定義觸發器,觸發執行Window類計算,而不是根據Watermark來觸發。

圖3-33 SourceContext類體系
2. WatermarkContext
WatermarkContext定義了與Watermark相關的行為:
1)負責管理當前的StreamStatus,確保StreamStatus向下游傳遞。
2)負責空閑檢測的邏輯,當超過設定的事件間隔而沒有收到數據或者Watermark時,認為Task處于空閑狀態。
WatermarkContext有兩個實現類。
(1)AutomaticWatermarkContext
使用攝取時間(Ingestion Time)的時候,AutomaticWatermarkContext自動生成Watermark。在該Context中,啟動WatermarkEmittingTask向下游發送Watermark,使用了一個定時器,其觸發時間=(作業啟動的時刻+Watermark周期×n),一旦啟動之后,WatermarkEmittingTask會持續地自動注冊定時器,向下游發送Watermark。
(2)ManualWatermarkContext
使用事件時間(Event Time)的時候,ManualWatermarkContext不會產生Watermark,而是向下游發送透傳上游的Watermark。
3.5.6 輸出函數
輸出函數在Flink中叫作SinkFunction,負責將計算結果寫入外部存儲中,是作業終點,一個作業可以有多個Sink,即將數據寫入不同的外部存儲中。
SinkFunction類體系如圖3-34所示。
SinkFunction只是單純地定義了數據寫出到外部存儲的行為,并沒有Function的生命周期管理行為,函數的生命周期定義在AbstractRichFunction中。在Connector中實際實現Sink的時候,基本都是從RickSinkFunction和TwoPhaseCommitSinkFunction繼承。
TowPhaseCommitSinkFunction是Flink中實現端到端Exactly-Once的關鍵函數,提供框架級別的端到端Exactly-Once的支持,其在實現過程中與Flink檢查點機制結合,在第13章有詳細介紹。

圖3-34 SinkFunction類體系
3.5.7 檢查點函數
檢查點函數就是在Flink中支持函數級別狀態的保存和恢復的函數。為了實現函數級別的State管理,Flink中設計了CheckpointedFunction和ListCheckpointed接口。在檢查點函數接口中主要設計了狀態快照的備份和恢復兩種行為。
CheckpointedFunction雖然已經標記為廢棄,但仍然是現在用得最多的接口。當保存狀態之后,其snapshotStat()會被調用,用于備份保存狀態到外部存儲。當恢復狀態的時候,其initializeState()方法負責初始化State,執行從上一個檢查點恢復狀態的邏輯。
CheckpointedFunction接口定義如代碼清單3-12所示。
代碼清單3-12 CheckpointedFunction接口

ListCheckpointed接口的行為跟Checkpointed行為類似,除了提供狀態管理能力之外,修改作業并行度的時候,還提供了狀態重分布的支持。ListCheckpointed接口定義如代碼清單3-13所示。
代碼清單3-13 ListCheckpointed接口

- 面向STEM的mBlock智能機器人創新課程
- Visualforce Development Cookbook(Second Edition)
- Ansible Quick Start Guide
- VMware Performance and Capacity Management(Second Edition)
- JMAG電機電磁仿真分析與實例解析
- 空間傳感器網絡復雜區域智能監測技術
- 精通LabVIEW程序設計
- Artificial Intelligence By Example
- 從零開始學JavaScript
- Java求職寶典
- 傳感技術基礎與技能實訓
- 軟件需求十步走
- 數據庫技術:Access 2003計算機網絡技術
- 深度剖析:硬盤固件級數據恢復
- 智能移動機器人的設計、制作與應用