- Flink技術(shù)內(nèi)幕:架構(gòu)設(shè)計(jì)與實(shí)現(xiàn)原理
- 羅江宇 趙士杰等
- 966字
- 2021-12-29 15:54:17
2.1 DataStream
在Flink中用DataSet和DataStream來表示數(shù)據(jù)集,DataSet表示有界的數(shù)據(jù),DataStream表示無界的數(shù)據(jù)。當(dāng)然這只是概念層面的抽象,DataStream并沒有真正的數(shù)據(jù)。DataStream通過初始化Source來構(gòu)造,通過一系列的轉(zhuǎn)換來表達(dá)計(jì)算過程,最后通過Sinker把結(jié)果輸出到外部系統(tǒng)。Flink內(nèi)部集成了大量與外部系統(tǒng)交互的Source和Sink,這部分對(duì)應(yīng)Flink中的Connectors模塊;還有大量的Transformation,這部分對(duì)應(yīng)Flink中的算子(Operator)。
我們來看個(gè)官方的例子:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
在這個(gè)例子中,env.socketTextStream方法(從socket得到數(shù)據(jù))得到DataStream,然后經(jīng)過DataStream的各種轉(zhuǎn)換,這里有flatMap、keyBy、window等轉(zhuǎn)換,最后通過print把結(jié)果輸出到標(biāo)準(zhǔn)輸出(見圖2-1)。

圖2-1 Streaming Dataflow
上面的例子是通過socketTextStream從網(wǎng)絡(luò)端口讀取數(shù)據(jù)得到DataStream,還有一些其他方式,比如:通過讀取文件,readFile (fileInputFormat, path);通過讀取集合數(shù)據(jù)集,fromCollection (Collection)。當(dāng)然,也可以通過方法StreamExecutionEnvironment.addSource (sourceFunction)來定制數(shù)據(jù)的讀取,用戶需要實(shí)現(xiàn)SourceFunction接口。我們來看下這個(gè)方法是怎么得到DataStream的,關(guān)鍵代碼如下:
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) { // 此處省略不相關(guān)的代碼 clean(function); final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); }
可以看到該方法新建了一個(gè)DataStreamSource。繼續(xù)看DataStreamSource你會(huì)發(fā)現(xiàn),它繼承了SingleOutputStreamOperator(這個(gè)類從命名看不是很清楚,很容易讓人把它誤認(rèn)為是個(gè)算子,但實(shí)際上它是個(gè)DataStream子類),這樣我們就得到了一個(gè)DataStream。
那么DataStream之間是怎么相互轉(zhuǎn)換的呢?我們來看DataStream的flatMap方法:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes( clean(flatMapper), getType(), Utils.getCallLocationName(), true); // 這里用FlatMapFunction構(gòu)造了一個(gè)StreamOperator return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper))); }
這里構(gòu)造了一個(gè)StreamFlatMap類型的算子,然后繼續(xù)調(diào)用transform方法。我們接著看transform方法:
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { //構(gòu)造Transformation OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism()); SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator( environment, resultTransform); // 把所有的Transformation都保存到StreamExecutionEnvironment中 getExecutionEnvironment().addOperator(resultTransform); return returnStream; }
可以看到,其中最主要的工作是基于剛才的算子新建了一個(gè)OneInputTransformation,然后把該Transformation保存下來。那么StreamExecutionEnvironment中保存的Transformation用來做什么呢?實(shí)際上Flink根據(jù)這些Transformation生成整個(gè)運(yùn)行的拓?fù)洌麄€(gè)生成過程大致如下:
1)根據(jù)Transformation生成StreamGraph;
2)根據(jù)StreamGraph生成JobGraph;
3)根據(jù)JobGraph生成可以調(diào)度運(yùn)行的ExecutionGraph。
整個(gè)過程還會(huì)在第5章詳細(xì)介紹,這里可以先大致了解下。這里用戶的執(zhí)行代碼FlatMapFunction實(shí)際上是通過先傳遞給算子,然后由算子來調(diào)用執(zhí)行的。
最后本例通過dataStream.print()將結(jié)果輸出。同樣,F(xiàn)link提供了很多API來把結(jié)果寫到外部系統(tǒng),這里簡單介紹下。
- writeAsText():輸出字符串到文件。
- writeAsCsv():輸出CSV格式文本。
- print()/printToErr():標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯(cuò)誤輸出。
- writeToSocket():輸出到socket。
- addSink():addSink與addSource一樣,提供可以供用戶擴(kuò)展的輸出方式,用戶需要實(shí)現(xiàn)SinkFunction接口。
- Testing with JUnit
- Power Up Your PowToon Studio Project
- Web Scraping with Python
- Interactive Data Visualization with Python
- Visual FoxPro 程序設(shè)計(jì)
- JavaScript+jQuery開發(fā)實(shí)戰(zhàn)
- INSTANT CakePHP Starter
- 基于Swift語言的iOS App 商業(yè)實(shí)戰(zhàn)教程
- Python Web數(shù)據(jù)分析可視化:基于Django框架的開發(fā)實(shí)戰(zhàn)
- Learning OpenStack Networking(Neutron)
- Learning Concurrent Programming in Scala
- 微服務(wù)架構(gòu)深度解析:原理、實(shí)踐與進(jìn)階
- 人工智能算法(卷1):基礎(chǔ)算法
- Mastering Concurrency Programming with Java 9(Second Edition)
- 深入理解BootLoader