官术网_书友最值得收藏!

2.3 窗口

本節主要介紹窗口的原理及實現,幫助讀者深入了解Flink中的窗口是怎么實現的。

2.3.1 窗口的基本概念

窗口是無邊界流式系統中非常重要的概念,窗口把數據切分成一段段有限的數據集,然后進行計算。Flink中窗口按照是否并發執行,分為Keyed Window和Non-Keyed Window,它們的主要區別是有無keyBy動作。Keyed Window可以按照指定的分區方式并發執行,所有相同的鍵會被分配到相同的任務上執行。Non-Keyed Window會把所有數據放到一個任務上執行,并發度為1。我們來看下窗口的相關API。

1. Keyed Window

stream.keyBy(...)
    .window(...)          // 接受WindowAssigner參數,用來分配窗口
    [.trigger(...)]       // 可選的,接受Trigger類型參數,用來觸發窗口
    [.evictor(...)]       // 可選的,接受Evictor類型參數,用來驅逐窗口中的數據
    [.allowedLateness(...)]
    // 可選的,接受Time類型參數,表示窗口允許的最大延遲,超過該延遲,數據會被丟棄
    [.sideOutputLateData(...)]

    // 可選的,接受OutputTag類型參數,用來定義拋棄數據的輸出
    .reduce/aggregate/fold/apply()      // 窗口函數
    [.getSideOutput(...)]               // 可選的,獲取指定的DataStream

2. Non-Keyed Window

stream.windowAll(...)           // 接受WindowAssigner參數,用來分配窗口
    [.trigger(...)]             // 可選的,接受Trigger類型參數,用來觸發窗口
    [.evictor(...)]             // 可選的,接受Evictor類型參數,用來驅逐窗口中的數據
    [.allowedLateness(...)]

    // 可選的,接受Time類型參數,表示窗口允許的最大延遲,超過該延遲,數據會被丟棄
    [.sideOutputLateData(...)]

    // 可選的,接受OutputTag類型參數,用來定義拋棄數據的輸出
    .reduce/aggregate/fold/apply()      // 窗口函數
    [.getSideOutput(...)]               // 可選的,獲取指定的DataStream

因為實際生產中我們大多會使用Keyed Window,所以后續章節的解讀都是針對Keyed Window展開的。我們來看下上面提到的幾個主要概念。

  • WindowAssigner:窗口分配器。我們常說的滾動窗口、滑動窗口、會話窗口等就是由WindowAssigner決定的,比如TumblingEventTimeWindows可以產生基于事件時間的滾動窗口。
  • Trigger:觸發器。Flink根據WindowAssigner把數據分配到不同的窗口,還需要一個執行時機,Trigger就是用來判斷執行時機的。Trigger類中定義了一些返回值類型,根據返回值類型來決定是否觸發及觸發什么動作。
  • Evictor:驅逐器。在窗口觸發之后,在調用窗口函數之前或者之后,Flink允許我們定制要處理的數據集合,Evictor就是用來驅逐或者過濾不需要的數據集的。
  • Allowed Lateness:最大允許延遲。主要用在基于事件時間的窗口,表示在水位線到達之后的最長允許數據延遲時間。在最長允許延遲時間內,窗口都不會銷毀。
  • Window Function:窗口函數。用戶代碼執行函數,用來做真正的業務計算。
  • Side Output:丟棄數據的集合。通過getSideOutput方法可以獲取丟棄數據的DataStream,方便用戶進行擴展。

以上就是窗口的一些主要概念,接下來我們深入分析窗口的每個元素。

2.3.2 窗口的執行流程

在深入介紹窗口之前,我們先從整體上看下窗口的執行過程,以便有個全局的概念。本節從整體上介紹窗口的執行流程,如果其中有細節不清楚的地方,可以繞過本節,直接看后面幾節,再回過頭來看本節內容。

窗口本質上也是一個算子,所以我們直接來看其實現類:EvictingWindowOperator和WindowOperator。這兩個類的區別是前者帶驅逐器,后者不帶。為了覆蓋更多的場景,我們用EvictingWindowOperator來分析。

我們直接從算子最重要的方法processElement開始。如圖2-4所示,整個過程從分配窗口(WindowAssigner的主要作用)開始,分配好窗口后,用當前窗口來設置窗口狀態的命名空間;之后把當前數據加入狀態中(如果是聚合函數的話,還會有計算過程),并用當前數據去判斷觸發器是否觸發,如果觸發,那就調用emitWindowContents方法處理數據,該方法的主要過程是調用驅逐器清除數據;然后調用窗口函數計算結果;最后注冊一個窗口本身的清除定時器。

040-1

圖2-4 窗口算子執行流程

主要源代碼如下:

public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();
        W actualWindow = mergingWindows.addWindow(...)
    }
    evictingWindowState.setCurrentNamespace(stateWindow);
    evictingWindowState.add(element);

    TriggerResult triggerResult = triggerContext.onElement(element);

    if (triggerResult.isFire()) {
        Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
        if (contents == null) {
            continue;
        }
        emitWindowContents(actualWindow, contents, evictingWindowState);
    }

    registerCleanupTimer(window);
}

2.3.3 窗口分配器

本節主要介紹Flink中窗口分配器的作用及幾種典型實現,這幾種典型的實現實際上對應著幾種典型的窗口。

熟悉流計算的讀者可能知道,窗口(時間窗口)大致可以分為滑動窗口和滾動窗口。那么這個分類是由什么決定的呢?顯然它是由數據分配到不同窗口的方式決定的。在Flink中,這個分配的動作就是由窗口分配器完成的。不同的窗口分配器實現類對應不同的窗口。

窗口分配器的接口定義如下:

public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    public abstract Collection<W> assignWindows(T element,
            long timestamp, WindowAssignerContext context);

    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    public abstract boolean isEventTime();

    public abstract static class WindowAssignerContext {
        /**
         * 返回當前的處理時間
         */
        public abstract long getCurrentProcessingTime();
    }
}

其中最關鍵的是assignWindows方法,它用來分配窗口。我們來看幾種常用的實現。

1. 滾動窗口

Flink中有TumblingEventTimeWindows和TumblingProcessingTimeWindows兩種滾動窗口(Tumbling Window),分別對應基于事件時間的滾動窗口和基于系統時間的滾動窗口。這兩種實現分配數據的策略實際上是一樣的,只是基于的時間不同。我們來看下TumblingEventTimeWindows的assignWindows方法:

public Collection<TimeWindow> assignWindows(Object element,
        long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // 計算窗口開始的時間
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException(
            "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
            + "Is the time characteristic set to 'ProcessingTime', " + "or did you forget to call "
            + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

可以看到,其實現還是比較清楚的,根據窗口的大小(size)、偏移量(offset)、數據時間計算窗口的開始時間。具體的計算方法如下:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

返回一個TimeWindow。

2. 滑動窗口

和滾動窗口一樣,滑動窗口(Sliding Window)也有SlidingEventTimeWindows和Sliding-ProcessingTimeWindows兩種實現,兩種實現也基本是一樣的。我們來看SlidingProcessing-TimeWindows的assignWindows方法:

public Collection<TimeWindow> assignWindows(Object element, long timestamp,
        WindowAssignerContext context) {
    timestamp = context.getCurrentProcessingTime();
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
    for (long start = lastStart;
        start > timestamp - size;
        start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
    return windows;
}

首先我們看到一個最明顯的區別是返回的TimeWindow個數不同,滾動窗口只返回一個,而滑動窗口返回多個,這也符合我們對滑動窗口的理解:滑動窗口是可以重疊的,一個數據可以落入多個窗口內(可以思考一下一個數據最多可以落入幾個窗口內)。與滾動窗口一樣,計算最后一個窗口的開始時間,然后不斷回溯(前一個窗口的開始時間減去滑動時間)尋找位于時間范圍內的窗口,直到窗口的結束時間早于系統時間(或者事件時間)。

3. 會話窗口

會話窗口(Session Window)是Flink中比較獨特的窗口類型,其他流式系統不支持它,或支持得不夠好。會話窗口可以按照一個會話來分配數據,而會話的長度可以是固定的(EventTimeSessionWindows、ProcessingTimeSessionWindows),也可以是不斷變化的(DynamicProcessingTimeSessionWindows、DynamicEventTimeSessionWindows)。使用過會話的讀者可能知道,只要不過期會話就可以一直存在,新的數據必然會加入某個會話,同時會導致會話的超時時間發生改變。在Flink中,會話的不斷變化就對應著會話窗口的不斷合并。我們以EventTimeSessionWindows為例來看下會話窗口的實現,其中比較復雜的是窗口的合并。

會話窗口中數據的分配和滾動窗口很像,即返回一個計算好的窗口(TimeWindow)。

public Collection<TimeWindow> assignWindows(Object element, long timestamp,
        WindowAssignerContext context) {
    return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}

窗口的分配過程結束后,會得到一個窗口。這個新分配的窗口屬于哪個會話(真正的窗口)呢?我們來看圖2-5中的例子(例子中sessionTimeout=3)。

043-1

圖2-5 會話窗口

假如Flink接收到數據時間為1的數據(圖2-5中的步驟1)(這里我們假設鍵相同或者是Non-Keyed Window),那么這個時候會生成TimeWindow(1,4),并處理數據時間為5的數據,生成TimeWindow(5,8);然后繼續處理時間為3的數據,這個時候應該生成TimeWindow(3,6)的窗口,但是由于TimeWindow(1,4)對應的會話還沒有過期,應該把時間為3的數據歸到這個會話中,所以Flink中進行TimeWindow的合并。同理,當TimeWindow(1,4)和TimeWindow(3,6)合并為TimeWindow(1,6)的時候,也應該將TimeWindow(5,8)同自己合并,這樣最后合并為TimeWindow(1,8)。當然不只是將TimeWindow合并,還需要將窗口對應的觸發器、數據合并。我們來看合并的關鍵代碼,合并發生在數據被WindowOperator處理的過程中:

W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
    @Override
    public void merge(W mergeResult,
            Collection<W> mergedWindows, W stateWindowResult,
            Collection<W> mergedStateWindows) throws Exception {

        if ((windowAssigner.isEventTime() << mergeResult.maxTimestamp() +
                allowedLateness <= internalTimerService.currentWatermark())) {
            throw new UnsupportedOperationException("The end timestamp of an "
                + "event-time window cannot become earlier than the current watermark "
                + "by merging. Current watermark: "
                + internalTimerService.currentWatermark()
                + " window: " + mergeResult);
        } else if (!windowAssigner.isEventTime()) {
            long currentProcessingTime = internalTimerService.currentProcessingTime();
            if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                throw new UnsupportedOperationException("The end timestamp of a "
                + "processing-time window cannot become earlier than "
                + "the current processing time "
                + "by merging. Current processing time: " + currentProcessingTime
                + " window: " + mergeResult);
            }
        }

        triggerContext.key = key;
        triggerContext.window = mergeResult;

        triggerContext.onMerge(mergedWindows);

        for (W m: mergedWindows) {
            triggerContext.window = m;
            triggerContext.clear();
            deleteCleanupTimer(m);
        }

        // 合并狀態
        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
    }
});

其中關鍵的方法是MergingWindowSet的addWindow方法,其中TimeWindow合并的細節在其mergeWindows方法中,合并的規則就是我們上面介紹的。

合并的主要過程如下:

1)找出合并之前的窗口集合和合并之后的窗口;

2)找出合并之后的窗口對應的狀態窗口(方式是從合并窗口集合中挑選第一個窗口的狀態窗口);

3)執行merge方法(合并窗口需要做的工作,也就是執行MergingWindowSet的addWindow方法)。

這里不好理解的是合并結果的窗口和結果對應的狀態窗口(用來獲取合并之后的數據),我們來看圖2-6。

045-1

圖2-6 合并窗口

MergingWindowSet(窗口合并的工具類)中有個map,用來保存窗口和狀態窗口的對應關系,那么怎么理解這個狀態窗口呢?如果我們在得到TimeWindow(1,4)時基于TimeWindow(1,4)在狀態中保存了數據(數據A),也就是說狀態的命名空間是TimeWindow(1,4),在得到TimeWindow(5,8)時基于TimeWindow(5,8)在狀態中保存了數據(數據B),當第三個數據(數據C)來的時候,又經過合并窗口得到了TimeWindow(1,8),那么怎么獲取合并窗口的數據集AB呢?顯然我們還需要原來的TimeWindow(1,4)或者TimeWindow(5,8),原來的TimeWindow(1,4)在這里就是狀態窗口。

這里窗口合并的同時會把窗口對應的狀態所保存的數據合并到結果窗口對應的狀態窗口對應的狀態中。這里有點繞,還是看圖2-6,最終合并窗口的結果窗口是TimeWindow(1,8)。我們怎么獲取TimeWindow(1,8)對應的數據集ABC呢?這個時候可以通過MergingWindowSet中保存的TimeWindow(1,8)對應的狀態窗口TimeWindow(1,4)來獲取合并后的狀態,即數據集ABC。

會話窗口的其他過程與滑動窗口及滾動窗口沒有什么區別。

4. 全局窗口

全局窗口(Global Window),顧名思義就是所有的元素都分配到同一個窗口中,我們常用的Count Window就是一種全局窗口。其實現GlobalWindow的主要方法如下:

public Collection<GlobalWindow> assignWindows(Object element, long timestamp,
        WindowAssignerContext context) {
    return Collections.singletonList(GlobalWindow.get());
}

這里需要說明的是全局窗口和Non-Keyed Window是完全不同的概念:Non-Keyed Window是指并發為1的窗口,可以是滾動窗口或者滑動窗口;而全局窗口既可以是Non-Keyed Window,也可以是Keyed Window。

2.3.4 觸發器

本節主要介紹窗口中觸發器的作用以及幾種典型觸發器的實現。

觸發器決定窗口函數什么時候執行以及執行的狀態。觸發器通過返回值來決定什么時候執行,其返回值有如下幾種類型。

  • CONTINUE:什么也不做。
  • FIRE:觸發窗口的計算。
  • PURGE:清除窗口中的數據。
  • FIRE_AND_PURGE:觸發計算并清除數據。

其接口定義如下(列出主要方法):

public abstract class Trigger<T, W extends Window> implements Serializable {
    //每個增加到窗口中的數據都需要調用該方法,根據返回結果判定窗口是否觸發
    public abstract TriggerResult onElement(T element, long timestamp,
            W window, TriggerContext ctx) throws Exception;
    //當注冊的系統時間定時器到期后調用,其調用是通過WindowOperator中的triggerContext進行的
    public abstract TriggerResult onProcessingTime(long time, W window,
            TriggerContext ctx) throws Exception;
    //當注冊的事件時間定時器到期后調用,其調用是通過WindowOperator中的triggerContext進行的
    public abstract TriggerResult onEventTime(long time, W window,
            TriggerContext ctx) throws Exception;
    //主要用在sessionWindow,窗口合并的時候調用
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
}

Flink實現了幾種常用的觸發器。

  • EventTimeTrigger:當水位線大于窗口的結束時間時觸發,一般用在事件時間的語義下。
  • ProcessingTimeTrigger:當系統時間大于窗口結束時間時觸發,一般用在系統時間的語義下。
  • CountTrigger:當窗口中的數據量大于一定值時觸發。
  • DeltaTrigger:根據閾值函數計算出的閾值來判斷窗口是否觸發。

其中經常會用到的是根據系統時間和事件來判斷窗口是否觸發的觸發器,我們來看下其實現過程。

我們先來看ProcessingTimeTrigger是怎么實現的。

@Override
public TriggerResult onElement(Object element, long timestamp,
        TimeWindow window, TriggerContext ctx) {
    ctx.registerProcessingTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
}

在onElement方法中,調用triggerContext注冊了窗戶最大時間的定時器,tiggerContext中調用InternalTimerService來進行定時器注冊。InternalTimerService是Flink內部定時器的存儲管理類。整個調用及實現過程如圖2-7所示。

048-1

圖2-7 ProcessingTimeTrigger

InternalTimerServiceImpl內部維護了一個有序的隊列,用來存儲定時器(TimerHeap-InternalTimer),并且利用ProcessingTimeService來延遲調度基于系統時間生成的Trigger-Task。TriggerTask會調用InternalTimerServiceImpl的onProcessingTime方法,onProcessing-Time會調用真正的目標(WindowOperator)onProcessingTime方法,完成一次定時器的觸發。在InternalTimerServiceImpl調用onProcessingTime方法的過程中,會重設上下文(Context)的鍵,確保后續操作都是針對當前鍵對應的數據。

那么EventTimeTrigger和ProcessingTimeTrigger在實現上有什么不一樣呢?

首先我們知道,基于事件時間的觸發器必然與事件時間有關。而事件時間不是有序的,不能像系統時間那樣,用延遲任務來觸發。那么什么時候觸發基于事件時間的定時器呢?水位線(Watermark)在Flink中是用來推動基于事件時間的處理動作執行的,也就是說水位線代表了事件的最晚到達時間。我們就可以采用水位線來觸發基于事件時間的定時器,事實上Flink也是如此實現的,我們來看代碼:

@Override
public TriggerResult onElement(Object element, long timestamp,
        TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // 如果水位線經過窗口,那么就觸發
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

以上代碼是EventTimeTrigger的onElement方法,與ProcessingTimeTrigger一樣,如果條件不滿足,那就調用TriggerContext來注冊一個事件時間定時器,這里的依據是水位線是否大于窗口最大時間。同樣,TriggerContext會調用InternalTimerServiceImpl的registerEventTimeTimer來真正注冊定時器,InternalTimerServiceImpl注冊的動作也就是把定時器(TimerHeapInternalTimer)放到一個有序隊列中(eventTimeTimersQueue),之后就等水位線來觸發。

如圖2-8所示,整個觸發過程是通過StreamTask處理水位線來驅動的,經過一系列的調用,由InternalTimeServiceManager完成觸發器的觸發,觸發條件是水位線大于定時器的時間。

049-1

圖2-8 EventTimeTrigger

上面分析了EventTimeTrigger和ProcessingTimeTrigger的實現過程,其他觸發器,如CountTrigger相對簡單些,通過條件(數量是否大于閾值)就可以完成是否觸發的判斷,這里不再討論。下一節介紹當窗口完成觸發的時候,窗口函數怎么執行。

2.3.5 窗口函數

上一節分析了觸發器,本節來看下窗口觸發之后的計算過程,也就是窗口函數(Window Function)。

Flink中的窗口函數主要有ReduceFunction、AggregateFunction、ProcessWindow-Function三種(FoldFunction理論上可以通過AggregateFunction實現,并且Flink從1.8版本開始已經把該函數標記為Deprecated,因此該函數我們不再討論)。在實際使用中推薦使用前兩種,因為它們是增量計算,每條數據都會觸發計算,而且窗口狀態中只保留計算結果。而ProcessWindowFunction(或者使用了驅逐器之后)需要窗口把所有的數據保留下來,到窗口觸發的時候,調用窗口函數計算,效率比較低,而且會造成大量狀態緩存。下面我們詳細看下前兩種窗戶函數的實現。

1. ReduceFunction

ReduceFunction的接口定義如下:

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction是一個輸入、輸出類型一樣的簡單聚合函數,可以用來實現max()、min()、sum()等聚合函數。在WindowOperator中并不直接使用ReduceFunction作為算子的userFunction,而要經過層層包裝。主要包裝類有兩類。一類是WindowFunction,用來指導具體的窗口函數怎么計算。比如PassThroughWindowFunction,它表示不調用用戶的窗口函數,直接輸出結果,用來包裝ReduceFunction和AggregateFunction,因為這兩個函數在窗口觸發的時候已經計算好了結果,只需要發送結果即可。另一類是InternalWindowFunction的實現類,主要用來封裝窗口數據的類型,然后實際調用前面講的第一類包裝窗口類。這么講有點抽象,我們具體來看ReduceFunction函數在Flink中是怎么調用的。

我們看在WindowStream中調用reduce方法之后會發生什么。

public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException(
            "ReduceFunction of reduce can not be a RichFunction. "
            + "Please use reduce(ReduceFunction, WindowFunction) instead.");
    }
    // 清除閉包
    function = input.getExecutionEnvironment().clean(function);
    return reduce(function, new PassThroughWindowFunction<K, W, T>());
}

接著調用重載的reduce方法(下面只列出關鍵代碼):

public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> function,
        TypeInformation<R> resultType) {

    operator = new WindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            new InternalSingleValueWindowFunction<>(function),
            trigger,
            allowedLateness,
            lateDataOutputTag);
}

可以看到,最終傳給WindowOperartor的function是一個new InternalSingleValue-WindowFunction (new PassThroughWindowFunction())的實例對象。PassThroughWindow-Function我們在前面講過,該函數什么也不做,只是把輸出發送出去。再看InternalSingle-ValueWindowFunction,它也是基本什么都不做(只是把單個input對象轉為集合對象,這就是我們剛才說的,該類包裝類用來把輸入轉換為合適的類型),只是調用剛才傳入它內部的PassThroughWindowFunction,WindowOperator最終拿到的窗口函數就是把結果發送出去,不進行任何計算。

那么我們傳入的ReduceFunction怎么起作用?什么時候調用呢?我們來看ReduceFunction傳入WindowedStream之后用在了哪里,還是剛才的reduce方法:

ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>(
    "window-contents",
    reduceFunction,
    input.getType().createSerializer(getExecutionEnvironment().getConfig()));

由這樣一段代碼可以看到,reduceFunction被放到了StateDescriptor中,用來生成我們需要的ReducingState,并且reduceFunction被傳遞給ReducingState,用來進行真正的計算。我們來看ReducingState的實現類RocksDBReducingState的add方法:

public void add(V value) throws Exception {
    byte[] key = getKeyBytes();
    V oldValue = getInternal(key);
    // 這里reduceFunction函數被調用
    V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
    updateInternal(key, newValue);
}

這里可以再看下圖2-4所示的窗口算子執行流程圖,會更清晰易懂。

2. AggregateFunction

AggregateFunction是對ReduceFunction的擴展,可以接受三種類型的參數——輸出、計算和輸出,它的適用范圍比ReduceFunction更廣。其實現過程與ReduceFunction基本一致,這里不再贅述。

到這里窗口的主要概念和設計實現原理都介紹完了,大家如果有興趣,可以根據本章的介紹去分別實現一種自己定制的窗口分配器、觸發器及窗口函數。

主站蜘蛛池模板: 新民市| 龙山县| 浑源县| 鸡泽县| 漳州市| 嘉义市| 尼勒克县| 朝阳区| 将乐县| 许昌县| 东乌珠穆沁旗| 双鸭山市| 桐柏县| 米脂县| 龙州县| 杨浦区| 连州市| 新昌县| 阳原县| 黔西| 邓州市| 安达市| 司法| 清水县| 惠州市| 大埔区| 松江区| 娄底市| 和硕县| 太康县| 车致| 郴州市| 酒泉市| 博野县| 新密市| 措美县| 宁蒗| 盖州市| 东山县| 遂川县| 林州市|