官术网_书友最值得收藏!

3.3 數據轉換

數據轉換在Flink中叫作Transformation,是銜接DataStream API和Flink內核的邏輯結構。DataStream面向開發者,Transformation面向Flink內核,調用DataStream API的數據處理流水線,最終會轉換為Transformation流水線,Flink從Transformation流水線開始執行,后邊章節會詳細介紹。

從DataStream流水線到Transformation流水線的轉換示意如圖3-7所示。

Transformation有兩大類:物理Transformation和虛擬Transformation。在運行時刻,DataStream的API調用都會被轉換為Transformation,然后從Transformation轉換為實際運行的算子,而虛擬的Transformation則不會轉換為具體的算子,如圖3-8所示。

Reblance、Union、Split、Select最終并沒有形成實體的算子,那么它們去哪兒了?在后邊的作業提交章節中會闡述。

Flink內置的Transformation類體系如圖3-9所示。

從類圖中可以看到,Transformation類是頂層的抽象,所有的物理Transformation繼承了PhysicalTransformation,其他類型的Transformation均為虛擬Transformation。

Transformation包含了Flink運行時的一些關鍵參數:

1)name:轉換器的名稱,主要用于可視化。

2)uid: 用戶指定的uid,該uid的主要目的是在job重啟時再次分配跟之前相同的uid,可以持久保存狀態。

圖3-7 DataStream流水線到Transformation流水線

圖3-8 虛擬Transformation被優化后的算子樹

3)bufferTimeout:buffer超時時間。

4)parallelism:并行度。

5)id:跟屬性uid無關,生成方式是基于一個靜態累加器。

6)outputType: 輸出類型,用來進行序列化數據。

7)slotSharingGroup: 給當前的Transformation設置Slot共享組。Slot共享參見第9章。

1.物理Transformation

物理Transformation一共有4種,具體如下。

(1)SourceTransformation

從數據源讀取數據的Transformation,是Flink作業的起點。SourceTransformation只有下游Transformation,沒有上游輸入Transformation。

圖3-9 Transformation類體系

一個作業可以有多個SourceTransformation,從多個數據源讀取數據,如多流Join、維表Join、BroadcastState等場景。

(2)SinkTransformation

將數據寫到外部存儲的Transformation,是Flink作業的終點。SinkTransformation只有上游Transformation,下游就是外部存儲了。

一個作業內可以有多個SinkTransformation,將數據寫入不同的外部存儲匯總,如計算結果的熱數據寫入Redis實時查詢,歷史數據寫入HDFS。

(3)OneInputTransformation

單流輸入的Transformation(只接收一個輸入流),跟上面的SinkTransformation構造器類似,同樣需要input和operator參數。

(4)TwoInputTransformation

雙流輸入的Transformation(接收兩種流作為輸入),分別叫作第1輸入和第2輸入。其他的實現同OneInputTransformation,如圖3-10所示。

從上圖示例中可以看到,TwoInputTransformation的兩個上游輸出的類型不同。此類型的兩個上游輸入的數據類型可以相同也可以不同。

2.虛擬Transformation

(1)SideOutputTransformation

SideOutputTransformation在旁路輸出中轉換而來,表示上游Transformation的一個分流,上游Transformation可以有多個下游SideOutputTransformation,如圖3-11所示。

上游的OneInputTransformation分流給下游的多個SideOutputTransformation,每一個SideOutput通過OutputTag進行標識。

圖3-10 TowInputTransformation示例

圖3-11 SideOutput示例

(2)SplitTransformation

用來按條件切分數據流,該轉換用于將一個流拆分成多個流(通過OutputSelector來達到這個目的),當然這個操作只是邏輯上的拆分(它只影響上游的流如何跟下游的流連接)。

構造該轉換器,同樣也依賴于其輸入轉換器(input)以及一個輸出選擇器(outputSelector),但會實例化其父類(StreamTransformation,沒有提供自定義的名稱,而是固定的常量值Split)。

(3)SelectTransformation

與SplitTransformation配合使用,用來在下游選擇SplitTransformation切分的數據流。

(4)PartitionTransformation

該轉換器用于改變輸入元素的分區,其名稱為Partition。因此,工作時除了提供一個StreamTransformation作為輸入外,還需要提供一個StreamPartitioner的實例來進行分區。

PartitionTransformation需要特別提及一下,它在Flink DataStream Runtime中和Blink的流處理、批處理的都被使用了。其有一個ShuffleMode,用來統一表示流、批數據Shuffle的模式。對于流而言,ShuffleMode是ShuffleMode.PIPELINED;對于批而言,ShuffleMode是ShuffleMode.BATCH。

(5)UnionTransformation

合并轉換器,該轉換器用于將多個輸入StreamTransformation進行合并,因此該轉換器接收StreamTransformation的集合,其名稱也在內部被固定為Union,如圖3-12所示。

圖3-12 UnionTransformation示例

Union運算要求其直接上游輸入的數據的結構必須是完全相同的。

(6)FeedbackTransformation

表示Flink DAG中的一個反饋點。簡單來說,反饋點就是把符合條件的數據重新發回上游Transformation處理,一個反饋點可以連接一個或者多個上游的Transformation,這些連接關系叫作反饋邊。處于反饋點下游的Transformation將可以從反饋點和反饋邊獲得元素輸入。符合反饋條件并交給上游的Transformation的數據流叫作反饋流(Feedback DataStream),如圖3-13所示。

圖3-13 FeedbackTransformation示意圖

FeedbackTransformation的固定名稱為Feedback,有兩個重要參數:

1)input: 上游輸入StreamTransformation。

2)waitTime: 默認為0,即永遠等待,如果設置了等待時間,一旦超過該等待時間,則計算結束并且不再接收數據。

實例化FeedbackTransformation時,會自動創建一個用于存儲反饋邊的集合feedbackEdges。那么反饋邊如何收集呢?FeedbackTransformation通過定義一個實例方法——addFeedbackEdge來進行收集。而這里所謂的“收集”就是將下游StreamTransformation的實例加入feedbackEdges集合中(這里可以理解為將兩個點建立連接關系,也就形成了邊)。不過,這里加入的StreamTransformation的實例有一個要求:當前FeedbackTransformation的實例跟待加入StreamTransformation實例的并行度應一致。

(7)CoFeedbackTransformation

CoFeedbackTransformation與FeedbackTransformation類似,也是Flink DAG中的一個反饋點。兩者的不同之處在于,CoFeedBackTransformation反饋給上游的數據流與上游Transformation的輸入類型不同,所以要求上游的Transformation必須是TwoInputTransformation。CoFeedbackTransformation是從ConnectedIterativeStreams創建而來的,而ConnectedIterativeStreams由IterativeStream#withFeedbackType設置新的反饋流的數據類型而來,如圖3-14所示。

圖3-14 CoFeedbackTransformation示意圖

在圖3-14中,可以看到TwoInputTransformation有兩個輸入,第1輸入的類型為Tuple<String,Long>,反饋流的輸入類型為Tuple<String,Long,Integer>。

Transformation作為中介,負責將執行時刻初始化作業所需要的StreamTask類和算子工程(StreamOperatorFactory)構建好,算子作為UDF的執行時容器,這樣就能將作業開發和作業運行聯系起來了。Transformation、算子、UDF的關系如圖3-15所示。

在本例中,OneInputTransformation包裝了算子StreamMap,算子StreamMap又包裝了UDF。在邏輯上,下游Transformation連接上游Transformation,邏輯上數據是從上游Trans formation流向下游Transformation,實際上是從算子流向算子,在算子內部交給UDF處理。

圖3-15 Transformation、算子、UDF的關系

圖3-15是OneInputTransformation的示例,TwoInputTransformation與此相同,接下來介紹算子、UDF。

主站蜘蛛池模板: 东宁县| 宁阳县| 兴和县| 九龙坡区| 江城| 玛纳斯县| 南华县| 宣恩县| 新建县| 微山县| 绥滨县| 英超| 漯河市| 栖霞市| 胶南市| 兴化市| 平遥县| 德格县| 延吉市| 会东县| 梅河口市| 商洛市| 遂川县| 集贤县| 远安县| 彩票| 新蔡县| 钦州市| 尖扎县| 将乐县| 富民县| 德惠市| 胶南市| 黔西县| 波密县| 靖边县| 麟游县| 乌鲁木齐县| 白玉县| 信宜市| 平利县|