官术网_书友最值得收藏!

3.3.10 iterate轉(zhuǎn)換

這個轉(zhuǎn)換操作通過將一個算子的輸出重定向到某個先前的算子在流中創(chuàng)建反饋循環(huán)。這對于定義不斷更新模型的算法特別有用。關(guān)于iterate轉(zhuǎn)換的簡單介紹見表3-10。

表3-10 iterate轉(zhuǎn)換運算

對DataStream使用iterate()方法創(chuàng)建IterativeStream,使用IterativeStream的closeWith()方法來關(guān)閉feedbackStream。

DataStream提供了兩個iterate()方法,它們創(chuàng)建并返回IterativeStream,無參的iterate()方法其maxWaitTimeMillis為0。模板代碼如下:

IterativeStream主要提供了兩種方法,一個是closeWith()方法,用于關(guān)閉迭代,它主要用于定義要被反饋到iteration頭部的這部分iteration(可以理解為回流,或者類似遞歸的操作,filter控制的是遞歸的條件,通過filter的elements會重新進入IterativeStream的頭部繼續(xù)參與后面的運算操作);withFeedbackType()方法創(chuàng)建了ConnectedIterativeStreams。這個過程如圖3-8所示。

圖3-8 iterate轉(zhuǎn)換操作

下面通過一個示例來理解這個運算。在下面的示例中,生成“-2,3,4,5”這樣的數(shù)據(jù)流。流程序的目標是只輸出非負整數(shù)。如果遇到負整數(shù),則被發(fā)送回反饋通道,并不斷地應用迭代體(每次加1,直到非負為止);對于遇到的是非負整數(shù),則被向下轉(zhuǎn)發(fā)。

Scala代碼如下:

Java代碼如下:

執(zhí)行以上程序,輸出結(jié)果如下:

默認情況下,帶有迭代的DataStream將永遠不會終止,但用戶可以使用maxWaitTime參數(shù)設(shè)置迭代頭的最大等待時間。如果在設(shè)置的時間內(nèi)沒有接收到數(shù)據(jù),則流將終止,代碼如下:

主站蜘蛛池模板: 中西区| 江永县| 手机| 苍溪县| 武定县| 华宁县| 留坝县| 温泉县| 荔波县| 大足县| 永川市| 邹平县| 长泰县| 哈巴河县| 台湾省| 黎川县| 山东省| 抚州市| 濉溪县| 大埔区| 资中县| 五台县| 胶南市| 八宿县| 溆浦县| 麻栗坡县| 汕头市| 荔浦县| 揭阳市| 呼伦贝尔市| 长海县| 南靖县| 黎平县| 元氏县| 兰溪市| 德江县| 略阳县| 繁昌县| 平南县| 康定县| 雷山县|