
  • Learning RxJava
  • Thomas Nield
  • 2021-07-02 22:22:51

Using Observable.create()

Let's start by creating a source Observable using Observable.create(). A source Observable is an Observable where emissions originate; it is the starting point of our Observable chain.

The Observable.create() factory allows us to create an Observable by providing a lambda that receives an Observable emitter. We can call the emitter's onNext() method to pass emissions (one at a time) down the chain, and onComplete() to signal completion and communicate that there will be no more items. These onNext() calls pass the items down the chain toward the Observer, which prints each item, as shown in the following code snippet:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source = Observable.create(emitter -> {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            });

            source.subscribe(s -> System.out.println("RECEIVED: " + s));
        }
    }

The output is as follows:

    RECEIVED: Alpha
    RECEIVED: Beta
    RECEIVED: Gamma
    RECEIVED: Delta
    RECEIVED: Epsilon

In RxJava 1.0, ensure that you use Observable.fromEmitter() instead of Observable.create(). The latter is something entirely different in RxJava 1.0 and is only for advanced RxJava users.

The onNext() method hands each item, starting with Alpha, to the next step in the chain. In this example, the next step is the Observer, which prints the item using the s -> System.out.println("RECEIVED: " + s) lambda. This lambda is invoked in the onNext() call of the Observer, and we will look at the Observer more closely in a moment.

Note that the Observable contract (http://reactivex.io/documentation/contract.html) dictates that emissions must be passed sequentially and one at a time. An Observable cannot pass emissions concurrently or in parallel. This may seem like a limitation, but it does in fact simplify programs and make Rx easier to reason about. We will learn some powerful tricks to effectively leverage concurrency and parallelization in Chapter 6, Concurrency and Parallelization, without breaking the Observable contract.

The onComplete() method communicates down the chain to the Observer that no more items are coming. Observables can indeed be infinite, and if this is the case, onComplete() will never be called. Technically, a source could stop calling onNext() and never call onComplete(). This would likely be bad design, though, if the source no longer plans to send emissions.

Although this particular example is unlikely to throw an error, we can catch errors that may occur within our Observable.create() block and emit them through onError(). This way, the error can be pushed down the chain and handled by the Observer. The Observer that we set up earlier does not handle exceptions, but you can add an error handler, as shown here:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source = Observable.create(emitter -> {
                try {
                    emitter.onNext("Alpha");
                    emitter.onNext("Beta");
                    emitter.onNext("Gamma");
                    emitter.onNext("Delta");
                    emitter.onNext("Epsilon");
                    emitter.onComplete();
                } catch (Throwable e) {
                    emitter.onError(e);
                }
            });

            source.subscribe(s -> System.out.println("RECEIVED: " + s), Throwable::printStackTrace);
        }
    }
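
To see the error path actually fire, here is a minimal sketch of the same try/catch pattern applied to a source that does fail. It deliberately does not use RxJava: MiniEmitter, ErrorRoutingSketch, and emitParsed() are hypothetical names invented for illustration, standing in for the emitter that Observable.create() provides, so that the sketch runs on a plain JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the emitter handed to Observable.create();
// this is NOT RxJava's own type, only an illustration of the three events.
interface MiniEmitter<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

class ErrorRoutingSketch {

    // Parses each input and emits it; any exception is routed to onError()
    // instead of escaping, mirroring the try/catch in the create() block.
    static void emitParsed(List<String> inputs, MiniEmitter<Integer> emitter) {
        try {
            for (String input : inputs) {
                emitter.onNext(Integer.parseInt(input));
            }
            emitter.onComplete();
        } catch (Throwable e) {
            emitter.onError(e);
        }
    }

    // Runs the source against a recording observer and returns the events seen.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        emitParsed(Arrays.asList("1", "2", "three", "4"), new MiniEmitter<Integer>() {
            @Override public void onNext(Integer item) {
                events.add("RECEIVED: " + item);
            }
            @Override public void onError(Throwable error) {
                events.add("ERROR: " + error.getClass().getSimpleName());
            }
            @Override public void onComplete() {
                events.add("DONE");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, the items before the bad input are delivered, the failure arrives as a single error event, and neither the remaining item nor the completion event follows it.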

Note that onNext(), onComplete(), and onError() do not necessarily push directly to the final Observer. They can also push to an operator serving as the next step in the chain. In the following code, we derive new Observables with the map() and filter() operators, which will act between the source Observable and final Observer printing the items:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source = Observable.create(emitter -> {
                try {
                    emitter.onNext("Alpha");
                    emitter.onNext("Beta");
                    emitter.onNext("Gamma");
                    emitter.onNext("Delta");
                    emitter.onNext("Epsilon");
                    emitter.onComplete();
                } catch (Throwable e) {
                    emitter.onError(e);
                }
            });

            Observable<Integer> lengths = source.map(String::length);

            Observable<Integer> filtered = lengths.filter(i -> i >= 5);

            filtered.subscribe(s -> System.out.println("RECEIVED: " + s));
        }
    }

This is the output after running the code: 

    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 7

With the map() and filter() operators between the source Observable and the Observer, onNext() hands each item to the map() operator. Internally, map() acts as an intermediary Observer and converts each string to its length. It then calls onNext() on filter() to pass that integer along, and the lambda condition i -> i >= 5 suppresses emissions for strings shorter than five characters. Finally, the filter() operator calls onNext() to hand each remaining item to the final Observer, where it is printed.
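
The hand-off described above can be sketched without RxJava to make the "intermediary Observer" idea concrete. This is only an illustration under stated assumptions, not how RxJava implements its operators: MiniObserver, OperatorSketch, and the static map() and filter() helpers are hypothetical names, and error handling is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified observer; NOT RxJava's Observer interface.
interface MiniObserver<T> {
    void onNext(T item);
    void onComplete();
}

class OperatorSketch {

    // map(): an intermediary observer that transforms each item, then
    // calls onNext() on the next step in the chain.
    static <T, R> MiniObserver<T> map(Function<T, R> mapper, MiniObserver<R> next) {
        return new MiniObserver<T>() {
            @Override public void onNext(T item) { next.onNext(mapper.apply(item)); }
            @Override public void onComplete() { next.onComplete(); }
        };
    }

    // filter(): an intermediary observer that forwards only items
    // satisfying the condition and silently drops the rest.
    static <T> MiniObserver<T> filter(Predicate<T> condition, MiniObserver<T> next) {
        return new MiniObserver<T>() {
            @Override public void onNext(T item) {
                if (condition.test(item)) {
                    next.onNext(item);
                }
            }
            @Override public void onComplete() { next.onComplete(); }
        };
    }

    // Wires source -> map -> filter -> final observer and returns what arrived.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        MiniObserver<Integer> finalObserver = new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onComplete() { /* nothing to clean up */ }
        };
        MiniObserver<Integer> filterStep = filter(i -> i >= 5, finalObserver);
        MiniObserver<String> mapStep = map(String::length, filterStep);

        // The "source" pushes each string into the first step of the chain.
        for (String s : Arrays.asList("Alpha", "Beta", "Gamma", "Delta", "Epsilon")) {
            mapStep.onNext(s);
        }
        mapStep.onComplete();
        return received;
    }

    public static void main(String[] args) {
        for (Integer i : run()) {
            System.out.println("RECEIVED: " + i);
        }
    }
}
```

Each step only knows about the step immediately after it, which is why operators can be stacked in any order that type-checks.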

It is critical to note that the map() operator yields a new Observable<Integer> derived from the original Observable<String>. The filter() operator also returns an Observable<Integer>, but ignores emissions that fail to meet the criteria. Since operators such as map() and filter() yield new Observables (which internally use Observer implementations to receive emissions), we can chain each returned Observable directly into the next operator rather than unnecessarily saving each one to an intermediary variable:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source = Observable.create(emitter -> {
                try {
                    emitter.onNext("Alpha");
                    emitter.onNext("Beta");
                    emitter.onNext("Gamma");
                    emitter.onNext("Delta");
                    emitter.onNext("Epsilon");
                    emitter.onComplete();
                } catch (Throwable e) {
                    emitter.onError(e);
                }
            });

            source.map(String::length)
                .filter(i -> i >= 5)
                .subscribe(s -> System.out.println("RECEIVED: " + s));
        }
    }

The output is as follows:

    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 7

Chaining operators in this way is common (and encouraged) in reactive programming. The chain reads from left to right and top to bottom, much like a book, which helps maintainability and legibility.

In RxJava 2.0, Observables no longer support emitting null values. Attempting to emit a null value immediately results in a NullPointerException. If you need to represent the absence of a value, consider wrapping it in a Java 8 or Google Guava Optional.
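
As a rough sketch of the wrapping step, the following uses Java 8's java.util.Optional to turn possibly-null values into non-null wrappers. It leaves RxJava out so that it runs on a plain JDK; in an actual chain, each wrapped value would be what gets passed to onNext(). OptionalWrappingSketch and wrap() are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class OptionalWrappingSketch {

    // Wraps each possibly-null item so that no null is ever handed onward.
    static List<Optional<String>> wrap(List<String> nullableItems) {
        List<Optional<String>> wrapped = new ArrayList<>();
        for (String item : nullableItems) {
            wrapped.add(Optional.ofNullable(item));
        }
        return wrapped;
    }

    public static void main(String[] args) {
        // Arrays.asList() tolerates null elements, unlike List.of().
        List<String> items = Arrays.asList("Alpha", null, "Gamma");
        for (Optional<String> item : wrap(items)) {
            // The consumer decides how to treat an absent value.
            System.out.println("RECEIVED: " + item.orElse("<no value>"));
        }
    }
}
```

The receiving side then has to unwrap each item explicitly, which makes the "no value" case visible in the types rather than hidden in a null.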