SynchronousSink

The sink is bound to a subscriber of the publisher. It is passed to the generator function whenever the subscriber asks for data. On each invocation, the sink can emit at most one value event. During the same invocation, the sink can also raise a completion or an error event.
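The rule above can be illustrated with a minimal sketch. It assumes Reactor Core is on the classpath; the class name SingleValuePerCall and the method countToThree are hypothetical names chosen for this illustration. Each invocation of the generator calls next() once, and the final invocation also calls complete().

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SingleValuePerCall {

    // Hypothetical helper: emits 1, 2, 3 and then completes.
    static Flux<Integer> countToThree() {
        return Flux.<Integer, Integer>generate(
                () -> 1,                  // initial state, created per subscriber
                (n, sink) -> {
                    sink.next(n);         // the single value allowed for this invocation
                    if (n == 3) {
                        sink.complete();  // a terminal signal may follow in the same invocation
                    }
                    return n + 1;         // state handed to the next invocation
                });
    }

    public static void main(String[] args) {
        List<Integer> values = countToThree().collectList().block();
        System.out.println(values);
    }
}
```

The state is advanced by returning a new value rather than by mutating a shared counter, so every subscriber starts again from 1.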

It is important to note that the events generated by the sink are consumed synchronously at the subscriber end. Let's reflect on the Fibonacci test that we wrote in the previous chapter:

Flux<Long> fibonacciGenerator = Flux.generate(
        // Initial state (current, next), created once per subscriber
        () -> Tuples.<Long, Long>of(0L, 1L),
        (state, sink) -> {
            // Emit exactly one value per invocation
            sink.next(state.getT1());
            System.out.println("generated " + state.getT1());
            // Advance the state to the next Fibonacci pair
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
fibonacciGenerator.take(size).subscribe(t -> {
    System.out.println("consuming " + t);
    fibonacciSeries.add(t);
});

Generating more than one event in a single sink invocation leads to java.lang.IllegalStateException: More than one call to onNext.
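The failure described in the note can be reproduced with a short sketch. It assumes Reactor Core is on the classpath; DoubleNextDemo and captureError are hypothetical names used only here. The generator deliberately calls next() twice in one invocation, and the error is captured through the subscriber's error callback.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

public class DoubleNextDemo {

    // Hypothetical helper: subscribes to a generator that breaks the
    // one-value rule and returns the error signalled to the subscriber.
    static Throwable captureError() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Flux<Integer> broken = Flux.<Integer>generate(sink -> {
            sink.next(1);
            sink.next(2); // second value in the same invocation: not allowed
        });
        // The second argument receives the error event instead of a stack trace.
        broken.subscribe(value -> { }, failure::set);
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(captureError());
    }
}
```

The error arrives as a regular error event, so the sequence terminates for the subscriber rather than throwing out of the subscribe call.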

In the Fibonacci listing above, we added print statements to both the generator and the subscriber. Let's run our tests to see the output.

The consumer and producer statements alternate in the output. We can deduce that each number is consumed before the next number is generated. The generate API is offered in multiple variants, and the sink can be used with or without an initial state. In our FibonacciGenerator, we used it with a state that is initialized on a per-subscriber basis. Optionally, we can also provide a terminal function, which is invoked when the event stream terminates, that is, after the sink raises an error or completion event, or after the subscriber cancels. The terminal function can be used to perform any cleanup associated with the state.
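The three-argument variant can be sketched as follows, assuming Reactor Core is on the classpath. The class GenerateWithCleanup, the method run, and the in-memory event log are hypothetical and exist only for this illustration. Instead of printing, the sketch records each step in a list so that the ordering can be inspected.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class GenerateWithCleanup {

    // Hypothetical helper: runs a short Fibonacci generator and returns an ordered event log.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux<Long> fibonacci = Flux.<Long, Tuple2<Long, Long>>generate(
                // State supplier: called once per subscriber.
                () -> Tuples.of(0L, 1L),
                // Generator: at most one next() per invocation.
                (state, sink) -> {
                    events.add("generating " + state.getT1());
                    sink.next(state.getT1());
                    if (state.getT1() >= 3) {
                        sink.complete(); // stop after the value 3
                    }
                    return Tuples.of(state.getT2(), state.getT1() + state.getT2());
                },
                // Terminal function: receives the last state once the stream has ended.
                state -> events.add("cleanup"));
        fibonacci.subscribe(t -> events.add("consuming " + t));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch each value is recorded as consumed before the generator runs for the next one, and the cleanup entry appears last. A real terminal function would typically release a resource held in the state, such as a reader or a connection.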
