- Flink原理深入與編程實戰:Scala+Java(微課視頻版)
- 辛立偉編著
- 1000字
- 2023-07-17 18:54:55
3.5.1 內置數據接收器
Flink提供了多種內置的Data Sink API,用于日常的開發,具體如下。
1)writeAsText("/path/to/file")
將計算結果以字符串的方式并行地寫入指定文件夾下。這些字符串是通過調用每個元素的toString()方法獲得的,使用的輸出類是TextOutputFormat。這種方法除了路徑參數是必選外,還可以通過指定第2個參數來定義輸出模式。輸出模式有以下兩個可選值。
(1)WriteMode.NO_OVERWRITE:當指定路徑上不存在任何文件時才執行寫出操作。
(2)WriteMode.OVERWRITE:不論指定路徑上是否存在文件,都執行寫出操作;如果原來已有文件,則進行覆蓋。
(3)以上的寫出是以并行的方式寫出到多個文件,如果想要將輸出結果全部寫出到一個文件,則需要將其并行度設置為1:

2)writeAsCsv("/path/to/file")
將計算結果以CSV文件格式寫到指定目錄。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。使用的輸出類是CsvOutputFormat。該方法除了路徑參數是必選外,還支持傳入輸出模式、行分隔符和字段分隔符3個額外的參數,其方法定義如下:

3)print()/printToErr()
在標準輸出/標準錯誤流上打印輸出每個元素的toString()值。可選地,可以提供輸出的前綴,這有助于區分不同的print調用。如果并行度大于1,則輸出也將以產生輸出的任務的id作為前綴。print()\printToErr()是測試當中最常用的方式,用于將計算結果以標準輸出流或錯誤輸出流的方式打印到控制臺上。
4)writeUsingOutputFormat()
自定義文件輸出的基類和方法。支持自定義對象到字節的轉換。在定義自定義格式時,需要繼承自FileOutputFormat,它負責序列化和反序列化。上面介紹的writeAsText和writeAsCsv其底層調用的都是該方法,其方法簽名如下:

5)writeToSocket(host,port,SerializationSchema)
將計算結果以指定的格式寫到指定的socket套接字。為了正確地序列化和格式化,需要定義SerializationSchema。使用示例如下:

6)addSink
調用自定義接收器函數。Flink與作為接收器函數實現的其他系統(如Apache Kafka)的連接器捆綁在一起。
注意:在以上方法中,以writeAs?開頭的方法在Flink API文檔中已經被標識為"Deprecated",即棄用狀態,在未來的版本中有可能被刪除,因此使用時要慎重。
在生產中,常用的sinks(接收器)包括Kafka及各種數據庫和文件系統。下面通過示例來掌握Flink Data Sink的常用用法。
【示例3-9】 分析流數據,并將分析結果寫到CSV文件中。
建議按以下步驟操作:
(1)在IntelliJ IDEA中創建一個Flink項目,使用flink-quickstart-scala/flink-quickstart-Java項目模板(Flink項目的創建過程可參考2.2節)。
(2)設置依賴。在pom.xml文件中添加依賴。
Scala Maven依賴:

Java Maven依賴:

(3)創建流應用程序類。
Scala代碼如下:

Java代碼如下:

(4)執行以上程序,可以看到,在項目的根目錄下生成了一個結果文件result。查看輸出的結果文件result,內容如下:

- Git Version Control Cookbook
- 大學計算機應用基礎實踐教程
- Visual Studio 2012 Cookbook
- Web Development with MongoDB and Node(Third Edition)
- 組態軟件技術與應用
- RealSenseTM互動開發實戰
- 詳解MATLAB圖形繪制技術
- Vue.js 2 Web Development Projects
- Python入門很輕松(微課超值版)
- C++ Application Development with Code:Blocks
- OpenMP核心技術指南
- 一步一步跟我學Scratch3.0案例
- HTML5移動前端開發基礎與實戰(微課版)
- Spark技術內幕:深入解析Spark內核架構設計與實現原理
- SCRATCH編程課:我的游戲我做主