官术网_书友最值得收藏!

2.1 DataStream

在Flink中用DataSet和DataStream來表示數(shù)據(jù)集,DataSet表示有界的數(shù)據(jù),DataStream表示無界的數(shù)據(jù)。當(dāng)然這只是概念層面的抽象,DataStream并沒有真正的數(shù)據(jù)。DataStream通過初始化Source來構(gòu)造,通過一系列的轉(zhuǎn)換來表達(dá)計(jì)算過程,最后通過Sinker把結(jié)果輸出到外部系統(tǒng)。Flink內(nèi)部集成了大量與外部系統(tǒng)交互的Source和Sink,這部分對(duì)應(yīng)Flink中的Connectors模塊;還有大量的Transformation,這部分對(duì)應(yīng)Flink中的算子(Operator)。

我們來看個(gè)官方的例子:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

在這個(gè)例子中,env.socketTextStream方法(從socket得到數(shù)據(jù))得到DataStream,然后經(jīng)過DataStream的各種轉(zhuǎn)換,這里有flatMap、keyBy、window等轉(zhuǎn)換,最后通過print把結(jié)果輸出到標(biāo)準(zhǔn)輸出(見圖2-1)。

028-1

圖2-1 Streaming Dataflow

上面的例子是通過socketTextStream從網(wǎng)絡(luò)端口讀取數(shù)據(jù)得到DataStream,還有一些其他方式,比如:通過讀取文件,readFile (fileInputFormat, path);通過讀取集合數(shù)據(jù)集,fromCollection (Collection)。當(dāng)然,也可以通過方法StreamExecutionEnvironment.addSource (sourceFunction)來定制數(shù)據(jù)的讀取,用戶需要實(shí)現(xiàn)SourceFunction接口。我們來看下這個(gè)方法是怎么得到DataStream的,關(guān)鍵代碼如下:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
        String sourceName, TypeInformation<OUT> typeInfo) {
    // 此處省略不相關(guān)的代碼
    clean(function);
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

可以看到該方法新建了一個(gè)DataStreamSource。繼續(xù)看DataStreamSource你會(huì)發(fā)現(xiàn),它繼承了SingleOutputStreamOperator(這個(gè)類從命名看不是很清楚,很容易讓人把它誤認(rèn)為是個(gè)算子,但實(shí)際上它是個(gè)DataStream子類),這樣我們就得到了一個(gè)DataStream。

那么DataStream之間是怎么相互轉(zhuǎn)換的呢?我們來看DataStream的flatMap方法:

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(
            clean(flatMapper), getType(), Utils.getCallLocationName(), true);
    // 這里用FlatMapFunction構(gòu)造了一個(gè)StreamOperator
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}

這里構(gòu)造了一個(gè)StreamFlatMap類型的算子,然后繼續(xù)調(diào)用transform方法。我們接著看transform方法:

public <R> SingleOutputStreamOperator<R> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    //構(gòu)造Transformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(
            environment, resultTransform);
    // 把所有的Transformation都保存到StreamExecutionEnvironment中
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}

可以看到,其中最主要的工作是基于剛才的算子新建了一個(gè)OneInputTransformation,然后把該Transformation保存下來。那么StreamExecutionEnvironment中保存的Transformation用來做什么呢?實(shí)際上Flink根據(jù)這些Transformation生成整個(gè)運(yùn)行的拓?fù)洌麄€(gè)生成過程大致如下:

1)根據(jù)Transformation生成StreamGraph;

2)根據(jù)StreamGraph生成JobGraph;

3)根據(jù)JobGraph生成可以調(diào)度運(yùn)行的ExecutionGraph。

整個(gè)過程還會(huì)在第5章詳細(xì)介紹,這里可以先大致了解下。這里用戶的執(zhí)行代碼FlatMapFunction實(shí)際上是通過先傳遞給算子,然后由算子來調(diào)用執(zhí)行的。

最后本例通過dataStream.print()將結(jié)果輸出。同樣,F(xiàn)link提供了很多API來把結(jié)果寫到外部系統(tǒng),這里簡單介紹下。

  • writeAsText():輸出字符串到文件。
  • writeAsCsv():輸出CSV格式文本。
  • print()/printToErr():標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯(cuò)誤輸出。
  • writeToSocket():輸出到socket。
  • addSink():addSink與addSource一樣,提供可以供用戶擴(kuò)展的輸出方式,用戶需要實(shí)現(xiàn)SinkFunction接口。
主站蜘蛛池模板: 五家渠市| 琼海市| 桦甸市| 招远市| 新绛县| 阳东县| 吴堡县| 克山县| 冷水江市| 岳池县| 东光县| 卓尼县| 唐山市| 南宫市| 宜兰市| 玉门市| 稻城县| 封开县| 宁都县| 五常市| 洪洞县| 沿河| 喀什市| 盈江县| 沈丘县| 虎林市| 扶余县| 乐亭县| 上林县| 侯马市| 蒙山县| 武义县| 共和县| 泰来县| 义马市| 宣武区| 北海市| 高清| 渑池县| 申扎县| 油尖旺区|