官术网_书友最值得收藏!

Subscription

The subscription is an important component in Reactive Streams. It provides the necessary control flow, so that publishers do not over-run a subscriber. This is known as backpressure.

Once the subscriber receives the subscription event, it must request that the publisher publish a specified count of events over their respective subscription. This is done by invoking the request(long) method of the subscription object.

As data events are generated, they are received by the subscriber. Once the limit has been reached, the publisher must stop publishing more events. As the subscriber processes these events, it must request additional events from the publisher:

public interface Subscription {
public void request(long n);
public void cancel();
}

The subscription object allows a subscriber to control the events it wants to receive. Whenever the subscriber determines that it no longer wants the events, it can invoke the cancel() method of the subscription. Once invoked, a subscriber may receive fewer data events, in accordance with the demand raised before the cancellation. Post-cancellation, the subscription will become void, meaning that it cannot be used to request additional data.

A value of Long.MaxValue for the request method would result in an infinite flow of events from the publisher.

A subscriber can cancel an active subscription with the onSubscribe() method before any demand can be raised using the request method. In this case, the publisher will drop the subscription without raising any events.

Now that we have gone over the subscriber interface in detail, we can try to build a FibonacciSubscriber, as follows:

public class FibonacciSubscriber implements Subscriber<Long> {
private Subscription sub;
@Override
public void onSubscribe(Subscription s) {
sub = s;
sub.request(10);
}
@Override
public void onNext(Long fibNumber) {
System.out.println(fibNumber);
sub.cancel();
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
sub=null;
}
@Override
public void onComplete() {
System.out.println("Finished");
sub=null;
}
}

The preceding implementation does the following things:

  1. Upon receiving the subscription event, a request is raised to handle 10 events.
  2. When received, all data events are printed to the output console.
  3. After processing a single event, the subscriber cancels the subscription.
  4. The onCompletion method sets the subscription to null.
  5. The onError method prints the error message to the console and sets the subscription as null.

Now, let's validate the subscriber by using the SubscriberBlackboxVerification<T> abstract class. We need to implement the createSubsciber() method, as shown in the following code:

public class FibonacciSubsciberVerification extends SubscriberBlackboxVerification<Long> {
public FibonacciSubsciberVerification(){
super(new TestEnvironment());
}
@Override
public Subscriber<Long> createSubscriber() {
return new FibonacciSubscriber();
}
@Override
public Long createElement(int element) {
return new Long(element);
}
}

Let's run the test case to determine whether our subscriber meets the Reactive Streams criteria:

Here, we can also find a large number of broken test cases. These broken test cases define the behavior for a subscriber. We could fix these, but the better option would be to use Reactor to create our services. In the following section, we will describe the publisher and subscriber implementations available in Reactor. These implementations conform to the specification behaviors.

主站蜘蛛池模板: 武邑县| 林周县| 凌海市| 沙洋县| 西宁市| 巴南区| 平远县| 汨罗市| 巴南区| 锡林郭勒盟| 新沂市| 奉贤区| 和静县| 石楼县| 台湾省| 宣恩县| 安福县| 山丹县| 河南省| 阿拉善左旗| 独山县| 杭锦旗| 界首市| 专栏| 资兴市| 汾西县| 广南县| 宜兰县| 广南县| 拉萨市| 邢台市| 专栏| 土默特右旗| 万源市| 瑞安市| 政和县| 苍山县| 色达县| 修文县| 卢氏县| 涪陵区|