官术网_书友最值得收藏!

  • Flink內核原理與實現
  • 馮飛 崔鵬云 陳冠華編著
  • 4051字
  • 2021-04-14 11:30:35

3.5 函數體系

函數在Flink中叫作Function,開發者編寫的函數叫作UDF(User Defined Function),當然Flink對于通用場景也內置了大量的預定義的通用UDF來簡化開發,如Join、GroupBy、Sum等SQL語義等價的UDF。UDF在Flink的DataStream開發和SQL開發中被廣泛使用。開發者使用UDF主要是實現非通用的計算邏輯,一般是業務邏輯。在本書語境中,UDF、Function、用戶自定義函數的含義是相同的。

按照輸入和輸入的不同特點分類,Flink中的UDF大概分為3類(見圖3-23)。

1. SourceFunction

無上游Function,SourceFunction直接從外部數據存儲讀取數據,所以SourceFunction所在的算子是起始,沒有上游算子。

圖3-23 Function分類與關系

2. SinkFunction

無下游Function,SinkFunction直接將數據寫入外部存儲,所以Sink函數所在的算子是作業的重點,沒有下游算子。

3. 一般Function

一般的UDF函數用在作業的中間處理步驟中,其接口定義與SourceFunction和SinkFunction不同。一般UDF所在的算子有上游算子,也有下游算子。

Flink的一般UDF有單流輸入和雙流輸入兩種,從UDF輸入、輸出的模型來說,多流輸入可以通過多個雙流輸入串聯而成,這種設計比較簡單實用,如圖3-24所示。

圖3-24 多流輸入轉換為多層雙流輸入

SourceFunction和SinkFunction主要在Flink中的連接器使用,也會在自定義讀取、寫出數據的時候使用。其余的大量實現邏輯的函數都屬于一般UDF。

3.5.1 函數層次

UDF在DataStream API層使用,Flink提供的函數體系從接口的層級來看,從高階Function到低階Function如圖3-25所示。

圖3-25 Function層次

Flink內置的DataStream上的API接口,如DataStream#map、DataStream#flatMap、DataStreamFilter#filter等,使用的都是高階函數,開發者使用高階函數的時候,無須關心定時器之類的底層概念,只需要關注業務邏輯即可。低階函數即ProcessFunction。

無狀態Function用來做無狀態計算,使用比較簡單,如MapFunction。無狀態Function和RichFunction是一一對應的,如MapFunction對應RichMapFunction,如代碼清單3-5所示。

代碼清單3-5 MapFunction代碼示例

從上邊的代碼可以看出來,使用MapFunction只需實現Map方法即可,所以無狀態Function一般都是直接繼承接口,如Map接口,或者通過匿名類實現接口。

RichFunction相比無狀態Function,有兩方面的增強:

1)增加了open和close方法來管理Function的生命周期,在作業啟動時,Function在open方法中執行初始化,在Function停止時,在close方法中執行清理,釋放占用的資源等。無狀態Function不具備此能力。

2)增加了getRuntimeContext和setRuntimeContext。通過RuntimeContext,RichFunction能夠獲取到執行時作業級別的參數信息,而無狀態Function不具備此能力。

無狀態Function天然是容錯的,作業失敗之后,重新執行即可,但是有狀態的Function(RichFunction)需要處理中間結果的保存和恢復,待有了狀態的訪問能力,也就意味著Function是可以容錯的,執行過程中,狀態會進行快照然后備份,在作業失敗,Function能夠從快照中恢復回來。

3.5.2 處理函數

處理函數(ProcessFunction)可以訪問流應用程序所有(非循環)基本構建塊:

1)事件(數據流元素)。

2)狀態(容錯和一致性)。

3)定時器(事件時間和處理時間)。

ProcessFunction根據場景不同有幾種實現,如圖3-26所示。

圖3-26 ProcessFunction類體系

1)ProcessFunction:單流輸入函數。

2)CoProcessFunction:雙流輸入函數。

3)KeyedProcessFunction:單流輸入函數。

4)KeyedCoProcessFunction:雙流輸入函數。

Kyed ProcessFunction與Non-Keyed ProcessFunction的區別是,Keyed ProcessFunction只能用在KeyedStream上。

ProcessFunction和CoProcessFunction的區別是,CoProcessFunction是雙流輸入,而ProcessFunction是單流輸入。

1.雙流Join

下面是使用CoProcessFunction實現雙流Join的例子。

(1)即時雙流Join

其邏輯如下(見圖3-27)。

1)創建1個State對象。

2)接收到輸入流1事件后更新State。

3)接收到輸入流2的事件后遍歷State,根據Join條件進行匹配,將匹配后的結果發送到下游。

圖3-27 雙流即時Join

(2)延遲雙流Join

在流式數據里,數據可能是亂序的,數據會延遲到達,并且為了提供處理效率,使用小批量計算模式,而不是每個事件觸發一次Join計算,如圖3-28所示。

圖3-28 雙流延遲Join

其邏輯如下。

1)創建2個State對象,分別緩存輸入流1和輸入流2的事件。

2)創建1個定時器,等待數據的到達,定時延遲觸發Join計算。

3)接收到輸入流1事件后更新State。

4)接收到輸入流2事件后更新State。

5)定時器遍歷State1和State2,根據Join條件進行匹配,將匹配后的結果發送到下游。

2.延遲計算

在上面的延遲Join示例中,使用了計時器來暫存一批數據之后再觸發計算,在流計算中這是非常常見的場景。在前面提到的批流合一的關鍵概念中,關鍵是Watermark和Window,在Flink中的窗口計算(WindowOperator)就是典型的延遲計算,使用Window暫存數據,使用Watermark觸發Window的計算,如圖3-29所示。在Blink Table & SQL中也大量使用了定時器。

圖3-29 延遲計算過程

觸發器在算子層面上提供支持,所有支持延遲計算的算子都繼承了Triggerable接口。Triggerable接口主要定義了基于事件時間和基于處理時間的兩種觸發行為,如代碼清單3-6所示。

代碼清單3-6 Triggerable接口

3.5.3 廣播函數

前邊介紹了單輸入Function和雙輸入Function。在Flink 1.5.0版本中引入了廣播狀態模式,將一個數據流的內容廣播到另一個流中,同時也引入了新的函數類型:廣播函數。

廣播函數的體系如圖3-30所示。

在圖3-30中可以看到,廣播函數有BroadcastProcessFunction和KeyedBroadcastProcess Function,廣播函數跟雙流輸入的處理函數類似,也有兩個數據元素處理的接口,processElement()負責處理一般的數據流,processBroadcastElement()負責處理廣播數據流。完整定義如代碼清單3-7和代碼清單3-8所示。

圖3-30 廣播函數體系

代碼清單3-7 BroadcastProcessFunction抽象類

代碼清單3-8 BroadcastProcessFunction類

上面的兩個廣播函數BroadcastProcessFunction和KeyedBroadcastProcessFunction都是抽象類,所以在實際使用中,開發者需要實現其定義的抽象方法。

processElement()方法和processBroadcastElement()方法的區別在于:processElement只能使用只讀的上下文ReadOnlyContext,而processBroadcastElement()方法則可以使用支持讀寫的上下文Context。這么設計看起來很奇怪,但是合理的。廣播狀態模式下,要求所有算子上的廣播狀態應完全一致,如果也允許processElement方法更新、刪除廣播狀態中的數據,那么會使得算子之間的廣播狀態變得不一致,導致系統行為不可預測。在后邊會介紹數據分區,數據分區會將數據流進行分流,交給下游的不同算子,那么不同算子接收的數據流就是不同的,如果開發者在processElement方法中更新了廣播狀態,必然會導致廣播狀態變得不一致。也許會有人說,在算子更新廣播狀態的時候,通知其他算子不就可以了嗎?但是Flink中的平行算子之間沒有通信接口,所以此處的設計強制要求processElement()不能更新廣播狀態。

注意,只有設計的強制要求還不夠,processBroadcastElement()必須確保行為的不可變性,即無論什么時間、在哪個物理機器、廣播數據是否亂序,都必須保證執行結果完全相同。比較典型的破壞不可變性的例子包括處理邏輯依賴于當前時間,不同的節點當前時間并不完全一致,而且還要考慮到作業恢復執行的情況,因此跟恢復之前的當前時間更是不可能相同。

3.5.4 異步函數

在介紹異步算子的時候提到了異步函數(AsyncFunction),異步函數就是對Java異步編程框架的封裝。

異步函數的類體系如圖3-31所示。

圖3-31 異步函數類體系

如圖3-31所示,異步函數的抽象類RichAsyncFunction實現AsyncFunction接口,繼承AbstractRichFunction獲得了生命周期管理和FunctionContext的訪問能力。

異步函數的接口中定義了兩種行為,異步調用行為將調用結果封裝到ResultFutrue中,同時提供了調用超時的處理,防止不釋放資源,如代碼清單3-9所示。

代碼清單3-9 AsyncFunction接口

3.5.5 數據源函數

數據源函數在Flink中叫作SourceFunction,Flink是一個計算引擎,其需要從外部讀取數據,所以在Flink中設計了SourceFunction體系,專門用來從外部存儲讀取數據。SourceFunction是Flink作業的起點,一個作業中可以有多個起點,即讀取多個數據源的數據。

SourceFunction體系如圖3-32所示。

圖3-32 SourceFunction體系

SourceFunction接口本身只定義了接口的業務邏輯相關行為,在實際使用中,一般會繼承抽象類RichSourceFunction或RichParallelSourceFunction。這兩個抽象類通過繼承AbstractRichFunction獲得了Function的生命周期管理、訪問RuntimeContext的能力。

從類的定義上來說,RichSourceFunction和RichParallelSourceFunction的代碼完全相同,甚至代碼中的注釋都基本相同,但是為什么要設計這兩個類呢?

其實這兩個類的差異在運行層面上,RichSourceFunction是不可并行的,并行度限定為1,超過1則會報錯。而RichParallelSourceFunction是可并行的,并行度可以根據需要設定,并沒有限制。差異體現在StreamExecutionEnvironment#addSource方法中,其對Function的類型進行了判斷,如果是ParallelSourceFunction類型,則是可并行的。如代碼清單3-10所示。

代碼清單3-10 構造DataStreamSource

SourceFunction有幾個比較關鍵的行為。

1)生命周期管理:在實際中,一般SourceFunction的實現類會同時繼承AbstractRichFunction,所以其生命周期包含open、close、cancle三種方法,在生命周期方法中可以包含相應的初始化、清理等。

2)讀取數據: 持續地從外部存儲讀取數據,不同的外部存儲有不同的實現,如從Kafka讀取數據依賴于Kafka Producer等。

3)向下游發送數據。

4)發送Watermark:生成Watermark并向下游發送,Watermark的生成參見“時間與窗口”章節。

5)空閑標記:如果讀取不到數據,則將該Task標記為空閑,向下游發送Status#Idle,阻止Watermark向下游傳遞。

SourceFunction接口定義如代碼清單3-11所示,SourceFunction中內嵌了SourceContext接口。

代碼清單3-11 SourceFunction接口

上邊SourceFunction定義的數據發送、Watermark發送、空閑標記實際上都定義在SourceContext中。

StreamSourceContexts中提供了生成不同類型SourceContext的實例的方法,從總體上按照帶不帶時間分為兩類SourceContext如圖3-33所示。

1. NonTimestampContext

NonTimestampContext為所有的元素賦予-1作為時間戳,也就意味著永遠不會向下游發送Watermark。

使用Processing Time時使用此Context,使用Processing Time的時候向下游發送Watermark沒有意義,在實際處理中,各個計算節點會根據本地時間定義觸發器,觸發執行Window類計算,而不是根據Watermark來觸發。

圖3-33 SourceContext類體系

2. WatermarkContext

WatermarkContext定義了與Watermark相關的行為:

1)負責管理當前的StreamStatus,確保StreamStatus向下游傳遞。

2)負責空閑檢測的邏輯,當超過設定的事件間隔而沒有收到數據或者Watermark時,認為Task處于空閑狀態。

WatermarkContext有兩個實現類。

(1)AutomaticWatermarkContext

使用攝取時間(Ingestion Time)的時候,AutomaticWatermarkContext自動生成Watermark。在該Context中,啟動WatermarkEmittingTask向下游發送Watermark,使用了一個定時器,其觸發時間=(作業啟動的時刻+Watermark周期×n),一旦啟動之后,WatermarkEmittingTask會持續地自動注冊定時器,向下游發送Watermark。

(2)ManualWatermarkContext

使用事件時間(Event Time)的時候,ManualWatermarkContext不會產生Watermark,而是向下游發送透傳上游的Watermark。

3.5.6 輸出函數

輸出函數在Flink中叫作SinkFunction,負責將計算結果寫入外部存儲中,是作業終點,一個作業可以有多個Sink,即將數據寫入不同的外部存儲中。

SinkFunction類體系如圖3-34所示。

SinkFunction只是單純地定義了數據寫出到外部存儲的行為,并沒有Function的生命周期管理行為,函數的生命周期定義在AbstractRichFunction中。在Connector中實際實現Sink的時候,基本都是從RickSinkFunction和TwoPhaseCommitSinkFunction繼承。

TowPhaseCommitSinkFunction是Flink中實現端到端Exactly-Once的關鍵函數,提供框架級別的端到端Exactly-Once的支持,其在實現過程中與Flink檢查點機制結合,在第13章有詳細介紹。

圖3-34 SinkFunction類體系

3.5.7 檢查點函數

檢查點函數就是在Flink中支持函數級別狀態的保存和恢復的函數。為了實現函數級別的State管理,Flink中設計了CheckpointedFunction和ListCheckpointed接口。在檢查點函數接口中主要設計了狀態快照的備份和恢復兩種行為。

CheckpointedFunction雖然已經標記為廢棄,但仍然是現在用得最多的接口。當保存狀態之后,其snapshotStat()會被調用,用于備份保存狀態到外部存儲。當恢復狀態的時候,其initializeState()方法負責初始化State,執行從上一個檢查點恢復狀態的邏輯。

CheckpointedFunction接口定義如代碼清單3-12所示。

代碼清單3-12 CheckpointedFunction接口

ListCheckpointed接口的行為跟Checkpointed行為類似,除了提供狀態管理能力之外,修改作業并行度的時候,還提供了狀態重分布的支持。ListCheckpointed接口定義如代碼清單3-13所示。

代碼清單3-13 ListCheckpointed接口

主站蜘蛛池模板: 建德市| 防城港市| 赤水市| 苍南县| 浦北县| 阿拉善右旗| 化德县| 仁怀市| 昌黎县| 鄂伦春自治旗| 偏关县| 若尔盖县| 海宁市| 鹤峰县| 延川县| 长岭县| 夏津县| 扶绥县| 丹寨县| 扶余县| 临汾市| 特克斯县| 二连浩特市| 海阳市| 和硕县| 渭南市| 荃湾区| 泸州市| 长沙市| 宝丰县| 泰和县| 饶平县| 田东县| 利川市| 科尔| 广东省| 怀宁县| 广平县| 太保市| 文水县| 丘北县|