Flux.create is another mechanism for generating events programmatically. It hands the producer a FluxSink, which is capable of generating any number of events. The API is more generic than the generate methods discussed in the previous section, and the FluxSink is capable of generating events asynchronously. However, it does not take subscription cancellation or backpressure into account on the producer's behalf. This means that even if the subscriber has cancelled its subscription, the code passed to create will continue to generate events. All implementations must listen for the cancel event and explicitly stop producing.
As for backpressure, the producer keeps generating events regardless of the demand from the subscriber. By default, these events are buffered, and they are dropped if the subscription is lost.
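The buffering behaviour described above can be observed with a subscriber that requests only a few items at a time. The following is a minimal sketch, assuming the two-argument overload Flux.create(Consumer, FluxSink.OverflowStrategy) is used to pick the strategy explicitly; the class name OverflowStrategyDemo and the helper receive are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class OverflowStrategyDemo {

    // Hypothetical helper: the producer emits 1..100 eagerly, ignoring demand.
    // The subscriber requests 3 items on subscription and 3 more afterwards.
    static List<Integer> receive(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();
        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3); // initial demand
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        Flux.<Integer>create(sink -> {
            for (int i = 1; i <= 100; i++) {
                sink.next(i); // emitted whether or not there is demand
            }
        }, strategy).subscribe(subscriber);

        // Extra demand, issued after the producer loop has already finished.
        subscriber.request(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receive(FluxSink.OverflowStrategy.BUFFER));
        System.out.println(receive(FluxSink.OverflowStrategy.DROP));
    }
}
```

With BUFFER, the items emitted beyond the initial demand are held back and delivered when the second request arrives. With DROP, they are discarded at the moment they are emitted, so the later request finds nothing to deliver.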
To see how the two are different, let's modify our FibonacciGenerator to use a FluxSink. Some of the key differences are highlighted as follows:
There is no initial seed state in the API
The FluxSink keeps generating the events, irrespective of the subscription state
We can generate any number of events in the sink
The onDispose event can be listened to in order to perform any cleanup, or to stop publishing events
All events that are generated are buffered and dropped once the subscription is cancelled
It is important to note that the FluxSink provides lifecycle callback methods, which can be used to perform additional cleanups, or any other action, as follows:
onCancel: This method gets invoked when the subscription is cancelled.
onDispose: This method gets invoked when the subscription is closed due to a cancel, close, or error event.
onRequest: This method is invoked with the value specified by the subscriber. It can be used to build a pull data model. When the method is invoked, the next method can be called for the specified number of values:
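// Requires java.util.LinkedList, java.util.List,
// java.util.concurrent.atomic.AtomicBoolean, reactor.core.publisher.Flux,
// and a JUnit @Test annotation.
@Test
public void testFibonacciFluxSink() {
    Flux<Long> fibonacciGenerator = Flux.create(e -> {
        long current = 1, prev = 0;
        AtomicBoolean stop = new AtomicBoolean(false);
        // Invoked when the subscription is cancelled or terminated.
        e.onDispose(() -> {
            stop.set(true);
            System.out.println("******* Stop Received ****** ");
        });
        // The loop does not consult the stop flag, so it keeps generating
        // values until the long overflows, even after cancellation.
        while (current > 0) {
            e.next(current);
            System.out.println("generated " + current);
            long next = current + prev;
            prev = current;
            current = next;
        }
        e.complete();
    });

    List<Long> fibonacciSeries = new LinkedList<>();
    // take(50) cancels the subscription after 50 values.
    fibonacciGenerator.take(50).subscribe(t -> {
        System.out.println("consuming " + t);
        fibonacciSeries.add(t);
    });
    System.out.println(fibonacciSeries);
}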
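The pull data model mentioned for onRequest can be sketched as a variation of the test above. This is only an illustration, assuming that onRequest is called with the demand already signalled when it is registered, and that isCancelled reports a cancelled subscription; the class name PullFibonacci and the helper firstFibonacci are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class PullFibonacci {

    // Hypothetical helper: collects the first `count` Fibonacci numbers,
    // emitting values only in response to subscriber demand.
    static List<Long> firstFibonacci(int count) {
        Flux<Long> fibonacci = Flux.<Long>create(sink -> {
            long[] state = {0, 1}; // state[0] = prev, state[1] = current
            sink.onRequest(n -> {
                // Emit at most n values and stop early on cancellation.
                for (long i = 0; i < n && !sink.isCancelled(); i++) {
                    if (state[1] <= 0) { // the long has overflowed
                        sink.complete();
                        return;
                    }
                    sink.next(state[1]);
                    long next = state[0] + state[1];
                    state[0] = state[1];
                    state[1] = next;
                }
            });
        });

        List<Long> result = new ArrayList<>();
        fibonacci.take(count).subscribe(result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(firstFibonacci(10));
    }
}
```

Unlike the loop in the test above, this producer does no work once the subscriber has cancelled, because every iteration checks isCancelled and values are generated only when demand is signalled.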
Let's check the output that's generated, as follows:
Flux also provides a push method. This is similar to the create method, but it differs in how error and complete events are invoked: these events must be signalled synchronously, from a single producer thread.
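A minimal sketch of push, assuming a producer that signals all of its events from the subscribing thread; the class name PushDemo and the helper collect are hypothetical names used only for this illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PushDemo {

    // Hypothetical helper: republishes the given values through Flux.push.
    static List<String> collect(List<String> source) {
        Flux<String> flux = Flux.<String>push(sink -> {
            // All signals come from this one thread, one after another.
            for (String value : source) {
                sink.next(value);
            }
            sink.complete();
        });
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c")));
    }
}
```

If several threads needed to emit into the same sink, create would be the safer choice, since push is intended for a single producer.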