- Flink內(nèi)核原理與實現(xiàn)
- 馮飛 崔鵬云 陳冠華編著
- 3860字
- 2021-04-14 11:30:31
2.4 數(shù)據(jù)流API
DataStream API是Flink流計算應用中最常用的API,相比Table & SQL API更加底層、更加靈活。
2.4.1 數(shù)據(jù)讀取
數(shù)據(jù)讀取的API定義在StreamExecutionEnvironment,這是Flink流計算應用的起點,第一個DataStream就是從數(shù)據(jù)讀取API中構造出來的。在Flink中,除了內(nèi)置的數(shù)據(jù)讀取API外,還針對不同類型的外部存儲系統(tǒng)提供了對應的Connector連接器,使用連接器也能夠實現(xiàn)數(shù)據(jù)讀取的目的。
1.從內(nèi)存讀取數(shù)據(jù)
Flink提供了一系列的方法,直接在內(nèi)存中生成數(shù)據(jù),方便測試和演示。API如圖2-4所示。

圖2-4 內(nèi)存數(shù)據(jù)讀取API
2.文件讀取數(shù)據(jù)
內(nèi)置的從文件中讀取數(shù)據(jù)的API如圖2-5所示。

圖2-5 讀取文件API
從文件中讀取分為讀取文本文件和一般文件兩類,文本文件無須多說,一般文件指的是帶有結構的文件,如Avro、Parquet等。
文件讀取的模式有一次性讀取FileProcessingMode.PROCESS_ONCE和持續(xù)讀取FileProcessingMode.PROCESS_CONTINUOUSLY。如果不指定則默認為一次性讀取。使用持續(xù)讀取模式時,可以設定讀取間隔,單位為ms。間隔越小實時性越高,資源消耗相應變多,反之則實時性越低,資源消耗降低。
3. Socket接入數(shù)據(jù)
Socket接入數(shù)據(jù)即從網(wǎng)絡端口接收數(shù)據(jù)。內(nèi)置的從Socket接入數(shù)據(jù)的API如圖2-6所示。

圖2-6 Socket接入數(shù)據(jù)API
socketTextStream()的參數(shù)比較簡單,需要提供hostname(主機名)、port(端口號)、delimiter(分隔符)和maxRetry(最大重試次數(shù))。
4.自定義讀取
自定義數(shù)據(jù)讀取就是使用Flink連接器、自定義數(shù)據(jù)讀取函數(shù),與外部存儲交互,讀取數(shù)據(jù),如從Kafka、JDBC、HDFS等讀取。自定義數(shù)據(jù)讀取的API如圖2-7所示。

圖2-7 自定義數(shù)據(jù)讀取API
addSource()方法本質上來說依賴于Flink的SourceFunction體系,與外部的存儲進行交互。createInput()方法底層調(diào)用的是addSource()方法,封裝為InputFormatSourceFunction,所以自定義讀取方式的本質就是實現(xiàn)自定義的SourceFunction。關于SourceFunction,將在第3章進行詳細介紹。
2.4.2 處理數(shù)據(jù)
DataStream API使用Fluent風格處理數(shù)據(jù),在開發(fā)的時候其實是在編寫一個DataStream轉換過程,形成了DataStream處理鏈,在Flink開發(fā)章節(jié)有過闡述。調(diào)用DataStream API生成新的DataStream的轉換關系如圖2-8所示。

圖2-8 DataStream相互轉換關系
從圖中可以看到,并不是所有的DataStream都可以相互轉換。
1. Map
接收1個元素,輸出1個元素。Map應用在DataStream上,輸出結果為DataStream。
DataStream#map運算對應的是MapFunction,其類泛型為MapFunction<T,O>,T代表輸入數(shù)據(jù)類型(Map方法的參數(shù)類型),O代表操作結果輸出類型(Map方法返回的數(shù)據(jù)類型),如代碼清單2-1所示。
代碼清單2-1 Map代碼示例

2. FlatMap
接收1個元素,輸出0、1、…、N個元素。該類運算應用在DataStream上,輸出結果為DataStream。
DataStream#flatMap接口對應的是FlatMapFunction,其類泛型為FlatMapFunction<T,O>,T代表輸入數(shù)據(jù)類型(FlatMap方法的參數(shù)類型),O代表操作結果輸出類型,如代碼清單2-2所示。
代碼清單2-2 FlatMap接口示例

3. Filter
過濾數(shù)據(jù),如果返回true則該元素繼續(xù)向下傳遞,如果為false則將該元素過濾掉。該類運算應用在DataStream上,輸出結果為DataStream。
DataStream#filter接口對應的是FilterFunction,其類泛型為FilterFunction<T>,T代表輸入和輸出元素的數(shù)據(jù)類型,如代碼清單2-3所示。
代碼清單2-3 Filter代碼示例


4. KeyBy
將數(shù)據(jù)流元素進行邏輯上的分組,具有相同Key的記錄將被劃分到同一分組。KeyBy()使用Hash Partitioner實現(xiàn)。該運算應用在DataStream上,輸出結果為KeyedStream。
輸出的數(shù)據(jù)流的類型為KeyedStream<T,KEY>,其中T代表KeyedStream中元素數(shù)據(jù)類型,KEY代表邏輯Key的數(shù)據(jù)類型,如代碼清單2-4所示。
代碼清單2-4 KeyBy代碼示例

以下兩種數(shù)據(jù)不能作為Key。
1)POJO類未重寫hashCode(),使用了默認的Object.hashCode()。
2)數(shù)組類型。
5. Reduce
按照KeyedStream中的邏輯分組,將當前數(shù)據(jù)與最后一次的Reduce結果進行合并,合并邏輯由開發(fā)者自己實現(xiàn)。該類運算應用在KeyedStream上,輸出結果為DataStream。
ReduceFunction<T>中的T代表KeyedStream中元素的數(shù)據(jù)類型,如代碼清單2-5所示。
代碼清單2-5 Reduce代碼示例

6. Fold
Fold與Reduce類似,區(qū)別在于Fold是一個提供了初始值的Reduce,用初始值進行合并運算。該類運算應用在KeyedStream上,輸出結果為DataStream。
Folder接口對應的是FoldFunction,其類泛型為FoldFunction<O, T>,O為KeyStream中的數(shù)據(jù)類型,T為初始值類型和Fold方法返回值類型,如代碼清單2-6所示。
代碼清單2-6 Fold代碼示例

FoldFunction<O, T>已經(jīng)被標記為Deprecated廢棄,替代接口是AggregateFunction<IN, ACC, OUT>。
7. Aggregation
漸進聚合具有相同Key的數(shù)據(jù)流元素,以min和minBy為例,min返回的是整個KeyedStream的最小值,minBy按照Key進行分組,返回每個分組的最小值。在KeyedStream上應用聚合運算輸出結果為DataStream,如代碼清單2-7所示。
代碼清單2-7 內(nèi)置聚合運算代碼示例

8. Window
對KeyedStream的數(shù)據(jù),按照Key進行時間窗口切分,如每5秒鐘一個滾動窗口,每個key都有自己的窗口。該類運算應用在KeyedStream上,輸出結果為WindowedStream。
輸出結果的類泛型為WindowedStream<T, K, W extends Window>,T為KeyedStream中的元素數(shù)據(jù)類型,K為指定Key的數(shù)據(jù)類型,W為窗口類型,如代碼清單2-8所示。
代碼清單2-8 Window代碼示例

關于窗口,第4章會有詳細講解。
9. WindowAll
對一般的DataStream進行時間窗口切分,即全局1個窗口,如每5秒鐘一個滾動窗口。應用在DataStream上,輸出結果為AllWindowedStream,如代碼清單2-9所示。
代碼清單2-9 WindowAll代碼示例

注意:在一般的DataStream上進行窗口切分,往往會導致無法并行計算,所有的數(shù)據(jù)會集中到WindowAll算子的一個Task上。
關于窗口請參照Window原理和機制章節(jié)。
10. Window Apply
將Window函數(shù)應用到窗口上,Window函數(shù)將一個窗口的數(shù)據(jù)作為整體進行處理。Window Stream有兩種:分組后的WindowedStream和未分組的AllWindowedStream。
(1)WindowedStream
在WindowedStream上應用的是WindowFunction,在WindowStream應用此類運算,輸出結果為DataStream。WindowFunction<IN, OUT, KEY, W extends Window>中的IN表示輸入值的類型,OUT表示輸出值的類型,KEY表示Key的類型,W表示窗口的類型,如代碼清單2-10所示。
代碼清單2-10 WindowFunction代碼示例

(2)AllWindowedStream
在AllWindowedStream上應用的是AllWindowFunction,輸出結果為DataStream。該類運算對應的是AllWindowFunction,其類泛型定義為AllWindowFunction<IN, OUT, W extends Window>,IN表示輸入值的類型,OUT表示輸出值的類型,W表示窗口的類型,如代碼清單2-11所示。
代碼清單2-11 AllWindowFunction代碼示例

11. Window Reduce
在WindowedStream上應用ReduceFunction,輸出結果為DataStream。參見前面的Reduce章節(jié),如代碼清單2-12所示。
代碼清單2-12 Window Reduce代碼示例


12. Window Fold
在WindowedStream上應用FoldFunction,輸出結果為DataStream,參見前面的Fold章節(jié),如代碼清單2-13所示。
代碼清單2-13 Window Fold代碼示例

13. Window Aggregation
統(tǒng)計聚合運算,在WindowedStream應用該運算,輸出結果為DataStream。
在WindowedStream上應用AggregationFunction,參見前面的Aggregations章節(jié),如代碼清單2-14所示。
代碼清單2-14 內(nèi)置的Window聚合運算代碼示例

14. Union
把兩個或多個DataStream合并,所有DataStream中的元素都會組合成一個新的DataStream,但是不去重。如果在自身上應用Union運算,則每個元素在新的DataStream出現(xiàn)兩次,如代碼清單2-15所示。
代碼清單2-15 Union運算示例

15. Window Join
在相同時間范圍的窗口上Join兩個DataStream數(shù)據(jù)流,輸出結果為DataStream。
Join核心邏輯在JoinFunction<IN1,IN2,OUT>中實現(xiàn),IN1為第一個DataStream中的數(shù)據(jù)類型,IN2為第二個DataStream中的數(shù)據(jù)類型,OUT為Join結果的數(shù)據(jù)類型,如代碼清單2-16所示。
代碼清單2-16 Join代碼示例


16. Interval Join
對兩個KeyedStream進行Join,需要指定時間范圍和Join時使用的Key,輸出結果為DataStream。
例如對于事件e1和e2,Key相同,時間判斷條件為:

Join的核心邏輯在ProcessJoinFunction<IN1,IN2,OUT>中實現(xiàn),IN1為第一個DataStream中元素數(shù)據(jù)類型,IN2為第二個DataStream中的元素數(shù)據(jù)類型,OUT為結果輸出類型,如代碼清單2-17所示。
代碼清單2-17 Interval Join代碼示例

17. WindowCoGroup
兩個DataStream在相同時間窗口上應用CoGroup運算,輸出結果為DataStream,CoGroup和Join功能類似,但是更加靈活。
CoGroup接口對應的是CoGroupFunction,其類泛型為CoGroupFunction<IN1, IN2, O>,IN1代表第一個DataStream中的元素類型,IN2代表第二個DataStream中的元素類型,O為輸出結果類型,如代碼清單2-18所示。
代碼清單2-18 CoGroup代碼示例

18. Connect
連接(connect)兩個DataStream輸入流,并且保留其類型,輸出流為ConnectedStream。兩個數(shù)據(jù)流之間可以共享狀態(tài)。
輸出數(shù)據(jù)流的類泛型為ConnectedStreams<IN1,IN2>,IN1代表第1個數(shù)據(jù)流中的數(shù)據(jù)類型,IN2表示第2個數(shù)據(jù)流中的數(shù)據(jù)類型,如代碼清單2-19所示。
代碼清單2-19 Connect代碼示例

19. CoMap和CoFlatMap
在ConnectedStream上應用Map和FlatMap運算,輸出流為DataStream。其基本邏輯類似于在一般DataStream上的Map和FlatMap運算,區(qū)別在于CoMap轉換有2個輸入,Map轉換有1個輸入,CoFlatMap同理,如代碼清單2-20所示。
代碼清單2-20 CoMap和CoFlatMap代碼示例

20. Split
將DataStream按照條件切分為多個DataStream,輸出流為SplitDataStream。該方法已經(jīng)標記為Deprecated廢棄,推薦使用SideOutput,如代碼清單2-21所示。
代碼清單2-21 Split代碼示例


21. Select
Select與Split運算配合使用,在Split運算中切分的多個DataStream中,Select用來選擇其中某一個具體的DataStream,如代碼清單2-22所示。
代碼清單2-22 Select代碼示例

22. Iterate
在API層面上,對DataStream應用迭代會生成1個IteractiveStream,然后在IteractiveStream上應用業(yè)務處理邏輯,最終生成1個新的DataStream,IteractiveStream本質上來說是一種中間數(shù)據(jù)流對象。
在數(shù)據(jù)流中創(chuàng)建一個迭代循環(huán),即將下游的輸出發(fā)送給上游重新處理。如果一個算法會持續(xù)地更新模型,這種情況下反饋循環(huán)比較有用,如代碼清單2-23所示。
代碼清單2-23 Iterate代碼示例

23. Extract Timestamps
從記錄中提取時間戳,并生成Watermark。該類運算不會改變DataStream,如代碼清單2-24所示。
代碼清單2-24 提取時間戳代碼示例

24. Project
該類運算只適用于Tuple類型的DataStream,使用Project選取子Tuple,可以選擇Tuple的部分元素,可以改變元素順序,類似于SQL語句中的Select子句,輸出流仍然是DataStream,如代碼清單2-25所示。
代碼清單2-25 Project代碼示例

2.4.3 數(shù)據(jù)寫出
數(shù)據(jù)讀取的API綁定在StreamExecutionEnvironment上,數(shù)據(jù)寫出的API綁定在DataStream對象上。在現(xiàn)在的版本中,只有寫到Console控制臺、Socket網(wǎng)絡端口、自定義三類,寫入文本文件、CSV文件等文件接口都已被標記為廢棄了。接口使用的詳細介紹參照官方文檔即可。
自定義數(shù)據(jù)寫出接口是DataStream.addSink,對于Sink的詳細介紹參見連接器和輸出函數(shù)章節(jié)。
2.4.4 旁路輸出
旁路輸出在Flink中叫作SideOutput,用途類似于DataStream#split,本質上是一個數(shù)據(jù)流的切分行為,按照條件將DataStream切分為多個子數(shù)據(jù)流,子數(shù)據(jù)流叫作旁路輸出數(shù)據(jù)流,每個旁路輸出數(shù)據(jù)流可以有自己的下游處理邏輯。如圖2-9所示,通過旁路輸出將正常和異常的數(shù)據(jù)分別記錄到不同的外部存儲中。

圖2-9 旁路輸出示意
旁路輸出數(shù)據(jù)流的元素的數(shù)據(jù)類型可以與上游數(shù)據(jù)流不同,多個旁路輸出數(shù)據(jù)流的數(shù)據(jù)類型也不必相同。
當使用旁路輸出的時候,首先需要定義OutputTag,OutputTag是每一個下游分支的標識,其定義如代碼清單2-26所示。
代碼清單2-26 OutputTag定義

OutputTag<String>表示該旁路輸出的數(shù)據(jù)類型為String。"side-output-name"是給定該旁路輸出的名稱。
定義好OutputTag之后,只有在特定的函數(shù)中才能使用旁路輸出,具體如下。
1)ProcessFunction。
2)KeyedProcessFunction。
3)CoProcessFunction。
4)ProcessWindowFunction。
5)ProcessAllWindowFunction。
6)ProcessJoinFunction。
7)KeyedCoProcessFunction。
只有在上述函數(shù)中才可以通過Context上下文對象,向OutputTag定義的旁路中輸出emit數(shù)據(jù)。
旁路輸出的使用如代碼清單2-27所示。
代碼清單2-27 旁路輸出代碼示例

旁路輸出的數(shù)據(jù)(DataStream)可以被下游獲取,還可以將旁路輸出DataStream當作一般的DataStream進行處理。按照不同的分支進行不同的業(yè)務處理,獲取旁路數(shù)據(jù)的方法如代碼清單2-28所示。
代碼清單2-28 獲取旁路輸出


Table & SQL的語義中多條Insert語句一起執(zhí)行,使用不同的Where條件輸出到不同的目的地,這就是SideOutput旁路輸出的適用場景。
- 大數(shù)據(jù)技術基礎
- 網(wǎng)上沖浪
- 機器學習與大數(shù)據(jù)技術
- MCSA Windows Server 2016 Certification Guide:Exam 70-741
- 網(wǎng)絡綜合布線技術
- 塊數(shù)據(jù)5.0:數(shù)據(jù)社會學的理論與方法
- 貫通Java Web開發(fā)三劍客
- Hands-On Data Warehousing with Azure Data Factory
- 學練一本通:51單片機應用技術
- 網(wǎng)絡服務器搭建與管理
- 計算機應用基礎實訓(職業(yè)模塊)
- Data Analysis with R(Second Edition)
- 計算機應用基礎實訓·職業(yè)模塊
- 菜鳥起飛電腦組裝·維護與故障排查
- 從零開始學ASP.NET