- Hands-On Reactive Programming in Spring 5
- Oleh Dokuka Igor Lozynskyi
- 2021-07-23 16:36:21
Generating an asynchronous sequence
RxJava makes it possible to generate not only one event in the future, but an asynchronous sequence of events based, for example, on a time interval, as shown in the following code:
Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(e -> System.out.println("Received: " + e));
Thread.sleep(5000); // (1)
In this case, the output is as follows:
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
If we remove Thread.sleep(...) (1), the application exits without printing anything. This happens because the events are generated, and therefore consumed, on a separate daemon thread, and a daemon thread does not keep the JVM alive once the main thread has finished. So, to prevent the main thread from finishing early, we may call sleep() or have it do some other useful work.
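The daemon-thread behavior can be reproduced without RxJava. The following sketch is only an analogy built on java.util.concurrent, not RxJava's actual implementation; the class DaemonTicker and the method collectTicks are hypothetical names introduced for illustration. A daemon thread emits a counter at a fixed period, and the calling thread waits on a CountDownLatch until the requested number of events has arrived, which is a more deterministic way of keeping the caller alive than Thread.sleep(...):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class DaemonTicker {

    // Hypothetical helper: collects the first `count` ticks, one every `periodMillis`.
    public static List<Long> collectTicks(int count, long periodMillis) {
        // Daemon worker thread: on its own it does not keep the JVM alive.
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "ticker");
                thread.setDaemon(true);
                return thread;
            });

        List<Long> received = new CopyOnWriteArrayList<>();
        AtomicLong counter = new AtomicLong();
        CountDownLatch done = new CountDownLatch(count);

        scheduler.scheduleAtFixedRate(() -> {
            // Ignore ticks that fire after the requested number was delivered.
            if (done.getCount() > 0) {
                received.add(counter.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // block the caller until `count` events have arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for ticks", e);
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        for (Long tick : collectTicks(5, 100)) {
            System.out.println("Received: " + tick);
        }
    }
}
```

If the done.await() call were removed, main could return before any tick fires, and because the worker is a daemon thread the JVM would be free to exit, which mirrors what happens when Thread.sleep(...) is dropped from the RxJava example.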
Of course, something has to control the Observable-Subscriber cooperation. This is called a Subscription, and it has the following interface declaration:
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}
The unsubscribe() method allows the Subscriber to inform the Observable that there is no need to send new events. In other words, calling this method cancels the subscription. The Observable, in turn, uses isUnsubscribed() to check whether the Subscriber is still waiting for events.
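To make this contract concrete, here is a minimal sketch of how a producer might honor the two methods. It is written against a local copy of the interface rather than against RxJava itself, and FlagSubscription, emitWhileSubscribed, and demo are hypothetical names used only for illustration (RxJava 1.x ships its own Subscription implementations):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

public class SubscriptionSketch {

    // Local copy of the interface shown above, so the sketch compiles without RxJava.
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Hypothetical implementation backed by a thread-safe flag.
    static final class FlagSubscription implements Subscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        @Override
        public void unsubscribe() {
            unsubscribed.set(true);
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    // Producer side: checks isUnsubscribed() before each event, up to `limit` events.
    // Returns the number of events that were actually emitted.
    public static long emitWhileSubscribed(Subscription s, long limit, LongConsumer onNext) {
        long emitted = 0;
        while (emitted < limit && !s.isUnsubscribed()) {
            onNext.accept(emitted);
            emitted++;
        }
        return emitted;
    }

    // Consumer side: cancels the subscription after seeing the value 3.
    public static List<Long> demo() {
        FlagSubscription subscription = new FlagSubscription();
        List<Long> seen = new ArrayList<>();
        emitWhileSubscribed(subscription, 1_000, value -> {
            seen.add(value);
            if (value == 3) {
                subscription.unsubscribe();
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2, 3]
    }
}
```

Although the producer is allowed to emit up to 1,000 values, it stops after 3 because the flag is checked before every emission. An AtomicBoolean is used here on the assumption that, in a real stream, unsubscribe() may be called from a different thread than the one emitting events.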
To understand the unsubscribe functionality, let's consider the case where a subscriber is the only party interested in the events, and consumes them until an external signal is propagated by a CountDownLatch (1). The incoming stream generates a new event every 100 milliseconds, and these events form the endless sequence 0, 1, 2, 3... (3). The following code demonstrates how to get a Subscription (2) when defining a reactive stream. It also shows how to unsubscribe from a stream (4):
CountDownLatch externalSignal = ...; // (1)
Subscription subscription = Observable // (2)
    .interval(100, MILLISECONDS) // (3)
    .subscribe(System.out::println);
externalSignal.await();
subscription.unsubscribe(); // (4)
Here, the subscriber receives the events 0, 1, 2, 3, and then the external signal arrives: externalSignal.await() returns, and the call to unsubscribe() leads to the subscription cancellation.
At this point, we have already learned that reactive programming consists of an Observable stream, a Subscriber, and some sort of Subscription that communicates the intention of the Subscriber to receive events from the Observable producer. It is now time to transform the data flowing through the reactive streams.