- Flink內核原理與實現
- 馮飛 崔鵬云 陳冠華編著
- 1967字
- 2021-04-14 11:30:39
4.4 水印
水印(Watermark)用于處理亂序事件,而正確地處理亂序事件,通常用Watermark機制結合窗口來實現。
從流處理原始設備產生事件,到Flink讀取到數據,再到Flink多個算子處理數據,在這個過程中,會受到網絡延遲、數據亂序、背壓、Failover等多種情況的影響,導致數據是亂序的。雖然大部分情況下沒有問題,但是不得不在設計上考慮此類異常情況,為了保證計算結果的正確性,需要等待數據,這帶來了計算的延遲。對于延遲太久的數據,不能無限期地等下去,所以必須有一個機制,來保證特定的時間后一定會觸發窗口進行計算,這個觸發機制就是Watermark。
在DataStream和Flink Table & SQL模塊中,使用了各自的Watermark生成體系。
4.4.1 DataStream Watermark生成
通常Watermark在Source Function中生成,如果是并行計算的任務,在多個并行執行的Source Function中,相互獨立產生各自的Watermark。而Flink提供了額外的機制,允許在調用DataStream API操作(如map、filter等)之后,根據業務邏輯的需要,使用時間戳和Watermark生成器修改數據記錄的時間戳和Watermark。
1. Source Function中生成Watermark
Source Function可以直接為數據元素分配時間戳,同時也會向下游發送Watermark。在Source Function中為數據分配了時間戳和Watermark就不必在DataStream API中使用了。需要注意的是:如果一個timestamp分配器被使用的話,由源提供的任何Timestamp和Watermark都會被重寫。
為了通過SourceFunction直接為一個元素分配一個時間戳,SourceFunction需要調用SourceContext中的collectWithTimestamp(...)方法。為了生成Watermark,源需要調用emitWatermark(Watermark)方法,如代碼清單4-3所示。
代碼清單4-3 SourceFunction中為數據元素分配時間戳和生成Watermark示例

2. DataStream API中生成Watermark
DataStream API中使用的TimestampAssigner接口定義了時間戳的提取行為,其有兩個不同接口AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,分別代表了不同的Watermark生成策略。TimestampAssigner接口體系如圖4-9所示。

圖4-9 DataStream中TimestampAssigner接口體系
AssignerWithPeriodicWatermarks是周期性生成Watermark策略的頂層抽象接口,該接口的實現類周期性地生成Watermark,而不會針對每一個事件都生成。
AssignerWithPunctuatedWatermarks對每一個事件都會嘗試進行Watermark的生成,但是如果生成的Watermark是null或者Watermark小于之前的Watermark,則該Watermark不會發往下游,因為發往下游也不會有任何效果,不會觸發任何窗口的執行。
4.4.2 Flink SQL Watermark生成
Flink SQL沒有DataStram API開發那么靈活,其Watermark的生成主要是在TableSource中完成的,其定義了3類Watermark生成策略。其Watermark生成策略體系如圖4-10所示。

圖4-10 Flink SQL的Watermark生成策略體系
Watermark的生成機制分為如下3類。
(1)周期性Watermark策略
周期性Watermark策略在Flink中叫作PeriodicWatermarkAssigner,周期性(一定時間間隔或者達到一定的記錄條數)地產生一個Watermark。在實際的生產中使用周期性Watermark策略的時候,必須注意時間和數據量,結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。
1) AscendingTimestamps:遞增Watermark,作用在Flink SQL中的Rowtime屬性上,Watermark=當前收到的數據元素的最大時間戳-1,此處減1的目的是確保有最大時間戳的事件不會被當做遲到數據丟棄。
2)BoundedOutOfOrderTimestamps:固定延遲Watermark,作用在Flink SQL的Rowtime屬性上,Watermark=當前收到的數據元素的最大時間戳-固定延遲。
(2)每事件Watermark策略
每事件Watermark策略在Flink中叫作PuntuatedWatamarkAssigner,數據流中每一個遞增的EventTime都會產生一個Watermark。在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark,在一定程度上會對下游算子造成壓力,所以只有在實時性要求非常高的場景下才會選擇Punctuated的方式進行Watermark的生成。
(3)無為策略
無為策略在Flink中叫作PreserveWatermark。在Flink中可以使用DataStream API和Table & SQL混合編程,所以Flink SQL中不設定Watermark策略,使用底層DataStream中的Watermark策略也是可以的,這時Flink SQL的Table Source中不做處理。
4.4.3 多流的Watermark
在實際的流計算中一個作業中往往會處理多個Source的數據,對Source的數據進行GroupBy分組,那么來自不同Source的相同key值會shuffle到同一個處理節點,并攜帶各自的Watermark,Apache Flink內部要保證Watermark保持單調遞增,多個Source的Watermark匯聚到一起時可能不是單調自增的,對于這樣的情況,ApacheFlink的內部處理如圖4-11所示。

圖4-11 Watermark處理邏輯
Apache Flink內部實現每一個邊上只能有一個遞增的Watermark,當出現多流攜帶EventTime匯聚到一起(GroupBy或Union)時,Apache Flink會選擇所有流入的EventTime中最小的一個向下游流出,從而保證Watermark的單調遞增和數據的完整性。
Watermark是在Source Function中生成或者在后續的DataStream API中生成的。Flink作業一般是并行執行的,作業包含多個Task,每個Task運行一個或一組算子(OperatorChain)實例,Task在生成Watermark的時候是相互獨立的,也就是說在作業中存在多個并行的Watermark。
Watermark在作業的DAG從上游向下游傳遞,算子收到上游Watermark后會更新其Watermark。如果新的Watermark大于算子的當前Watermark,則更新算子的Watermark為新Watermark,并發送給下游算子。
某些算子會有多個上游輸入,如Union或keyBy、partition之后的算子。在Flink的底層執行模型上,多流輸入會被分解為多個雙流輸入,所以對于多流Watermark的處理也就是雙流Watermark的處理,無論是哪一個流的Watermark進入算子,都需要跟另一個流的當前算子進行比較,選擇較小的Watermark,即Min(input1Watermark,intput2Watermark),與算子當前的Watermark比較,如果大于算子當前的Watermark,則更新算子的Watermark為新的Watermark,并發送給下游,如代碼清單4-4所示。
代碼清單4-4 雙流輸入的StreamOperator Watermark處理


如圖4-12所示,多流Watermark中使用了事件時間。

圖4-12 多流Watermark示例
在圖4-12中,Source算子產生各自的Watermark,并隨著數據流流向下游的map算子,map算子是無狀態計算,所以會將Watermark向下透傳。window算子收到上游兩個輸入的Watermark后,選擇其中較小的一個發送給下游,window(1)算子比較Watermark 29和Watermark 14,選擇Watermark 14作為算子當前Watermark,并將Watermark 14發往下游,window(2)算子也采用相同的邏輯。
- Mastering Proxmox(Third Edition)
- Project 2007項目管理實用詳解
- Python Artificial Intelligence Projects for Beginners
- Learning Apache Spark 2
- 大數據改變世界
- Effective DevOps with AWS
- Photoshop CS3特效處理融會貫通
- 變頻器、軟啟動器及PLC實用技術260問
- 大數據時代
- 菜鳥起飛系統安裝與重裝
- Deep Reinforcement Learning Hands-On
- 單片機C語言應用100例
- PVCBOT機器人控制技術入門
- Java Deep Learning Projects
- 百度智能小程序:AI賦能新機遇