官术网_书友最值得收藏!

Producing and consuming streams

At this point, we should be familiar enough with the RxJava library to create our first small application. Let's define a stream that is represented by the Observable class. At the moment, we may assume that the Observable is a sort of generator that knows how to propagate events for subscribers as soon as they subscribe:

Observable<String> observale = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) { // (1)
sub.onNext("Hello, reactive world!"); // (2)
sub.onCompleted(); // (3)
}
}
);

So, here we create an Observable with a callback that will be applied as soon as the Subscriber appears (1). At that moment, our Observer will produce a one string value (2) and then signal the end of the stream to the subscriber (3). We can also improve this code using the Java 8 lambdas:

Observable<String> observable = Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);

In contrast with the Java Stream API, Observable is reusable, and each subscriber will receive the Hello, reactive world! event just after the subscription.

Note that, from RxJava 1.2.7 onward, the  Observable creation has been deprecated and treated as unsafe because it may generate too many elements and overload the subscriber. In other words, this approach does not support backpressure, a concept that we are going to examine later in detail. However, that code is still valid for the sake of introduction.

So, now we need a Subscriber, as shown in the following code:

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { // (1)
System.out.println(s);
}

@Override
public void onCompleted() { // (2)
System.out.println("Done!");
}

@Override
public void onError(Throwable e) { // (3)
System.err.println(e);
}
};

As we can see, the Subscriber has to implement the Observer methods and define the reactions for new events (1), stream completion (2), and errors (3). Now, let's hook the observable and subscriber instances together:

observable.subscribe(subscriber);

When running the mentioned code, the program generates the following output:

Hello, reactive world!
Done!

Hooray! We have just written a small and simple reactive hello-world application! As we may suspect, we may rewrite this example using lambdas, as shown in the following code:

Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Done!")
);

The RxJava library gives a lot of flexibility in order to create Observable and Subscriber instances. It is possible to create an Observable instance just by referencing elements, by using an old-style array, or from the Iterable collection, as follows:

Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());

It is also possible to reference a Callable (1) or even a Future (2), as shown in the following code:

Observable<String> hello = Observable.fromCallable(() -> "Hello ");  // (1)
Future<String> future =
Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future); // (2)

Moreover, along with the plain creational functionality, the Observable stream may be created by combining other Observable instances, which allows for easy implementation of pretty complicated workflows. For example, the concat() operator for each of the incoming streams consumes all items by re-sending them to the downstream observer. Incoming streams will then be processed until a terminal operation (onComplete(), onError()) occurs, and the order of processing is the same as the order of the concat() arguments. The following code demonstrates an example of the concat() usage:

Observable.concat(hello, world, Observable.just("!"))
.forEach(System.out::print);

Here, as part of a straightforward combination of a few Observable instances that use different origins, we also iterate through the result with the Observable.forEach() method in a way that is similar to the Java 8 Stream API. Such a program generates the following output:

Hello World!
Note that even though it is convenient to not define handlers for exceptions, in the case where an error occurs, the default Subscriber implementation throws rx.exceptions.OnErrorNotImplementedException .

主站蜘蛛池模板: 新宾| 利辛县| 昭通市| 宣威市| 定陶县| 突泉县| 长葛市| 墨玉县| 巴东县| 郓城县| 曲阜市| 东台市| 马尔康县| 望江县| 玛曲县| 祁连县| 民乐县| 青田县| 全椒县| 扶风县| 永嘉县| 多伦县| 鲁山县| 山东省| 和林格尔县| 沧州市| 南汇区| 德安县| 潜山县| 博野县| 通海县| 重庆市| 新晃| 临城县| 赣榆县| 阿巴嘎旗| 方城县| 林甸县| 乐昌市| 阳城县| 岑巩县|