官术网_书友最值得收藏!

Stream publisher

As we discussed in the previous chapter, the publisher is responsible for the generation of unbounded asynchronous events, and it pushes them to the associated subscribers. It is represented by the org.reactivestreams.Publisher interface, as follows:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

The interface provides a single subscribe method. The method is invoked by any party that is interested in listening to events published by the publisher. The interface is quite simple, and it can be used to publish any type of event, be it a UI event (like a mouse-click) or a data event.

Since the interface is simple, let's add an implementation for our custom FibonacciPublisher:

public class FibonacciPublisher implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
int count = 0, a = 0, b = 1;
while (count < 50) {
int sum = a + b;
subscriber.onNext(b);
a = b;
b = sum;
count++;
}
subscriber.onComplete();
}
}

This implementation may look good, but does it comply to publisher behavior according to the specification? The specification prescribes rules that describe publisher behavior. A publisher must generate the following four types of events:

  • Subscription event
  • Data of type T, as declared by the publisher
  • Completion event
  • Error event

According to the specification, a publisher can emit any number of data events. However, it must publish only one event for completion, error, and subscription. Once a completion or an error event is published, the publisher can no longer send data events back to a subscriber.

As backpressure is an important aspect of the specification, a publisher cannot push an arbitrary number of events to a subscriber. Instead, the subscriber must specify how many events it can receive, and a publisher must publish events equal to, or less than, the specified number.

In order to validate a publisher, the Reactive Streams API has published a test compatibility kit. Let's add the reactive-streams-tck in the build.gradle to our project:

dependencies {
// rest removed for brevity
testCompile group: 'org.reactivestreams',
name: 'reactive-streams-tck', version: '1.0.2'
}

The Technology Compatibility Kit (TCK) provides a PublisherVerifier interface that must be implemented in order to validate a publisher. It provides the following two methods:

  • createPublisher(long): This method must provide an instance of the publisher that can produce the specified number of events
  • createFailedPublisher(): This method must try to build a publisher that has raised an error event

Let's add the following implementation to test our FibonacciPublisher:

public class FibonacciPublisherVerifier extends PublisherVerification<Integer> {
public FibonacciPublisherVerifier(){
super(new TestEnvironment());
}
@Override
public Publisher<Integer> createFailedPublisher() {
return null;
}
@Override
public Publisher<Integer> createPublisher(long elements) {
return new FibonacciPublisher();
}
}

Now, let's run the test case to determine whether we comply with the Reactive Streams publisher specification:

As shown in the preceding screenshot, there are around 20 test failures and 16 skipped tests. We could fix each one of them, but the aim here is to understand that even a simple interface of a publisher is governed by many behavior specifications. Therefore, it is overkill to build a custom publisher. As service builders, we can use the Reactor framework. This provides publisher implementations capable of publishing any kind of data.

主站蜘蛛池模板: 巩留县| 桐柏县| 芜湖市| 和龙市| 兴业县| 拜城县| 岫岩| 铜川市| 钦州市| 揭阳市| 都匀市| 嘉善县| 娄烦县| 始兴县| 历史| 宁安市| 新河县| 蓬莱市| 腾冲县| 庆元县| 九寨沟县| 乌海市| 开封市| 任丘市| 高唐县| 和政县| 佛教| 金门县| 故城县| 石景山区| 苏州市| 东海县| 黄平县| 沙田区| 昌江| 东阳市| 南城县| 菏泽市| 泊头市| 巧家县| 吉安市|