官术网_书友最值得收藏!

Backpressure strategies

If an observable doesn't support backpressure but we are still interested in all of the items it emits, we can use one of the built-in backpressure combinators provided by Rx.

As an example, we will look at one such combinator, onBackpressureBuffer:

(rx/subscribe (->> (rx/map vector 
(.onBackpressureBuffer (fast-producing-obs))
(slow-producing-obs))
(rx/map (fn [[x y]]
(+ x y)))
(rx/take 10))
prn-to-repl
(fn [e] (prn-to-repl "error is " e))) ;; 2 ;; 4 ;; 6 ;; 8 ;; 10 ;; 12 ;; 14 ;; 16 ;; 18 ;; 20

This example is very similar to the one where we used sample, but the output is fairly different. This time, we get all of the items that have been emitted by both observables.

The onBackpressureBuffer strategy implements a strategy that simply buffers all of the items that are emitted by the slower Observable, emitting them whenever the consumer is ready. In our case, this happens every 500 milliseconds.

Other strategies include onBackpressureDrop and onBackpressureBlock.

It's worth noting that Reactive pull backpressure is still a work in progress and the best way to keep up to date with progress is on the RxJava wiki on the subject: https://github.com/ReactiveX/RxJava/wiki/Backpressure.

主站蜘蛛池模板: 湘阴县| 丰县| 修水县| 杭锦后旗| 白水县| 伊川县| 江门市| 安泽县| 洛宁县| 科技| 镇沅| 巧家县| 红原县| 方城县| 镇安县| 汉川市| 大余县| 崇礼县| 葵青区| 电白县| 方正县| 云霄县| 阳原县| 宜州市| 嘉峪关市| 南阳市| 家居| 游戏| 新巴尔虎右旗| 仁怀市| 蒲城县| 宣恩县| 信宜市| 华蓥市| 昆明市| 巫溪县| 宝丰县| 齐齐哈尔市| 扶沟县| 新绛县| 津南区|