官术网_书友最值得收藏!

3.3.4 keyBy轉換

有一些轉換(如join、coGroup、keyBy、groupBy)要求在元素集合上定義一個key。還有一些轉換(如reduce、groupReduce、aggregate、windows)可以應用在按key分組的數據上。關于keyBy轉換的簡單介紹見表3-4。

表3-4 keyBy轉換運算

Flink的數據模型不是基于鍵-值對的,因此,不需要將數據集類型物理打包為key和value。key是“虛擬的”,它們被定義一個函數,該函數可指定數據流中實際數據的哪個字段(或屬性)用作key。需要注意的是,如果流元素是POJO類型,則該POJO類必須重寫hashCode()方法。

最簡單的情況是對元組的一個或多個字段進行分組,參看下面的示例。

Scala代碼如下:

Java代碼如下:

執行以上代碼,輸出結果如下:

1.使用字段表達式來定義key

在Flink 1.11版本之前,也可以使用字段表達式來定義key(從Flink 1.11開始已棄用)。可以使用基于字符串的字段表達式來引用嵌套的字段,并為分組、排序、連接或聯合分組定義key。字段表達式使在(嵌套的)復合類型(如Tuple和POJO類型)中選擇字段變得非常容易。例如,在下面的示例中,按成員性別分區。

Scala代碼如下:

Java代碼如下:

執行以上程序,輸出結果如下:

注意到,相同的key在同一分區內被計算。另外要特別注意的是,對于要充當key的POJO類,必須滿足以下條件:

(1)字段名必須聲明為public。

(2)必須有默認的無參構造器。

(3)所有構造器必須聲明為public。

2.使用key selector函數來定義key

定義key的另一種方法是使用key selector函數。一個key selector函數可接收單個元素作為輸入,并返回該元素的key。返回的key可以是任何類型的,可以從確定性計算中得到。例如,在下面的示例中,根據成員的年齡,將成員分為兩組:成年人和未成年人。

Scala代碼如下:

Java代碼:

執行以下代碼,輸出結果如下:

主站蜘蛛池模板: 武汉市| 额敏县| 即墨市| 邯郸市| 改则县| 广饶县| 张家港市| 阳山县| 栖霞市| 杭锦后旗| 襄汾县| 广宗县| 阿坝| 屏边| 富民县| 丰台区| 寿阳县| 甘德县| 德庆县| 从化市| 西峡县| 青神县| 淄博市| 海晏县| 安多县| 永新县| 社会| 乌什县| 咸阳市| 台州市| 武宣县| 平遥县| 南京市| 阳新县| 肇源县| 海晏县| 达日县| 区。| 蒲城县| 尖扎县| 岑巩县|