- Learning RxJava
- Thomas Nield
- 2021-07-02 22:22:55
Handling Disposal with Observable.create()
If your Observable.create() is returning a long-running or infinite Observable, you should ideally check the isDisposed() method of ObservableEmitter regularly, to see whether you should keep sending emissions. This prevents unnecessary work from being done if the subscription is no longer active.
For instance, suppose we emit integers from a for loop inside Observable.create(). In practice you would use Observable.range() for this, but it makes for a simple example. Before emitting each integer, check that the ObservableEmitter does not report that a disposal was called:
import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> source =
            Observable.create(observableEmitter -> {
                try {
                    for (int i = 0; i < 1000; i++) {
                        //stop working as soon as the downstream has disposed
                        if (observableEmitter.isDisposed())
                            return;
                        observableEmitter.onNext(i);
                    }
                    observableEmitter.onComplete();
                } catch (Throwable e) {
                    observableEmitter.onError(e);
                }
            });
    }
}
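To see what the check buys us, the following sketch counts how many loop iterations actually run when the downstream only wants five items. It assumes RxJava 2 (the io.reactivex package used above); the class name DisposalCheckDemo, the iterationsRun() helper, and its checkDisposed flag are made up for this illustration. The take() operator disposes the upstream once it has received its fifth item, which is what the emitter then observes through isDisposed().

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class DisposalCheckDemo {

    // Hypothetical helper: returns how many loop iterations the source ran
    // when the downstream takes only takeCount items.
    static int iterationsRun(int takeCount, boolean checkDisposed) {
        AtomicInteger iterations = new AtomicInteger();

        Observable<Integer> source = Observable.create(emitter -> {
            for (int i = 0; i < 1000; i++) {
                // bail out early once the subscription is no longer active
                if (checkDisposed && emitter.isDisposed()) {
                    return;
                }
                iterations.incrementAndGet();
                // after disposal the emitter silently drops this call
                emitter.onNext(i);
            }
            emitter.onComplete();
        });

        // take() disposes the upstream after takeCount emissions
        source.take(takeCount).subscribe(i -> { });
        return iterations.get();
    }

    public static void main(String[] args) {
        System.out.println("With check: " + iterationsRun(5, true));     // prints "With check: 5"
        System.out.println("Without check: " + iterationsRun(5, false)); // prints "Without check: 1000"
    }
}
```

Without the check, the downstream still receives only five items, but the loop keeps spinning through all 1,000 iterations for nothing. With an infinite source, that wasted work would never end.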
If your Observable.create() wraps a resource, you should also release that resource on disposal to prevent leaks. ObservableEmitter provides the setCancellable() and setDisposable() methods for this. In our earlier JavaFX example, we should remove the ChangeListener from the JavaFX ObservableValue when a disposal occurs. We can pass a lambda to setCancellable(), and it will be executed when dispose() is called:
private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
    return Observable.create(observableEmitter -> {
        //emit initial state
        observableEmitter.onNext(fxObservable.getValue());
        //emit value changes using a listener
        final ChangeListener<T> listener =
            (observableValue, prev, current) ->
                observableEmitter.onNext(current);
        //add listener to ObservableValue
        fxObservable.addListener(listener);
        //handle disposal by specifying a cancellable
        observableEmitter.setCancellable(() ->
            fxObservable.removeListener(listener));
    });
}
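The same pattern can be tried without JavaFX on the classpath. The sketch below assumes RxJava 2 and swaps the ObservableValue for a hypothetical EventSource class that simply keeps a list of listeners; EventSource, eventsOf(), and CancellableDemo are names invented for this illustration and are not part of RxJava or JavaFX.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class CancellableDemo {

    // Hypothetical stand-in for a listener-based resource
    // such as a JavaFX ObservableValue.
    static class EventSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void addListener(Consumer<String> listener) { listeners.add(listener); }
        void removeListener(Consumer<String> listener) { listeners.remove(listener); }
        int listenerCount() { return listeners.size(); }
        void fire(String event) { listeners.forEach(l -> l.accept(event)); }
    }

    static Observable<String> eventsOf(EventSource source) {
        return Observable.create(emitter -> {
            // forward every event to the emitter
            Consumer<String> listener = emitter::onNext;
            source.addListener(listener);
            // remove the listener when the subscription is disposed
            emitter.setCancellable(() -> source.removeListener(listener));
        });
    }

    public static void main(String[] args) {
        EventSource source = new EventSource();
        Disposable disposable = eventsOf(source)
            .subscribe(e -> System.out.println("Received: " + e));

        source.fire("first");   // prints "Received: first"
        disposable.dispose();   // triggers the cancellable
        source.fire("second");  // nothing is printed, no listener remains

        // prints "Listeners after dispose: 0"
        System.out.println("Listeners after dispose: " + source.listenerCount());
    }
}
```

If the setCancellable() call were left out, the listener would stay registered on the EventSource after dispose(), which is the kind of leak described above.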