- Flink原理深入與編程實戰:Scala+Java(微課視頻版)
- 辛立偉編著
- 1803字
- 2023-07-17 18:54:44
3.2.4 自定義數據源
除了內置數據源,用戶還可以編寫自己的定制數據源。對于非并行數據源,實現SourceFunction;對于并行數據源,實現ParallelSourceFunction接口或擴展(繼承)自RichParallelSourceFunction。
1.了解SourceFunction接口
SourceFunction是Flink中所有流數據源的基本接口。SourceFunction接口的定義如下:

從上面的接口定義中可知,在SourceFunction接口中定義了run()和cancel()兩種方法以及一個內部接口SourceContext,其中run(SourceContex)方法用來實現數據獲取邏輯,并可以通過傳入的參數ctx向下游節點進行數據轉發。cancel()方法用來取消數據源,一般在run()方法中會存在一個循環來持續產生數據,cancel()方法則可以使該循環終止。SourceContext內部接口用于發出元素和可能的watermark的接口。
2.run()方法
在run()方法中實現了數據源向下游發送數據的主要邏輯。編寫模式如下:
(1)不斷調用,以便實現循環發送數據。
(2)使用一種狀態變量控制循環的執行。當cancel()方法執行后必須能夠跳出循環,以便停止發送數據。
(3)使用SourceContext的collect()等方法將元素發送至下游。
(4)如果使用檢查點,在SourceContext收集數據時必須加鎖。防止checkpoint操作和發送數據操作同時進行。
3.cancel()方法
在數據源停止時調用cancel()方法。cancel()方法必須能夠控制run()方法中的循環,即停止循環的運行,并做一些狀態清理操作。
4.SourceContext類
SourceContext在SourceFunction中使用,用于向下游發送數據,或者發送水印。SourceContext的方法包括
(1)collect()方法:向下游發送數據。有以下3種情況:
①如果使用ProcessingTime,則該元素不攜帶timestamp。
②如果使用IngestionTime,則元素使用系統當前時間作為timestamp。
③如果使用EventTime,則元素不攜帶timestamp。需要在數據流后續為元素指定timestamp(assignTimestampAndWatermark)。
(2)collectWithTimestamp()方法:向下游發送帶有timestamp的數據。和collect()方法一樣也有以下3種情況:
①如果使用ProcessingTime,則timestamp會被忽略。
②如果使用IngestionTime,則使用系統時間覆蓋timestamp。
③如果使用EventTime,則使用指定的timestamp。
(3)emitWatermark()方法:向下游發送watermark。watermark也包含一個timestamp。向下游發送watermark意味著所有在watermark的timestamp之前的數據已經到齊。如果在watermark之后,收到了timestamp比該watermark的timestamp小的元素,該元素會被認為遲到,將會被系統忽略,或者進入側輸出(Side Output)。
(4)markAsTemporarilyIdle()方法:將此數據源暫時標記為閑置。該數據源暫時不會發送任何數據和watermark。僅對IngestionTime和EventTime生效。下游任務前移watermark時將不會再等待被標記為閑置的數據源的watermark。
5.CheckpointedFunction
如果數據源需要保存狀態,就需要實現CheckpointedFunction中的相關方法。
CheckpointedFunction包含的方法如下。
(1)snapshotState():保存checkpoint時調用。需要在此方法中編寫狀態保存邏輯。
(2)initializeState():在數據源創建或者從checkpoint恢復時調用。此方法包含數據源的狀態恢復邏輯。
對于自定義的數據源,需要使用StreamExecutionEnvironment.addSource(sourceFunction)方法將指定數據源附加到程序中。例如,如果要從Apache Kafka中讀取數據,則可以使用的模板代碼如下:

【示例3-5】?。ê唵伟姹荆┦褂米远x數據源,模擬信用卡交易流數據生成器。
建議按以下步驟操作:
(1)在IntelliJ IDEA中創建一個Flink項目,使用flink-quickstart-scala/flink-quickstart-Java項目模板。
(2)設置依賴。在pom.xml文件中添加如下依賴:

(3)創建POJO類,表示信用卡交易數據結構,代碼如下:



(4)創建自定義的數據源類,繼承自SourceFunction,代碼如下:




(5)創建一個測試類(帶有main方法的主程序),使用addSource()方法添加自定義數據源,編輯代碼。
Scala代碼如下:

Java代碼如下:

(6)執行以上程序,查看控制臺,輸出結果如下(部分結果):

可以對上面的代碼加以修改,創建更加通用的自定義數據源類。
【示例3-6】?。◤碗s版本)使用自定義數據源,模擬信用卡交易流數據生成器:
建議按以下步驟操作:
(1)在IntelliJ IDEA中創建一個Flink項目,使用flink-quickstart-Java項目模板。
(2)設置依賴。在pom.xml文件中添加的依賴如下:

(3)創建POJO類,表示信用卡交易數據結構,代碼如下:



(4)定義一個迭代器類,實現Iterator接口,封裝了對Transaction信用卡交易數據列表的迭代方法。這是一個通用的數據迭代器類,既可以迭代生成批數據,也可以循環迭代生成流數據,由其中的標志變量bounded來控制,代碼如下:




(5)創建自定義的數據源類,繼承自FromIteratorFunction類(這是Flink自帶的一個SourceFunction的子類),并封裝了一個數據生成器RateLimitedIterator,代碼如下:

(6)創建一個測試類(帶有main方法的主程序),使用addSource()方法添加自定義數據源,編輯代碼。
Scala代碼如下:

Java代碼如下:

(7)執行以上程序,查看控制臺,輸出結果如下(部分結果):

在上面的示例代碼中,自定義的數據源類TransactionSource并不是直接創建SourceFunction類的子類,而是從FromIteratorFunction類繼承的。FromIteratorFunction是Flink的org.apache.flink.streaming.api.functions.source包中自帶的一個類,它已經實現了接口SourceFunction<T>,包裝了一個具體的泛型迭代器對象屬性iterator。通常使用時從它派生一個子類,并在構造時傳入一個具體的迭代數據生成器類即可,其源碼如下:

下面這個參考示例是Flink官方給出的自定義數據源樣例。這個數據源會將0~999發送到下游系統,代碼如下:

- Boost程序庫完全開發指南:深入C++”準”標準庫(第5版)
- Node.js 10實戰
- LabVIEW2018中文版 虛擬儀器程序設計自學手冊
- 構建移動網站與APP:HTML 5移動開發入門與實戰(跨平臺移動開發叢書)
- Java虛擬機字節碼:從入門到實戰
- Learning ArcGIS Pro
- Data Analysis with Stata
- C語言課程設計
- RESTful Java Web Services(Second Edition)
- LabVIEW虛擬儀器程序設計從入門到精通(第二版)
- Building Serverless Web Applications
- Nagios Core Administration Cookbook(Second Edition)
- Extending Unity with Editor Scripting
- Data Manipulation with R(Second Edition)
- Spring Boot從入門到實戰