官术网_书友最值得收藏!

Sample

One such combinator is sample, which allows us to sample an observable at a given interval, thus throttling the source observable's output. Let's apply it to our previous example:

(rx/subscribe (->> (rx/map vector 
                           (.sample (fast-producing-obs) 200 
                                    TimeUnit/MILLISECONDS) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; 204 
;; 404 
;; 604 
;; 807 
;; 1010 
;; 1206 
;; 1407 
;; 1613 
;; 1813 
;; 2012 

The only change is that we call sample on our fast producing observable before calling map. We will sample it every 200 milliseconds.

By ignoring all other items emitted in this time slice, we have mitigated our initial problem, even though the original observable doesn't support any form of backpressure.

The sample combinator is only one of the combinators that's useful in such cases. Others include throttleFirst, debounce, buffer, and window. One drawback of this approach, however, is that a lot of the items generated end up being ignored.

Depending on the type of application we are building, this might be an acceptable compromise. But what if we are interested in all of the items?

主站蜘蛛池模板: 古田县| 西宁市| 靖安县| 加查县| 游戏| 梅河口市| 萨嘎县| 陇西县| 平潭县| 嫩江县| 仪陇县| 锦州市| 夹江县| 富顺县| 东阿县| 无极县| 苏尼特右旗| 新巴尔虎右旗| 阳泉市| 抚顺市| 武胜县| 利辛县| 商丘市| 卫辉市| 武鸣县| 玉山县| 芒康县| 洛阳市| 玛曲县| 太仓市| 长汀县| 四会市| 蒙阴县| 广河县| 昌江| 崇仁县| 潮安县| 双江| 邢台县| 双峰县| 开鲁县|