官术网_书友最值得收藏!

3.5.1 內置數據接收器

Flink提供了多種內置的Data Sink API,用于日常的開發,具體如下。

1writeAsText/path/to/file"

將計算結果以字符串的方式并行地寫入指定文件夾下。這些字符串是通過調用每個元素的toString()方法獲得的,使用的輸出類是TextOutputFormat。這種方法除了路徑參數是必選外,還可以通過指定第2個參數來定義輸出模式。輸出模式有以下兩個可選值。

(1)WriteMode.NO_OVERWRITE:當指定路徑上不存在任何文件時才執行寫出操作。

(2)WriteMode.OVERWRITE:不論指定路徑上是否存在文件,都執行寫出操作;如果原來已有文件,則進行覆蓋。

(3)以上的寫出是以并行的方式寫出到多個文件,如果想要將輸出結果全部寫出到一個文件,則需要將其并行度設置為1:

2writeAsCsv/path/to/file"

將計算結果以CSV文件格式寫到指定目錄。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。使用的輸出類是CsvOutputFormat。該方法除了路徑參數是必選外,還支持傳入輸出模式、行分隔符和字段分隔符3個額外的參數,其方法定義如下:

3print()/printToErr()

在標準輸出/標準錯誤流上打印輸出每個元素的toString()值。可選地,可以提供輸出的前綴,這有助于區分不同的print調用。如果并行度大于1,則輸出也將以產生輸出的任務的id作為前綴。print()\printToErr()是測試當中最常用的方式,用于將計算結果以標準輸出流或錯誤輸出流的方式打印到控制臺上。

4writeUsingOutputFormat()

自定義文件輸出的基類和方法。支持自定義對象到字節的轉換。在定義自定義格式時,需要繼承自FileOutputFormat,它負責序列化和反序列化。上面介紹的writeAsText和writeAsCsv其底層調用的都是該方法,其方法簽名如下:

5writeToSockethostportSerializationSchema

將計算結果以指定的格式寫到指定的socket套接字。為了正確地序列化和格式化,需要定義SerializationSchema。使用示例如下:

6addSink

調用自定義接收器函數。Flink與作為接收器函數實現的其他系統(如Apache Kafka)的連接器捆綁在一起。

注意:在以上方法中,以writeAs?開頭的方法在Flink API文檔中已經被標識為"Deprecated",即棄用狀態,在未來的版本中有可能被刪除,因此使用時要慎重。

在生產中,常用的sinks(接收器)包括Kafka及各種數據庫和文件系統。下面通過示例來掌握Flink Data Sink的常用用法。

【示例3-9】 分析流數據,并將分析結果寫到CSV文件中。

建議按以下步驟操作:

(1)在IntelliJ IDEA中創建一個Flink項目,使用flink-quickstart-scala/flink-quickstart-Java項目模板(Flink項目的創建過程可參考2.2節)。

(2)設置依賴。在pom.xml文件中添加依賴。

Scala Maven依賴:

Java Maven依賴:

(3)創建流應用程序類。

Scala代碼如下:

Java代碼如下:

(4)執行以上程序,可以看到,在項目的根目錄下生成了一個結果文件result。查看輸出的結果文件result,內容如下:

主站蜘蛛池模板: 五莲县| 文成县| 大足县| 临湘市| 囊谦县| 青铜峡市| 安西县| 呼图壁县| 荃湾区| 岑巩县| 白银市| 武威市| 德州市| 天津市| 高平市| 彩票| 井陉县| 韩城市| 昭觉县| 鄱阳县| 苏尼特左旗| 镇康县| 湘潭县| 昌平区| 盐边县| 新泰市| 钟山县| 浮山县| 大邑县| 赤水市| 海林市| 兴义市| 沂源县| 东莞市| 阜新| 南漳县| 泸水县| 藁城市| 商都县| 绥宁县| 孟州市|