官术网_书友最值得收藏!

3.2.4 自定義數據源

除了內置數據源,用戶還可以編寫自己的定制數據源。對于非并行數據源,實現SourceFunction;對于并行數據源,實現ParallelSourceFunction接口或擴展(繼承)自RichParallelSourceFunction。

1.了解SourceFunction接口

SourceFunction是Flink中所有流數據源的基本接口。SourceFunction接口的定義如下:

從上面的接口定義中可知,在SourceFunction接口中定義了run()和cancel()兩種方法以及一個內部接口SourceContext,其中run(SourceContex)方法用來實現數據獲取邏輯,并可以通過傳入的參數ctx向下游節點進行數據轉發。cancel()方法用來取消數據源,一般在run()方法中會存在一個循環來持續產生數據,cancel()方法則可以使該循環終止。SourceContext內部接口用于發出元素和可能的watermark的接口。

2.run()方法

在run()方法中實現了數據源向下游發送數據的主要邏輯。編寫模式如下:

(1)不斷調用,以便實現循環發送數據。

(2)使用一種狀態變量控制循環的執行。當cancel()方法執行后必須能夠跳出循環,以便停止發送數據。

(3)使用SourceContext的collect()等方法將元素發送至下游。

(4)如果使用檢查點,在SourceContext收集數據時必須加鎖。防止checkpoint操作和發送數據操作同時進行。

3.cancel()方法

在數據源停止時調用cancel()方法。cancel()方法必須能夠控制run()方法中的循環,即停止循環的運行,并做一些狀態清理操作。

4.SourceContext類

SourceContext在SourceFunction中使用,用于向下游發送數據,或者發送水印。SourceContext的方法包括

(1)collect()方法:向下游發送數據。有以下3種情況:

①如果使用ProcessingTime,則該元素不攜帶timestamp。

②如果使用IngestionTime,則元素使用系統當前時間作為timestamp。

③如果使用EventTime,則元素不攜帶timestamp。需要在數據流后續為元素指定timestamp(assignTimestampAndWatermark)。

(2)collectWithTimestamp()方法:向下游發送帶有timestamp的數據。和collect()方法一樣也有以下3種情況:

①如果使用ProcessingTime,則timestamp會被忽略。

②如果使用IngestionTime,則使用系統時間覆蓋timestamp。

③如果使用EventTime,則使用指定的timestamp。

(3)emitWatermark()方法:向下游發送watermark。watermark也包含一個timestamp。向下游發送watermark意味著所有在watermark的timestamp之前的數據已經到齊。如果在watermark之后,收到了timestamp比該watermark的timestamp小的元素,該元素會被認為遲到,將會被系統忽略,或者進入側輸出(Side Output)。

(4)markAsTemporarilyIdle()方法:將此數據源暫時標記為閑置。該數據源暫時不會發送任何數據和watermark。僅對IngestionTime和EventTime生效。下游任務前移watermark時將不會再等待被標記為閑置的數據源的watermark。

5.CheckpointedFunction

如果數據源需要保存狀態,就需要實現CheckpointedFunction中的相關方法。

CheckpointedFunction包含的方法如下。

(1)snapshotState():保存checkpoint時調用。需要在此方法中編寫狀態保存邏輯。

(2)initializeState():在數據源創建或者從checkpoint恢復時調用。此方法包含數據源的狀態恢復邏輯。

對于自定義的數據源,需要使用StreamExecutionEnvironment.addSource(sourceFunction)方法將指定數據源附加到程序中。例如,如果要從Apache Kafka中讀取數據,則可以使用的模板代碼如下:

【示例3-5】?。ê唵伟姹荆┦褂米远x數據源,模擬信用卡交易流數據生成器。

建議按以下步驟操作:

(1)在IntelliJ IDEA中創建一個Flink項目,使用flink-quickstart-scala/flink-quickstart-Java項目模板。

(2)設置依賴。在pom.xml文件中添加如下依賴:

(3)創建POJO類,表示信用卡交易數據結構,代碼如下:

(4)創建自定義的數據源類,繼承自SourceFunction,代碼如下:

(5)創建一個測試類(帶有main方法的主程序),使用addSource()方法添加自定義數據源,編輯代碼。

Scala代碼如下:

Java代碼如下:

(6)執行以上程序,查看控制臺,輸出結果如下(部分結果):

可以對上面的代碼加以修改,創建更加通用的自定義數據源類。

【示例3-6】?。◤碗s版本)使用自定義數據源,模擬信用卡交易流數據生成器:

建議按以下步驟操作:

(1)在IntelliJ IDEA中創建一個Flink項目,使用flink-quickstart-Java項目模板。

(2)設置依賴。在pom.xml文件中添加的依賴如下:

(3)創建POJO類,表示信用卡交易數據結構,代碼如下:

(4)定義一個迭代器類,實現Iterator接口,封裝了對Transaction信用卡交易數據列表的迭代方法。這是一個通用的數據迭代器類,既可以迭代生成批數據,也可以循環迭代生成流數據,由其中的標志變量bounded來控制,代碼如下:

(5)創建自定義的數據源類,繼承自FromIteratorFunction類(這是Flink自帶的一個SourceFunction的子類),并封裝了一個數據生成器RateLimitedIterator,代碼如下:

(6)創建一個測試類(帶有main方法的主程序),使用addSource()方法添加自定義數據源,編輯代碼。

Scala代碼如下:

Java代碼如下:

(7)執行以上程序,查看控制臺,輸出結果如下(部分結果):

在上面的示例代碼中,自定義的數據源類TransactionSource并不是直接創建SourceFunction類的子類,而是從FromIteratorFunction類繼承的。FromIteratorFunction是Flink的org.apache.flink.streaming.api.functions.source包中自帶的一個類,它已經實現了接口SourceFunction<T>,包裝了一個具體的泛型迭代器對象屬性iterator。通常使用時從它派生一個子類,并在構造時傳入一個具體的迭代數據生成器類即可,其源碼如下:

下面這個參考示例是Flink官方給出的自定義數據源樣例。這個數據源會將0~999發送到下游系統,代碼如下:

主站蜘蛛池模板: 洪湖市| 积石山| 论坛| 淳化县| 五原县| 临江市| 兖州市| 泸溪县| 诸城市| 福州市| 肥东县| 安化县| 汝阳县| 喀喇沁旗| 普安县| 阜新| 万山特区| 呼和浩特市| 株洲市| 清涧县| 永川市| 乐安县| 石林| 徐水县| 威信县| 长丰县| 德昌县| 阿瓦提县| 会同县| 鲜城| 西充县| 明水县| 麦盖提县| 来安县| 平远县| 神木县| 揭东县| 崇州市| 汉阴县| 凤山市| 黑河市|