Handling Disposal with Observable.create()

If your Observable.create() returns a long-running or infinite Observable, you should regularly check the isDisposed() method of ObservableEmitter to see whether you should keep sending emissions. This prevents unnecessary work from being done if the subscription is no longer active.

In practice, you would use Observable.range() for this, but for the sake of the example, let's say we are emitting integers in a for loop in Observable.create(). Before emitting each integer, you should make sure that the ObservableEmitter has not been disposed:

    import io.reactivex.Observable;

    public class Launcher {
        public static void main(String[] args) {
            Observable<Integer> source =
                    Observable.create(observableEmitter -> {
                        try {
                            for (int i = 0; i < 1000; i++) {
                                // stop emitting as soon as a disposal is detected
                                if (observableEmitter.isDisposed())
                                    return;
                                observableEmitter.onNext(i);
                            }
                            observableEmitter.onComplete();
                        } catch (Throwable e) {
                            observableEmitter.onError(e);
                        }
                    });
        }
    }
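
To see the effect of the isDisposed() check, here is a minimal sketch that assumes RxJava 2 (the io.reactivex package) is on the classpath. The class name DisposalCheckDemo and the attempts counter are hypothetical names introduced only for illustration. The take(5) operator disposes the upstream after five emissions, so the loop should exit early rather than run all 1,000 iterations:

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class DisposalCheckDemo {

    // Builds the source; 'attempts' (a hypothetical counter) records
    // how many times the loop actually reached onNext().
    static Observable<Integer> source(AtomicInteger attempts) {
        return Observable.create(emitter -> {
            for (int i = 0; i < 1000; i++) {
                // leave the loop once the downstream has disposed
                if (emitter.isDisposed())
                    return;
                attempts.incrementAndGet();
                emitter.onNext(i);
            }
            emitter.onComplete();
        });
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        List<Integer> received = new ArrayList<>();

        // take(5) disposes the upstream after the fifth emission
        source(attempts).take(5).subscribe(received::add);

        System.out.println("Received: " + received);
        System.out.println("Loop iterations that emitted: " + attempts.get());
    }
}
```

Without the isDisposed() check, the Observer would most likely still receive only five items, because the emitter stops forwarding emissions once it is disposed, but the loop itself would keep running through all 1,000 iterations.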

If your Observable.create() wraps some resource, you should also handle the disposal of that resource to prevent leaks. ObservableEmitter has the setCancellable() and setDisposable() methods for that. In our earlier JavaFX example, we should remove the ChangeListener from our JavaFX ObservableValue when a disposal occurs. We can provide a lambda to setCancellable(), and it will be executed when dispose() is called:

    private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
        return Observable.create(observableEmitter -> {

            //emit initial state
            observableEmitter.onNext(fxObservable.getValue());

            //emit value changes using a listener
            final ChangeListener<T> listener =
                    (observableValue, prev, current) ->
                            observableEmitter.onNext(current);

            //add listener to ObservableValue
            fxObservable.addListener(listener);

            //handle disposal by specifying a cancellable
            observableEmitter.setCancellable(() ->
                    fxObservable.removeListener(listener));
        });
    }
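
The same pattern can be tried without JavaFX. The following sketch assumes RxJava 2 and uses a hypothetical EventSource class as a stand-in for any listener-based resource. The names CancellableDemo, EventSource, and eventsOf are invented for this illustration. It shows the listener being removed once dispose() is called:

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class CancellableDemo {

    // Hypothetical listener-based resource, standing in for
    // something like a JavaFX ObservableValue.
    static class EventSource<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        void addListener(Consumer<T> listener) { listeners.add(listener); }
        void removeListener(Consumer<T> listener) { listeners.remove(listener); }
        void fire(T value) { listeners.forEach(l -> l.accept(value)); }
        int listenerCount() { return listeners.size(); }
    }

    static <T> Observable<T> eventsOf(EventSource<T> source) {
        return Observable.create(emitter -> {
            // forward every event to the emitter
            Consumer<T> listener = emitter::onNext;
            source.addListener(listener);

            // unregister the listener when the subscription is disposed
            emitter.setCancellable(() -> source.removeListener(listener));
        });
    }

    public static void main(String[] args) {
        EventSource<String> source = new EventSource<>();

        Disposable disposable = eventsOf(source)
                .subscribe(s -> System.out.println("Received: " + s));

        source.fire("Alpha");
        System.out.println("Listeners before dispose: " + source.listenerCount());

        disposable.dispose();

        source.fire("Beta"); // no listener is registered any more
        System.out.println("Listeners after dispose: " + source.listenerCount());
    }
}
```

If the cleanup logic is already available as a Disposable, setDisposable() can be used in place of setCancellable().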