- Flink原理深入與編程實戰(zhàn):Scala+Java(微課視頻版)
- 辛立偉編著
- 494字
- 2023-07-17 18:54:50
3.3.10 iterate轉(zhuǎn)換
這個轉(zhuǎn)換操作通過將一個算子的輸出重定向到某個先前的算子在流中創(chuàng)建反饋循環(huán)。這對于定義不斷更新模型的算法特別有用。關(guān)于iterate轉(zhuǎn)換的簡單介紹見表3-10。
表3-10 iterate轉(zhuǎn)換運算

對DataStream使用iterate()方法創(chuàng)建IterativeStream,使用IterativeStream的closeWith()方法來關(guān)閉feedbackStream。
DataStream提供了兩個iterate()方法,它們創(chuàng)建并返回IterativeStream,無參的iterate()方法其maxWaitTimeMillis為0。模板代碼如下:

IterativeStream主要提供了兩種方法,一個是closeWith()方法,用于關(guān)閉迭代,它主要用于定義要被反饋到iteration頭部的這部分iteration(可以理解為回流,或者類似遞歸的操作,filter控制的是遞歸的條件,通過filter的elements會重新進入IterativeStream的頭部繼續(xù)參與后面的運算操作);withFeedbackType()方法創(chuàng)建了ConnectedIterativeStreams。這個過程如圖3-8所示。

圖3-8 iterate轉(zhuǎn)換操作
下面通過一個示例來理解這個運算。在下面的示例中,生成“-2,3,4,5”這樣的數(shù)據(jù)流。流程序的目標是只輸出非負整數(shù)。如果遇到負整數(shù),則被發(fā)送回反饋通道,并不斷地應用迭代體(每次加1,直到非負為止);對于遇到的是非負整數(shù),則被向下轉(zhuǎn)發(fā)。
Scala代碼如下:

Java代碼如下:

執(zhí)行以上程序,輸出結(jié)果如下:

默認情況下,帶有迭代的DataStream將永遠不會終止,但用戶可以使用maxWaitTime參數(shù)設(shè)置迭代頭的最大等待時間。如果在設(shè)置的時間內(nèi)沒有接收到數(shù)據(jù),則流將終止,代碼如下:

- Visual Basic .NET程序設(shè)計(第3版)
- JavaScript+DHTML語法與范例詳解詞典
- Getting started with Google Guava
- 深度學習經(jīng)典案例解析:基于MATLAB
- JavaScript+jQuery網(wǎng)頁特效設(shè)計任務驅(qū)動教程(第2版)
- Mastering Selenium WebDriver
- 匯編語言程序設(shè)計(第2版)
- PHP+MySQL網(wǎng)站開發(fā)項目式教程
- 微信小程序開發(fā)解析
- Reactive Android Programming
- Integrating Facebook iOS SDK with Your Application
- Unity 2D Game Development Cookbook
- 精通Python自動化編程
- Building Wireless Sensor Networks Using Arduino
- ArcGIS for Desktop Cookbook