
  • Spring 5.0 Projects
  • Nilang Patel
  • 353 words
  • 2021-07-02 12:35:06

Custom subscribers

In some scenarios, calling the subscribe() method on a Publisher is not appropriate, and you may want to write a custom subscriber with its own handling. The Reactor framework supports this through the reactor.core.publisher.BaseSubscriber<T> abstract class. You don't need to implement the Subscriber interface of the Reactive Streams specification directly. Instead, you just extend this class and override the hooks you need, as follows:

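// Imports required in the enclosing source file
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

static class CustomSubscriber extends BaseSubscriber<String> {
    // Called once when the subscription is established; requests the first 10 elements
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Fetching the values ...!!");
        subscription.request(10);
    }

    // Called for every element emitted by the publisher
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Fetching next value in hookOnNext()-->" + value);
    }

    // Called when the publisher completes normally
    @Override
    protected void hookOnComplete() {
        System.out.println("Congratulations, everything is completed successfully ..!!");
    }

    // Called when the publisher signals an error
    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("Oops, something went wrong ..!! " + throwable.getMessage());
    }

    // Called when the subscription is cancelled
    @Override
    protected void hookOnCancel() {
        System.out.println("Oh !!, Operation has been cancelled ..!! ");
    }

    // Called last, after completion, error, or cancellation
    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("Shutting down the operation, Bye ..!! " + type.name());
    }
}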

The BaseSubscriber class provides hook methods, each corresponding to an event in the subscription's life cycle. Each hook is a placeholder for your custom implementation. Implementing these methods is similar to using the various versions of the subscribe() method that we saw in the Type of subscriber section. For example, if you implement only the hookOnNext, hookOnError, and hookOnComplete methods, it is equivalent to the fourth version of subscribe().
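To make the link between events and hooks concrete, here is a minimal, dependency-free sketch of the template-method idea behind a hook-based subscriber. It is an illustration under assumptions, not Reactor's actual code: it uses the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams ones) instead of Reactor, and the names HookDemo and HookSubscriber are hypothetical. The cancel hook is left out because the sketch drives the subscriber by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class HookDemo {
    // Hypothetical, simplified hook-based subscriber; NOT Reactor's BaseSubscriber.
    static abstract class HookSubscriber<T> implements Flow.Subscriber<T> {
        // Default hooks: request everything and otherwise do nothing.
        protected void hookOnSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        protected void hookOnNext(T value) { }
        protected void hookOnError(Throwable error) { }
        protected void hookOnComplete() { }
        protected void hookFinally(String signal) { }

        // Each Subscriber signal is routed to the matching hook.
        @Override public final void onSubscribe(Flow.Subscription s) { hookOnSubscribe(s); }
        @Override public final void onNext(T item) { hookOnNext(item); }
        @Override public final void onError(Throwable error) {
            try { hookOnError(error); } finally { hookFinally("ON_ERROR"); }
        }
        @Override public final void onComplete() {
            try { hookOnComplete(); } finally { hookFinally("ON_COMPLETE"); }
        }
    }

    // Drives a subscriber by hand and returns the events it saw, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Only three hooks are overridden; hookOnSubscribe keeps its default.
        HookSubscriber<String> subscriber = new HookSubscriber<String>() {
            @Override protected void hookOnNext(String value) { log.add("next:" + value); }
            @Override protected void hookOnComplete() { log.add("complete"); }
            @Override protected void hookFinally(String signal) { log.add("finally:" + signal); }
        };
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                log.add("request:" + (n == Long.MAX_VALUE ? "unbounded" : String.valueOf(n)));
            }
            @Override public void cancel() { }
        });
        subscriber.onNext("a");
        subscriber.onNext("b");
        subscriber.onComplete();
        return log;
    }

    public static void main(String[] args) {
        // Prints [request:unbounded, next:a, next:b, complete, finally:ON_COMPLETE]
        System.out.println(run());
    }
}
```

Because the un-overridden hooks fall back to defaults, overriding only a few of them behaves like passing only a few callbacks to subscribe().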

The hookOnSubscribe() method handles the subscription event. Backpressure is applied through subscription.request(): you can request as many elements as you want. For example, update the code for the hookOnSubscribe() method as follows:

@Override
protected void hookOnSubscribe(Subscription subscription) {
    System.out.println("Fetching the values ...!!");
    // Issue six separate requests for one element each
    for (int index = 0; index < 6; index++) {
        try {
            // Wait one second before each request
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subscription.request(1);
    }
}

We request records one at a time by calling subscription.request(1) in a loop. To show how this works, we put a one-second delay before each request, so you get one record per request, roughly one per second. Once all the data has been emitted (which assumes the source holds no more than the six elements requested), the completion event is triggered and the hookOnComplete() method is called.
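As a dependency-free illustration of requesting one element at a time, the following sketch reproduces the loop with the JDK's java.util.concurrent.Flow interfaces rather than Reactor. The names OneByOneDemo and ListPublisher are hypothetical, and the publisher is deliberately simplified: it is synchronous, single-subscriber, and does not validate request amounts or handle request() calls made from inside onNext().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class OneByOneDemo {
    // Hypothetical synchronous publisher: emits list items only when requested.
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items for this request.
                    for (long i = 0; i < n && index < items.size() && !done; i++) {
                        subscriber.onNext(items.get(index++));
                    }
                    // Signal completion once, right after the last item.
                    if (index == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Requests one element at a time, pausing before each request,
    // and returns the events received in order.
    static List<String> run(List<String> source, int requests, long delayMillis) {
        List<String> log = new ArrayList<>();
        new ListPublisher(source).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                for (int index = 0; index < requests; index++) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    subscription.request(1);
                }
            }
            @Override public void onNext(String value) { log.add("next:" + value); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // Six elements, six requests, 200 ms apart (shortened from one second).
        System.out.println(run(List.of("A", "B", "C", "D", "E", "F"), 6, 200));
    }
}
```

In this sketch, calling run(List.of("A", "B", "C"), 2, 0) returns only the first two elements and no completion event, because the subscriber never asks for the rest. This is why the number of requests has to cover the whole source before a completion hook can fire.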
