官术网_书友最值得收藏!

Backpressure

Another issue we might be faced with is observables that produce items faster than we can consume them. The problem that arises in this scenario is to do with the ever-growing backlog of items.

As an example, think about zipping two observables together. The zip operator (or map in RxClojure) will only emit a new value when all observables have emitted an item.

So, if one of these observables is a lot faster at producing items than the others, map will need to buffer these items and wait for the others, which will most likely cause an error, as shown here:

(defn fast-producing-obs [] 
  (rx/map inc (Observable/interval 1 TimeUnit/MILLISECONDS))) 
 
(defn slow-producing-obs [] 
  (rx/map inc (Observable/interval 500 TimeUnit/MILLISECONDS))) 
 
(rx/subscribe (->> (rx/map vector 
                           (fast-producing-obs) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; "error is " #<MissingBackpressureException rx.exceptions.MissingBackpressureException> 

As we can see in the preceding code, we have a fast-producing observable that emits items 500 times faster than the slower observable. Clearly, we can't keep up with it and, surely enough, Rx throws MissingBackpressureException.

What this exception is telling us is that the fast-producing observable doesn't support any type of backpressurewhat Rx calls Reactive pull backpressure—that is, consumers can't tell it to go slower. Thankfully, Rx provides us with combinators, which are helpful in these scenarios.

主站蜘蛛池模板: 渭南市| 寻乌县| 巩留县| 英吉沙县| 年辖:市辖区| 柳州市| 洱源县| 纳雍县| 鄢陵县| 合肥市| 卫辉市| 安阳市| 宜昌市| 宣化县| 长子县| 崇阳县| 精河县| 盘锦市| 专栏| 永川市| 泽库县| 墨竹工卡县| 阿拉善盟| 庐江县| 贵定县| 社会| 泾源县| 明星| 中山市| 将乐县| 精河县| 壶关县| 青州市| 将乐县| 图木舒克市| 鄱阳县| 睢宁县| 舟曲县| 延津县| 独山县| 台北市|