官术网_书友最值得收藏!

4.5 時間服務

時間是Flink中極其重要的概念,在Flink的開發層面上,會在KeyedProcessFunction中和Window中使用到時間概念。一般情況下,在KeyedProcessFunction#processElement()方法中會用到Timer,注冊Timer然后重寫其onTimer()方法,在Watermark超過Timer的時間點之后,觸發回調onTimer()。根據時間類型的不同,可以注冊事件時間和處理時間兩種Timer。

說到此處,不得不提到Timer注冊過程中使用到的定時器服務(TimerService)。TimeService是在算子中提供定時器的管理行為,包含定時器的注冊和刪除。TimerService在DataStream、State、Blink中都有應用。在DataStream和State模塊中,一般會在Keyed算子中使用,在引入Blink之后,在Blink的一般算子中也會使用,如BaseTemporalSortOperator算子。

那么在執行層面上,時間服務TimerService具體是怎么發揮其作用的呢?

簡單來講,在算子中使用時間服務來創建定時器(Timer),并且在Timer觸發的時候進行回調,從而進行業務邏輯處理。前邊章節中延遲Join的示例中使用過Timer。

4.5.1 定時器服務

定時器服務在Flink中叫作TimerService,窗口算子(WindowOperator)中使用了InternalTimerService來管理定時器(Timer),其初始化是在WindowOperator#open()中實現的。

對于InternalTimerService而言,有幾個元素比較重要:名稱、命名空間類型N(及其序列化器)、鍵類型K(及其序列化器)和Triggerable對象(支持延時計算的算子,繼承了Triggerable接口來實現回調)。

如代碼清單4-5所示。

代碼清單4-5 注冊Timer入口

一個算子中可以有多個InternalTimeService,通過名稱進行區分,如在WindowOperator中,InternalTimeService的名稱是“window-timers”,在KeyedProcessOperator中名稱是“user-timers”,在CepOperator中名稱是“watermark-callbacks”。

InternalTimerService接口的實現類是InternalTimerServiceImpl,Timer的實現類是InternalTimer。InternalTimerServiceImpl使用了兩個TimerHeapInternalTimer的優先隊列(HeapPriorityQueueSet,該優先隊列是Flink自己實現的),分別用于維護事件時間和處理時間的Timer。

InternalTimeServiceManager是Task級別提供的InternalTimeService集中管理器,其使用Map保存了當前所有的InternalTimeService,Map的Key是InternalTimerService的名字。

4.5.2 定時器

定時器在Flink中叫作Timer。窗口的觸發器與定時器是緊密聯系的。

Flink的定時器使用InternalTimer接口定義行為,如代碼清單4-6所示。

代碼清單4-6 InternalTimer接口

前面提到了Timer的注冊和保存,那么Timer到底是如何觸發然后回調用戶邏輯的呢?答案在InternalTimerServiceImpl中。

對于事件時間,會根據Watermark的時間,從事件時間的定時器隊列中找到比給定時間小的所有定時器,觸發該Timer所在的算子,然后由算子去調用UDF中的onTime()方法,如代碼清單4-7所示。

代碼清單4-7 事件時間觸發與回調

處理時間也是類似的邏輯,區別在于,處理時間是從處理時間Timer優先級隊列中找到Timer。處理時間因為依賴于當前系統,所以其使用的是周期性調度。

4.5.3 優先級隊列

直接使用Java的PriorityQueue看起來也能實現InternalTimer的需求,但是Flink在優先級隊列中使用了KeyGroup,是按照KeGroup去重的,并不是按照全局的Key去重,如圖4-13所示。

圖4-13 Flink實現的優先級隊列體系

Flink自己實現了優先級隊列來管理Timer,共有2種實現。

1)基于堆內存的優先級隊列HeapPriorityQueueSet:基于Java堆內存的優先級隊列,其實現思路與Java的PriorityQueue類似,使用了二叉樹。

2)基于RocksDB的優先級隊列:分為Cache+RocksDB量級,Cache中保存了前N個元素,其余的保存在RocksDB中。寫入的時候采用Write-through策略,即寫入Cache的同時要更新RocksDB中的數據,可能需要訪問磁盤。

基于堆內存的優先級隊列比基于RocksDB的優先級隊列性能好,但是受限于內存大小,無法容納太多的數據;基于RocksDB的優先級隊列犧牲了部分性能,可以容納大量的數據。

主站蜘蛛池模板: 苍山县| 乌鲁木齐市| 亚东县| 东莞市| 务川| 政和县| 广昌县| 元谋县| 彰化县| 汉沽区| 达尔| 靖西县| 封丘县| 嘉兴市| 淮南市| 东宁县| 石景山区| 绥芬河市| 荆州市| 团风县| 中牟县| 田林县| 盈江县| 古蔺县| 昌宁县| 准格尔旗| 湖北省| 时尚| 同心县| 稻城县| 交城县| 孟津县| 文安县| 美姑县| 赤城县| 南京市| 万全县| 德格县| 阿拉善左旗| 什邡市| 曲靖市|