  • Spring 5.0 Projects
  • Nilang Patel
  • 439 words
  • 2021-07-02 12:35:04

Hot Observable

A hot Observable, on the other hand, has its producer created or activated outside of it. It emits a single stream that is shared by all Observers. Consider the following example:

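// Assumes RxJava 2.x on the classpath; RxJava 3.x uses io.reactivex.rxjava3.* packages
// (for example io.reactivex.rxjava3.core.Observable).
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.concurrent.TimeUnit;

public class RxJavaHotObservable1 {
    public static void main(String[] args) {
        // Emits 0, 1, 2, ... every two seconds
        Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS);
        // The subject acts as an Observer of the interval and as an Observable for the subscribers below
        PublishSubject<Long> publishSubject = PublishSubject.create();
        observableInterval.subscribe(publishSubject);
        publishSubject.subscribe(i -> System.out.println("Observable #1 : " + i));
        addDelay(4000);
        // The second subscriber joins late and only sees values emitted from now on
        publishSubject.subscribe(i -> System.out.println("Observable #2 : " + i));
        // Keep the main thread alive so further values can be printed
        addDelay(10000);
    }

    private static void addDelay(int milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}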

In this example, observableInterval emits timer events rather than stored data: the interval method emits sequential numbers at a fixed interval, here every two seconds. We use PublishSubject to make this Observable hot. A PublishSubject can behave as both an Observable and an Observer; here it subscribes to observableInterval as an Observer and forms part of the Observable chain. We then add two subscribers to the PublishSubject with a delay in between.

In the output, the second Observer subscribes some time after the first. The Observable emits a sequential number every two seconds, and the second Observer joins at the fourth second. A hot Observable emits just a single stream, which is shared across all Observers. So the first value the second Observer receives is 2 instead of 0, because it subscribed after emission had already begun.
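The sharing behavior can also be modelled without RxJava. The following is a minimal plain-Java sketch, not RxJava's implementation: TinyHotSource is a hypothetical class invented for this illustration, and values are pushed by hand instead of by a timer so that the result is deterministic. It shows only the property discussed above: an Observer receives what is emitted after it subscribes and nothing from before.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HotSourceSketch {

    // Hypothetical stand-in for a hot source: one stream, pushed to whoever is subscribed right now.
    static class TinyHotSource<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Emission happens regardless of who is listening; nothing is stored for late observers.
        void emit(T value) {
            for (Consumer<T> observer : new ArrayList<>(observers)) {
                observer.accept(value);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        TinyHotSource<Long> source = new TinyHotSource<>();

        source.subscribe(i -> log.add("Observer #1 : " + i));
        source.emit(0L);
        source.emit(1L);

        // The second observer joins after two values have already gone by
        source.subscribe(i -> log.add("Observer #2 : " + i));
        source.emit(2L);
        source.emit(3L);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it, Observer #1 logs 0 through 3, while Observer #2 logs only 2 and 3, which mirrors the late subscriber in the PublishSubject example.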

In this sense, a hot Observable can be compared to a radio station. A person who starts listening cannot hear what was played before tuning in, because the broadcast is common to all listeners (or Observers, in Reactive terms). There are other ways to create a hot Observable. Here is one of them:

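// Assumes RxJava 2.x on the classpath; RxJava 3.x uses io.reactivex.rxjava3.* packages
// (for example io.reactivex.rxjava3.observables.ConnectableObservable).
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.concurrent.TimeUnit;

public class RxJavaHotObservable2 {
    public static void main(String[] args) {
        Observable<Long> observableInt = Observable.interval(2, TimeUnit.SECONDS);
        // publish() wraps the cold Observable; nothing is emitted until connect() is called
        ConnectableObservable<Long> connectableIntObservable = observableInt.publish();
        connectableIntObservable.subscribe(i -> System.out.println("Observable #1 : " + i));
        // Start the single shared stream
        connectableIntObservable.connect();
        addDelay(7000);
        // The second subscriber joins late and misses the values emitted so far
        connectableIntObservable.subscribe(i -> System.out.println("Observable #2 : " + i));
        // Keep the main thread alive so further values can be printed
        addDelay(10000);
    }

    private static void addDelay(int milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}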

In this code, the hot Observable is created with ConnectableObservable. It does not start emitting data until the connect method is called on it, which makes it more controllable. As soon as connect is called, it starts a single stream, which is shared across the Observers.

In the output, the second Observer misses the first few items because it subscribes after a delay. You can convert any cold Observable to a ConnectableObservable by calling the publish method on it.
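To make the role of connect more concrete, here is a minimal plain-Java sketch of the idea. It is a model under stated assumptions, not RxJava's implementation: ColdSource and TinyConnectable are hypothetical types invented for this illustration, and the producer runs synchronously so that the result is deterministic. The sketch shows two things: nothing is emitted before connect is called, and the underlying producer is started only once even though two Observers are registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConnectableSketch {

    // A "cold" source in this sketch: every subscribe call re-runs the producer from scratch.
    interface ColdSource<T> {
        void subscribe(Consumer<T> observer);
    }

    // Hypothetical stand-in for the result of publish(): observers are only collected until connect().
    static class TinyConnectable<T> {
        private final ColdSource<T> upstream;
        private final List<Consumer<T>> observers = new ArrayList<>();

        TinyConnectable(ColdSource<T> upstream) {
            this.upstream = upstream;
        }

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Subscribes to the upstream exactly once and fans each item out to all registered observers.
        void connect() {
            upstream.subscribe(value -> {
                for (Consumer<T> observer : new ArrayList<>(observers)) {
                    observer.accept(value);
                }
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The producer logs each time it is started, so we can count upstream subscriptions
        ColdSource<Integer> cold = observer -> {
            log.add("producer started");
            for (int i = 0; i < 3; i++) {
                observer.accept(i);
            }
        };

        TinyConnectable<Integer> hot = new TinyConnectable<>(cold);
        hot.subscribe(i -> log.add("Observer #1 : " + i));
        hot.subscribe(i -> log.add("Observer #2 : " + i));
        log.add("before connect");
        hot.connect();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log starts with "before connect", then a single "producer started", and then each value is delivered to both Observers in turn. Subscribing both Observers directly to the cold source would instead start the producer twice, once per Observer.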
