Reactor in action 

Let's learn more about the Reactor API with a practical example. Create a new Maven project similar to the one we created in the Anatomy of RxJava section. The current version of Project Reactor at the time of writing is 3.2.6. We need to add the Reactor Maven dependency as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>reactor-demo</groupId>
<artifactId>simple-reactor-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Simple Reactor Demo</name>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.6.RELEASE</version>
</dependency>
</dependencies>
</project>

When we define the Reactor dependency, the Reactive Streams JAR is added as a transitive dependency. Next, add a Java class as follows:

import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorBasic {
    private static List<String> carModels = Arrays.asList(
            "Era", "Magna", "Sportz", "Astha", "Astha(O)");

    public static void main(String[] args) {
        // Publishers built from inline values, from a List, and from a single value
        Flux<String> fewWords = Flux.just("Hello", "World");
        Flux<String> manyWords = Flux.fromIterable(carModels);
        Mono<String> singleWord = Mono.just("Single value");

        // Nothing is emitted until a subscriber is attached
        fewWords.subscribe(t -> System.out.println(t));
        System.out.println("-----------------------------");
        manyWords.subscribe(System.out::println);
        System.out.println("-----------------------------");
        singleWord.subscribe(System.out::println);
    }
}

We have used Flux and Mono to create various publishers. The just() method populates the stream with the given values. We can also wrap iterable types (such as List or Set) in a data stream with the fromIterable() method. A few other methods, such as from(), fromArray(), and fromStream(), construct data streams from other publishers, arrays, and existing Java streams, respectively, and can be used as follows:

import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class ReactorFromOtherPublisher {
    public static void main(String[] args) {
        Flux<String> fewWords = Flux.just("One", "Two");

        /* from array */
        Flux<Integer> intFlux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7});

        /* from Java 8 stream */
        Flux<String> strFlux = Flux.fromStream(Stream.of(
                "Ten", "Hundred", "Thousand", "Ten Thousands", "Lac", "Ten Lac", "Crore"));

        /* from other Publisher */
        Flux<String> fromOtherPublisherFlux = Flux.from(fewWords);

        intFlux.subscribe(System.out::println);
        strFlux.subscribe(System.out::println);
        fromOtherPublisherFlux.subscribe(System.out::println);
    }
}

A subscriber is attached with the subscribe() method. This is similar to what we did with Observable in RxJava. With Flux, we can create a publisher that emits either a finite or an infinite stream.
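
To make the finite versus infinite distinction concrete, here is a minimal sketch. It assumes reactor-core is on the classpath; the class and method names (FiniteAndInfiniteFlux, finiteSignals, firstTicks) are made up for illustration. It uses the subscribe() overload that also accepts an error consumer and a completion callback, and bounds an otherwise endless Flux.interval() stream with take():

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class FiniteAndInfiniteFlux {

    // Finite stream: range(1, 3) emits 1, 2, 3 and then signals completion.
    static List<String> finiteSignals() {
        List<String> signals = new ArrayList<>();
        Flux.range(1, 3).subscribe(
                value -> signals.add("onNext " + value),   // called once per value
                error -> signals.add("onError " + error),  // called if the stream fails
                () -> signals.add("onComplete"));          // called when the stream ends
        return signals;
    }

    // Infinite stream: interval() keeps ticking until cancelled,
    // so take(3) is used here to cut it off after three values.
    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .collectList()
                .block(); // block only for demonstration purposes
    }

    public static void main(String[] args) {
        System.out.println(finiteSignals()); // [onNext 1, onNext 2, onNext 3, onComplete]
        System.out.println(firstTicks());    // [0, 1, 2]
    }
}
```

Blocking with block() is done here only so that the demo can print the result before the JVM exits; interval() emits on a background scheduler rather than on the calling thread.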

We can also control whether a stream emits values or is simply empty. This can be done with a few utility methods provided by the Flux class:

  • Flux.empty(): Generates an empty stream that emits no values and only signals completion.
  • Flux.error(): Signals an error condition by generating a stream that emits no values and terminates with the given error.
  • Flux.never(): As its name suggests, generates a stream that emits no signals of any type: no values, no error, and no completion.
  • Flux.defer(): Defers construction of the actual publisher until a subscriber subscribes to the Flux. In short, it is lazy in nature.
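
The four utility methods above can be compared side by side with a small sketch. It assumes reactor-core is on the classpath; the class FluxFactoryMethods and the helper signalsOf() are hypothetical names introduced only to record which signals each publisher delivers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class FluxFactoryMethods {

    // Hypothetical helper: subscribes and records every signal that arrives.
    static <T> List<String> signalsOf(Flux<T> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                value -> signals.add("onNext " + value),
                error -> signals.add("onError " + error.getMessage()),
                () -> signals.add("onComplete"));
        return signals;
    }

    public static void main(String[] args) {
        // empty(): no values, only completion
        System.out.println(signalsOf(Flux.empty())); // [onComplete]

        // error(): no values, only the error
        System.out.println(signalsOf(
                Flux.error(new IllegalStateException("boom")))); // [onError boom]

        // never(): no signal of any kind
        System.out.println(signalsOf(Flux.never())); // []

        // defer(): the supplier runs once per subscription, not at declaration
        AtomicInteger calls = new AtomicInteger();
        Flux<Integer> deferred = Flux.defer(() -> Flux.just(calls.incrementAndGet()));
        System.out.println(calls.get());         // 0 (nothing built yet)
        System.out.println(signalsOf(deferred)); // [onNext 1, onComplete]
        System.out.println(signalsOf(deferred)); // [onNext 2, onComplete]
    }
}
```

Because the supplier passed to defer() is invoked for each subscriber, the two subscriptions see different values, whereas a plain Flux.just(calls.incrementAndGet()) would evaluate the counter once, at declaration time.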