官术网_书友最值得收藏!

2.4 數(shù)據(jù)流API

DataStream API是Flink流計算應用中最常用的API,相比Table & SQL API更加底層、更加靈活。

2.4.1 數(shù)據(jù)讀取

數(shù)據(jù)讀取的API定義在StreamExecutionEnvironment,這是Flink流計算應用的起點,第一個DataStream就是從數(shù)據(jù)讀取API中構造出來的。在Flink中,除了內(nèi)置的數(shù)據(jù)讀取API外,還針對不同類型的外部存儲系統(tǒng)提供了對應的Connector連接器,使用連接器也能夠實現(xiàn)數(shù)據(jù)讀取的目的。

1.從內(nèi)存讀取數(shù)據(jù)

Flink提供了一系列的方法,直接在內(nèi)存中生成數(shù)據(jù),方便測試和演示。API如圖2-4所示。

圖2-4 內(nèi)存數(shù)據(jù)讀取API

2.文件讀取數(shù)據(jù)

內(nèi)置的從文件中讀取數(shù)據(jù)的API如圖2-5所示。

圖2-5 讀取文件API

從文件中讀取分為讀取文本文件和一般文件兩類,文本文件無須多說,一般文件指的是帶有結構的文件,如Avro、Parquet等。

文件讀取的模式有一次性讀取FileProcessingMode.PROCESS_ONCE和持續(xù)讀取FileProcessingMode.PROCESS_CONTINUOUSLY。如果不指定則默認為一次性讀取。使用持續(xù)讀取模式時,可以設定讀取間隔,單位為ms。間隔越小實時性越高,資源消耗相應變多,反之則實時性越低,資源消耗降低。

3. Socket接入數(shù)據(jù)

Socket接入數(shù)據(jù)即從網(wǎng)絡端口接收數(shù)據(jù)。內(nèi)置的從Socket接入數(shù)據(jù)的API如圖2-6所示。

圖2-6 Socket接入數(shù)據(jù)API

socketTextStream()的參數(shù)比較簡單,需要提供hostname(主機名)、port(端口號)、delimiter(分隔符)和maxRetry(最大重試次數(shù))。

4.自定義讀取

自定義數(shù)據(jù)讀取就是使用Flink連接器、自定義數(shù)據(jù)讀取函數(shù),與外部存儲交互,讀取數(shù)據(jù),如從Kafka、JDBC、HDFS等讀取。自定義數(shù)據(jù)讀取的API如圖2-7所示。

圖2-7 自定義數(shù)據(jù)讀取API

addSource()方法本質上來說依賴于Flink的SourceFunction體系,與外部的存儲進行交互。createInput()方法底層調(diào)用的是addSource()方法,封裝為InputFormatSourceFunction,所以自定義讀取方式的本質就是實現(xiàn)自定義的SourceFunction。關于SourceFunction,將在第3章進行詳細介紹。

2.4.2 處理數(shù)據(jù)

DataStream API使用Fluent風格處理數(shù)據(jù),在開發(fā)的時候其實是在編寫一個DataStream轉換過程,形成了DataStream處理鏈,在Flink開發(fā)章節(jié)有過闡述。調(diào)用DataStream API生成新的DataStream的轉換關系如圖2-8所示。

圖2-8 DataStream相互轉換關系

從圖中可以看到,并不是所有的DataStream都可以相互轉換。

1. Map

接收1個元素,輸出1個元素。Map應用在DataStream上,輸出結果為DataStream。

DataStream#map運算對應的是MapFunction,其類泛型為MapFunction<T,O>,T代表輸入數(shù)據(jù)類型(Map方法的參數(shù)類型),O代表操作結果輸出類型(Map方法返回的數(shù)據(jù)類型),如代碼清單2-1所示。

代碼清單2-1 Map代碼示例

2. FlatMap

接收1個元素,輸出0、1、…、N個元素。該類運算應用在DataStream上,輸出結果為DataStream。

DataStream#flatMap接口對應的是FlatMapFunction,其類泛型為FlatMapFunction<T,O>,T代表輸入數(shù)據(jù)類型(FlatMap方法的參數(shù)類型),O代表操作結果輸出類型,如代碼清單2-2所示。

代碼清單2-2 FlatMap接口示例

3. Filter

過濾數(shù)據(jù),如果返回true則該元素繼續(xù)向下傳遞,如果為false則將該元素過濾掉。該類運算應用在DataStream上,輸出結果為DataStream。

DataStream#filter接口對應的是FilterFunction,其類泛型為FilterFunction<T>,T代表輸入和輸出元素的數(shù)據(jù)類型,如代碼清單2-3所示。

代碼清單2-3 Filter代碼示例

4. KeyBy

將數(shù)據(jù)流元素進行邏輯上的分組,具有相同Key的記錄將被劃分到同一分組。KeyBy()使用Hash Partitioner實現(xiàn)。該運算應用在DataStream上,輸出結果為KeyedStream。

輸出的數(shù)據(jù)流的類型為KeyedStream<T,KEY>,其中T代表KeyedStream中元素數(shù)據(jù)類型,KEY代表邏輯Key的數(shù)據(jù)類型,如代碼清單2-4所示。

代碼清單2-4 KeyBy代碼示例

以下兩種數(shù)據(jù)不能作為Key。

1)POJO類未重寫hashCode(),使用了默認的Object.hashCode()。

2)數(shù)組類型。

5. Reduce

按照KeyedStream中的邏輯分組,將當前數(shù)據(jù)與最后一次的Reduce結果進行合并,合并邏輯由開發(fā)者自己實現(xiàn)。該類運算應用在KeyedStream上,輸出結果為DataStream。

ReduceFunction<T>中的T代表KeyedStream中元素的數(shù)據(jù)類型,如代碼清單2-5所示。

代碼清單2-5 Reduce代碼示例

6. Fold

Fold與Reduce類似,區(qū)別在于Fold是一個提供了初始值的Reduce,用初始值進行合并運算。該類運算應用在KeyedStream上,輸出結果為DataStream。

Folder接口對應的是FoldFunction,其類泛型為FoldFunction<O, T>,O為KeyStream中的數(shù)據(jù)類型,T為初始值類型和Fold方法返回值類型,如代碼清單2-6所示。

代碼清單2-6 Fold代碼示例

FoldFunction<O, T>已經(jīng)被標記為Deprecated廢棄,替代接口是AggregateFunction<IN, ACC, OUT>。

7. Aggregation

漸進聚合具有相同Key的數(shù)據(jù)流元素,以min和minBy為例,min返回的是整個KeyedStream的最小值,minBy按照Key進行分組,返回每個分組的最小值。在KeyedStream上應用聚合運算輸出結果為DataStream,如代碼清單2-7所示。

代碼清單2-7 內(nèi)置聚合運算代碼示例

8. Window

對KeyedStream的數(shù)據(jù),按照Key進行時間窗口切分,如每5秒鐘一個滾動窗口,每個key都有自己的窗口。該類運算應用在KeyedStream上,輸出結果為WindowedStream。

輸出結果的類泛型為WindowedStream<T, K, W extends Window>,T為KeyedStream中的元素數(shù)據(jù)類型,K為指定Key的數(shù)據(jù)類型,W為窗口類型,如代碼清單2-8所示。

代碼清單2-8 Window代碼示例

關于窗口,第4章會有詳細講解。

9. WindowAll

對一般的DataStream進行時間窗口切分,即全局1個窗口,如每5秒鐘一個滾動窗口。應用在DataStream上,輸出結果為AllWindowedStream,如代碼清單2-9所示。

代碼清單2-9 WindowAll代碼示例

注意:在一般的DataStream上進行窗口切分,往往會導致無法并行計算,所有的數(shù)據(jù)會集中到WindowAll算子的一個Task上。

關于窗口請參照Window原理和機制章節(jié)。

10. Window Apply

將Window函數(shù)應用到窗口上,Window函數(shù)將一個窗口的數(shù)據(jù)作為整體進行處理。Window Stream有兩種:分組后的WindowedStream和未分組的AllWindowedStream。

(1)WindowedStream

在WindowedStream上應用的是WindowFunction,在WindowStream應用此類運算,輸出結果為DataStream。WindowFunction<IN, OUT, KEY, W extends Window>中的IN表示輸入值的類型,OUT表示輸出值的類型,KEY表示Key的類型,W表示窗口的類型,如代碼清單2-10所示。

代碼清單2-10 WindowFunction代碼示例

(2)AllWindowedStream

在AllWindowedStream上應用的是AllWindowFunction,輸出結果為DataStream。該類運算對應的是AllWindowFunction,其類泛型定義為AllWindowFunction<IN, OUT, W extends Window>,IN表示輸入值的類型,OUT表示輸出值的類型,W表示窗口的類型,如代碼清單2-11所示。

代碼清單2-11 AllWindowFunction代碼示例

11. Window Reduce

在WindowedStream上應用ReduceFunction,輸出結果為DataStream。參見前面的Reduce章節(jié),如代碼清單2-12所示。

代碼清單2-12 Window Reduce代碼示例

12. Window Fold

在WindowedStream上應用FoldFunction,輸出結果為DataStream,參見前面的Fold章節(jié),如代碼清單2-13所示。

代碼清單2-13 Window Fold代碼示例

13. Window Aggregation

統(tǒng)計聚合運算,在WindowedStream應用該運算,輸出結果為DataStream。

在WindowedStream上應用AggregationFunction,參見前面的Aggregations章節(jié),如代碼清單2-14所示。

代碼清單2-14 內(nèi)置的Window聚合運算代碼示例

14. Union

把兩個或多個DataStream合并,所有DataStream中的元素都會組合成一個新的DataStream,但是不去重。如果在自身上應用Union運算,則每個元素在新的DataStream出現(xiàn)兩次,如代碼清單2-15所示。

代碼清單2-15 Union運算示例

15. Window Join

在相同時間范圍的窗口上Join兩個DataStream數(shù)據(jù)流,輸出結果為DataStream。

Join核心邏輯在JoinFunction<IN1,IN2,OUT>中實現(xiàn),IN1為第一個DataStream中的數(shù)據(jù)類型,IN2為第二個DataStream中的數(shù)據(jù)類型,OUT為Join結果的數(shù)據(jù)類型,如代碼清單2-16所示。

代碼清單2-16 Join代碼示例

16. Interval Join

對兩個KeyedStream進行Join,需要指定時間范圍和Join時使用的Key,輸出結果為DataStream。

例如對于事件e1和e2,Key相同,時間判斷條件為:

Join的核心邏輯在ProcessJoinFunction<IN1,IN2,OUT>中實現(xiàn),IN1為第一個DataStream中元素數(shù)據(jù)類型,IN2為第二個DataStream中的元素數(shù)據(jù)類型,OUT為結果輸出類型,如代碼清單2-17所示。

代碼清單2-17 Interval Join代碼示例

17. WindowCoGroup

兩個DataStream在相同時間窗口上應用CoGroup運算,輸出結果為DataStream,CoGroup和Join功能類似,但是更加靈活。

CoGroup接口對應的是CoGroupFunction,其類泛型為CoGroupFunction<IN1, IN2, O>,IN1代表第一個DataStream中的元素類型,IN2代表第二個DataStream中的元素類型,O為輸出結果類型,如代碼清單2-18所示。

代碼清單2-18 CoGroup代碼示例

18. Connect

連接(connect)兩個DataStream輸入流,并且保留其類型,輸出流為ConnectedStream。兩個數(shù)據(jù)流之間可以共享狀態(tài)。

輸出數(shù)據(jù)流的類泛型為ConnectedStreams<IN1,IN2>,IN1代表第1個數(shù)據(jù)流中的數(shù)據(jù)類型,IN2表示第2個數(shù)據(jù)流中的數(shù)據(jù)類型,如代碼清單2-19所示。

代碼清單2-19 Connect代碼示例

19. CoMap和CoFlatMap

在ConnectedStream上應用Map和FlatMap運算,輸出流為DataStream。其基本邏輯類似于在一般DataStream上的Map和FlatMap運算,區(qū)別在于CoMap轉換有2個輸入,Map轉換有1個輸入,CoFlatMap同理,如代碼清單2-20所示。

代碼清單2-20 CoMap和CoFlatMap代碼示例

20. Split

將DataStream按照條件切分為多個DataStream,輸出流為SplitDataStream。該方法已經(jīng)標記為Deprecated廢棄,推薦使用SideOutput,如代碼清單2-21所示。

代碼清單2-21 Split代碼示例

21. Select

Select與Split運算配合使用,在Split運算中切分的多個DataStream中,Select用來選擇其中某一個具體的DataStream,如代碼清單2-22所示。

代碼清單2-22 Select代碼示例

22. Iterate

在API層面上,對DataStream應用迭代會生成1個IteractiveStream,然后在IteractiveStream上應用業(yè)務處理邏輯,最終生成1個新的DataStream,IteractiveStream本質上來說是一種中間數(shù)據(jù)流對象。

在數(shù)據(jù)流中創(chuàng)建一個迭代循環(huán),即將下游的輸出發(fā)送給上游重新處理。如果一個算法會持續(xù)地更新模型,這種情況下反饋循環(huán)比較有用,如代碼清單2-23所示。

代碼清單2-23 Iterate代碼示例

23. Extract Timestamps

從記錄中提取時間戳,并生成Watermark。該類運算不會改變DataStream,如代碼清單2-24所示。

代碼清單2-24 提取時間戳代碼示例

24. Project

該類運算只適用于Tuple類型的DataStream,使用Project選取子Tuple,可以選擇Tuple的部分元素,可以改變元素順序,類似于SQL語句中的Select子句,輸出流仍然是DataStream,如代碼清單2-25所示。

代碼清單2-25 Project代碼示例

2.4.3 數(shù)據(jù)寫出

數(shù)據(jù)讀取的API綁定在StreamExecutionEnvironment上,數(shù)據(jù)寫出的API綁定在DataStream對象上。在現(xiàn)在的版本中,只有寫到Console控制臺、Socket網(wǎng)絡端口、自定義三類,寫入文本文件、CSV文件等文件接口都已被標記為廢棄了。接口使用的詳細介紹參照官方文檔即可。

自定義數(shù)據(jù)寫出接口是DataStream.addSink,對于Sink的詳細介紹參見連接器和輸出函數(shù)章節(jié)。

2.4.4 旁路輸出

旁路輸出在Flink中叫作SideOutput,用途類似于DataStream#split,本質上是一個數(shù)據(jù)流的切分行為,按照條件將DataStream切分為多個子數(shù)據(jù)流,子數(shù)據(jù)流叫作旁路輸出數(shù)據(jù)流,每個旁路輸出數(shù)據(jù)流可以有自己的下游處理邏輯。如圖2-9所示,通過旁路輸出將正常和異常的數(shù)據(jù)分別記錄到不同的外部存儲中。

圖2-9 旁路輸出示意

旁路輸出數(shù)據(jù)流的元素的數(shù)據(jù)類型可以與上游數(shù)據(jù)流不同,多個旁路輸出數(shù)據(jù)流的數(shù)據(jù)類型也不必相同。

當使用旁路輸出的時候,首先需要定義OutputTag,OutputTag是每一個下游分支的標識,其定義如代碼清單2-26所示。

代碼清單2-26 OutputTag定義

OutputTag<String>表示該旁路輸出的數(shù)據(jù)類型為String。"side-output-name"是給定該旁路輸出的名稱。

定義好OutputTag之后,只有在特定的函數(shù)中才能使用旁路輸出,具體如下。

1)ProcessFunction。

2)KeyedProcessFunction。

3)CoProcessFunction。

4)ProcessWindowFunction。

5)ProcessAllWindowFunction。

6)ProcessJoinFunction。

7)KeyedCoProcessFunction。

只有在上述函數(shù)中才可以通過Context上下文對象,向OutputTag定義的旁路中輸出emit數(shù)據(jù)。

旁路輸出的使用如代碼清單2-27所示。

代碼清單2-27 旁路輸出代碼示例

旁路輸出的數(shù)據(jù)(DataStream)可以被下游獲取,還可以將旁路輸出DataStream當作一般的DataStream進行處理。按照不同的分支進行不同的業(yè)務處理,獲取旁路數(shù)據(jù)的方法如代碼清單2-28所示。

代碼清單2-28 獲取旁路輸出

Table & SQL的語義中多條Insert語句一起執(zhí)行,使用不同的Where條件輸出到不同的目的地,這就是SideOutput旁路輸出的適用場景。

主站蜘蛛池模板: 安庆市| 泸水县| 新安县| 红安县| 仙居县| 曲周县| 彭州市| 永清县| 靖安县| 宕昌县| 集贤县| 泌阳县| 澎湖县| 洪洞县| 高安市| 七台河市| 南漳县| 海伦市| 静安区| 宁国市| 永年县| 长寿区| 舟曲县| 民权县| 繁昌县| 枞阳县| 江津市| 隆昌县| 巩义市| 同江市| 德州市| 新郑市| 乌鲁木齐市| 黄大仙区| 长沙市| 阿拉尔市| 论坛| 张北县| 绥阳县| 桐乡市| 四会市|