
Flux

Let's describe how data flows through the Flux class with the following marble diagram:

Diagram 4.2 An example of the Flux stream transformed into another Flux stream

Flux defines a general-purpose reactive stream that can produce zero, one, or many elements, potentially even an infinite number of them. Its signals follow this formula:

onNext x 0..N [onError | onComplete]
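
To make the formula concrete, here is a minimal sketch of a checker that accepts a signal sequence only if it matches that shape: any number of onNext signals, optionally followed by exactly one terminal signal. The class SignalGrammarSketch, the Signal enum, and the isValid method are hypothetical names made up for this illustration. They are not part of Reactor.

```java
import java.util.List;

public class SignalGrammarSketch {

    // Hypothetical model of the three signal kinds; not a Reactor type.
    public enum Signal { ON_NEXT, ON_ERROR, ON_COMPLETE }

    /** Returns true if the signals match: onNext x 0..N [onError | onComplete]. */
    public static boolean isValid(List<Signal> signals) {
        for (int i = 0; i < signals.size(); i++) {
            boolean terminal = signals.get(i) != Signal.ON_NEXT;
            // A terminal signal is legal only as the very last signal.
            if (terminal && i != signals.size() - 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Two elements, then normal completion.
        System.out.println(isValid(List.of(Signal.ON_NEXT, Signal.ON_NEXT, Signal.ON_COMPLETE))); // true
        // An empty stream that only completes.
        System.out.println(isValid(List.of(Signal.ON_COMPLETE)));                                  // true
        // No terminal signal yet: the stream is still running, or is endless.
        System.out.println(isValid(List.of(Signal.ON_NEXT, Signal.ON_NEXT)));                      // true
        // An element after a terminal signal violates the formula.
        System.out.println(isValid(List.of(Signal.ON_ERROR, Signal.ON_NEXT)));                     // false
    }
}
```

Note that a sequence without any terminal signal is still valid, which is what leaves room for endless streams.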

Infinite data containers are uncommon in imperative programming, but they are quite common in functional programming. The following code produces a simple endless reactive stream:

Flux.range(1, 5).repeat()

This stream repeatedly produces the numbers from 1 to 5 (the sequence looks like 1, 2, 3, 4, 5, 1, 2, ...). This is not a problem and does not exhaust memory, because each element can be transformed and consumed without waiting for the whole stream to be created. Furthermore, the subscriber can cancel the subscription at any time, which effectively turns an endless stream into a finite one.
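
The cancellation behavior can be sketched without Reactor by using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract. This is only an illustration under simplifying assumptions (single-threaded, no error signaling for invalid demand), not how Reactor implements range or repeat. RepeatingRange, TakingSubscriber, and takeFromEndless are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class EndlessStreamSketch {

    /** Hypothetical publisher: emits 1..5 over and over and never completes on its own. */
    static final class RepeatingRange implements Flow.Publisher<Integer> {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long requested;    // outstanding demand from the subscriber
                private boolean emitting;  // guards against re-entrant request() calls
                private boolean cancelled;
                private int next = 1;

                @Override
                public void request(long n) {
                    // Simplification: invalid demand is ignored rather than signaled via onError.
                    if (n <= 0 || cancelled) return;
                    requested += n;
                    if (emitting) return;  // the loop below picks up the new demand
                    emitting = true;
                    while (requested > 0 && !cancelled) {
                        requested--;
                        int value = next;
                        next = (next % 5) + 1;  // wrap around: 5 is followed by 1
                        subscriber.onNext(value);
                    }
                    emitting = false;
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: keeps the first `limit` elements (limit >= 1), then cancels. */
    static final class TakingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final int limit;
        private Flow.Subscription subscription;

        TakingSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);  // ask for one element at a time
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel();  // the endless stream ends here for this subscriber
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }
    }

    public static List<Integer> takeFromEndless(int limit) {
        TakingSubscriber subscriber = new TakingSubscriber(limit);
        new RepeatingRange().subscribe(subscriber);
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(takeFromEndless(7)); // prints [1, 2, 3, 4, 5, 1, 2]
    }
}
```

The publisher holds only a counter and the current value, so memory use stays constant no matter how long the stream runs. The subscriber alone decides when to stop.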

Beware: an attempt to collect all elements emitted by an endless stream may cause an OutOfMemoryError. Doing so is not recommended in production applications, but the simplest way to reproduce this behavior is the following code:

Flux.range(1, 100)   // (1)
    .repeat()        // (2)
    .collectList()   // (3)
    .block();        // (4)

In the preceding code, we do the following:

  1. The range operator creates a sequence of integers from 1 to 100 (inclusive).
  2. The repeat operator subscribes to the source reactive stream again and again, each time the source stream finishes. So, the repeat operator subscribes to the result of the range operator, receives elements 1 to 100 and the onComplete signal, then subscribes again, receives elements 1 to 100, and so on, without stopping.
  3. With the collectList operator, we try to gather all produced elements into a single list. Because the repeat operator generates an endless stream, elements keep arriving and the list keeps growing until it consumes all available memory and the application fails with the following error: java.lang.OutOfMemoryError: Java heap space. Our application has just run out of free heap memory.
  4. The block operator triggers the actual subscription and blocks the running thread until the final result arrives, which cannot happen in this case because the reactive stream is endless.