- Hands-On Reactive Programming with Reactor
- Rahul Sharma
- 263 words
- 2021-08-13 15:22:57
SynchronousSink
The sink is bound to a subscriber of the publisher. It is invoked through the consumer function whenever the subscriber asks for data. On each invocation, the sink can emit at most one value event. It can also raise a completion or error event during the same invocation.
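The contract above can be illustrated with a minimal sketch. The class and method names (`SinkContractSketch`, `countTo`, `doubleNext`) are hypothetical and exist only for this illustration. The sketch assumes `reactor-core` is on the classpath. `countTo` emits one value per invocation and then completes, while `doubleNext` deliberately calls `next()` twice in one invocation, which Reactor is expected to reject with an error signal.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SinkContractSketch {

    // Emits 1..limit (for limit >= 1), one value per generator invocation, then completes.
    static Flux<Integer> countTo(int limit) {
        return Flux.<Integer, Integer>generate(
                () -> 1,                         // initial state, created per subscriber
                (state, sink) -> {
                    sink.next(state);            // at most one next() per invocation
                    if (state >= limit) {
                        sink.complete();         // a terminal event may follow the value
                    }
                    return state + 1;            // state passed to the next invocation
                });
    }

    // Deliberately violates the contract: two next() calls in a single invocation.
    static Flux<Integer> doubleNext() {
        return Flux.<Integer>generate(sink -> {
            sink.next(1);
            sink.next(2); // expected to be rejected and surfaced as an error signal
        });
    }

    public static void main(String[] args) {
        List<Integer> values = countTo(3).collectList().block();
        System.out.println(values); // prints [1, 2, 3]

        doubleNext().subscribe(
                v -> System.out.println("value " + v),
                e -> System.out.println("error " + e.getClass().getSimpleName()));
    }
}
```

Calling `next()` and `complete()` in the same invocation is allowed. Only a second `next()` breaks the one-value-per-invocation rule.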
It is important to note that the events generated by the sink are consumed synchronously at the subscriber end. Let's reflect on the Fibonacci test that we wrote in the previous chapter:
Flux<Long> fibonacciGenerator = Flux.generate(
        () -> Tuples.<Long, Long>of(0L, 1L),
        (state, sink) -> {
            sink.next(state.getT1());
            System.out.println("generated " + state.getT1());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
fibonacciGenerator.take(size).subscribe(t -> {
    System.out.println("consuming " + t);
    fibonacciSeries.add(t);
});
We have added additional print statements while generating and consuming numbers. Let's run our tests to see the output, as follows:

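One way to check the ordering programmatically, rather than by reading console output, is to record the events in a list. The following is a minimal sketch, assuming `reactor-core` is on the classpath. The class name `InterleavingSketch` and the `run` helper are hypothetical, and the generator body is the same as in the test above, with the print statements replaced by list insertions.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class InterleavingSketch {

    // Runs the Fibonacci generator for `size` values and returns the recorded event log.
    static List<String> run(int size) {
        List<String> events = new ArrayList<>();

        Flux<Long> fibonacciGenerator = Flux.generate(
                () -> Tuples.<Long, Long>of(0L, 1L),
                (state, sink) -> {
                    sink.next(state.getT1());
                    events.add("generated " + state.getT1());
                    return Tuples.of(state.getT2(), state.getT1() + state.getT2());
                });

        // Everything runs on the calling thread, so the log is complete when subscribe() returns.
        fibonacciGenerator.take(size).subscribe(t -> events.add("consuming " + t));
        return events;
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

For `size` values the log holds exactly two entries per number, and the two entries for one number are always adjacent. The order of the two entries within a pair may depend on the Reactor version in use, so the sketch does not assume it. What matters is that the generator never runs ahead of the subscriber.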
The consumer and producer statements appear in an alternating manner. We can easily deduce that each number is consumed before the next number is generated.

The generate API is offered in multiple variants, and the sink can be used with or without an initial state. In our fibonacciGenerator, we used it with a state that is initialized on a per-subscriber basis. Optionally, we can also provide a terminal function, which is invoked when the event stream terminates, that is, after the sink raises an error or completion event. The terminal function can be used to perform any cleanup associated with the state.
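The stateful variant with a terminal function can be sketched as follows, using the three-argument `Flux.generate` overload (state supplier, generator, state consumer). The class name `StateCleanupSketch`, the `run` helper, and the log messages are hypothetical, and the sketch assumes `reactor-core` is on the classpath. A plain `Integer` stands in for the state here, where real code might hold a resource that needs releasing.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class StateCleanupSketch {

    // Emits 0..limit-1, completes, and records when the state is created and cleaned up.
    static List<String> run(int limit) {
        List<String> log = new ArrayList<>();

        Flux<Integer> numbers = Flux.<Integer, Integer>generate(
                () -> {
                    log.add("state created");    // runs once per subscriber
                    return 0;
                },
                (state, sink) -> {
                    if (state == limit) {
                        sink.complete();         // terminal event ends the stream
                    } else {
                        sink.next(state);
                    }
                    return state + 1;
                },
                state -> log.add("cleanup"));    // terminal function, invoked after termination

        numbers.subscribe(v -> log.add("value " + v));
        return log;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

With `run(2)`, the log reads `state created`, `value 0`, `value 1`, `cleanup`, showing that the terminal function runs only after the completion event has been raised.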