- Hands-On Reactive Programming with Reactor
- Rahul Sharma
- 2021-08-13 15:22:54
Stream publisher
As we discussed in the previous chapter, the publisher is responsible for the generation of unbounded asynchronous events, and it pushes them to the associated subscribers. It is represented by the org.reactivestreams.Publisher interface, as follows:
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
The interface provides a single subscribe method. The method is invoked by any party that is interested in listening to events published by the publisher. The interface is quite simple, and it can be used to publish any type of event, be it a UI event (like a mouse-click) or a data event.
Since the interface is simple, let's add an implementation for our custom FibonacciPublisher:
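import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class FibonacciPublisher implements Publisher<Integer> {
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        // Pushes values immediately: it never calls onSubscribe and never
        // waits for the subscriber to request data.
        int count = 0, a = 0, b = 1;
        while (count < 50) {
            int sum = a + b; // the 47th value onward no longer fits in an int
            subscriber.onNext(b);
            a = b;
            b = sum;
            count++;
        }
        subscriber.onComplete();
    }
}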
This implementation may look good, but does it comply with the publisher behavior that the specification prescribes? The specification defines rules for how a publisher must behave. A publisher can signal the following four types of events:
- Subscription event
- Data of type T, as declared by the publisher
- Completion event
- Error event
According to the specification, a publisher can emit any number of data events. However, it must signal the subscription event exactly once, before any other event, and it can signal at most one terminal event, either completion or error. Once a completion or an error event is published, the publisher can no longer send data events to the subscriber.
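These signal rules can be made concrete with a small checker. The sketch below is an illustration only and is not part of the Reactive Streams API or its test kit. To run without extra dependencies, it assumes the JDK 9+ java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones. The names SignalOrderChecker, NaivePublisher, and SignalOrderDemo are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical helper: records rule violations instead of throwing.
final class SignalOrderChecker implements Flow.Subscriber<Integer> {
    final List<String> violations = new ArrayList<>();
    private boolean subscribed;
    private boolean terminated;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (subscribed) {
            violations.add("onSubscribe signalled more than once");
        }
        subscribed = true;
        subscription.request(Long.MAX_VALUE); // effectively unbounded demand
    }

    @Override
    public void onNext(Integer item) {
        check("onNext");
    }

    @Override
    public void onError(Throwable error) {
        check("onError");
        terminated = true;
    }

    @Override
    public void onComplete() {
        check("onComplete");
        terminated = true;
    }

    private void check(String signal) {
        if (!subscribed) {
            violations.add(signal + " before onSubscribe");
        }
        if (terminated) {
            violations.add(signal + " after a terminal signal");
        }
    }
}

// Same shape as a publisher that pushes data straight from subscribe():
// it never hands a Subscription to the subscriber.
final class NaivePublisher implements Flow.Publisher<Integer> {
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(1);
        subscriber.onComplete();
    }
}

final class SignalOrderDemo {
    static List<String> run() {
        SignalOrderChecker checker = new SignalOrderChecker();
        new NaivePublisher().subscribe(checker);
        return checker.violations;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every signal from NaivePublisher is flagged, because the subscription event never arrives before the data and completion events.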
As backpressure is an important aspect of the specification, a publisher cannot push an arbitrary number of events to a subscriber. Instead, the subscriber must specify how many events it can receive, and a publisher must publish events equal to, or less than, the specified number.
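To show what honoring demand might look like, here is a minimal sketch of a Fibonacci publisher that emits only as many values as the subscriber has requested. It is not a TCK-verified implementation. It assumes the JDK 9+ java.util.concurrent.Flow interfaces as a dependency-free stand-in for org.reactivestreams, single-threaded use, and a positive element count. It also emits Long rather than Integer to delay overflow. All class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demand-aware variant of a Fibonacci publisher.
final class BoundedFibonacciPublisher implements Flow.Publisher<Long> {
    private final long elements; // total number of values to emit (assumed > 0)

    BoundedFibonacciPublisher(long elements) {
        this.elements = elements;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Long> subscriber) {
        // The subscription event comes first; nothing is emitted yet.
        subscriber.onSubscribe(new FibonacciSubscription(subscriber, elements));
    }
}

// Single-threaded sketch: assumes request() and cancel() are called serially.
final class FibonacciSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Long> subscriber;
    private long remaining;   // values still to be produced
    private long demand;      // values requested but not yet delivered
    private long a = 0, b = 1;
    private boolean draining; // guards against re-entrant request() calls
    private boolean done;

    FibonacciSubscription(Flow.Subscriber<? super Long> subscriber, long elements) {
        this.subscriber = subscriber;
        this.remaining = elements;
    }

    @Override
    public void request(long n) {
        if (done) return;
        if (n <= 0) { // non-positive demand is signalled as an error
            done = true;
            subscriber.onError(new IllegalArgumentException("request must be > 0"));
            return;
        }
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
        if (draining) return; // called from inside onNext; the running loop continues
        draining = true;
        try {
            while (!done && demand > 0 && remaining > 0) {
                demand--;
                remaining--;
                long value = b;
                long sum = a + b; // values past the 92nd no longer fit in a long
                a = b;
                b = sum;
                subscriber.onNext(value);
            }
            if (!done && remaining == 0) {
                done = true;
                subscriber.onComplete(); // the single terminal signal
            }
        } finally {
            draining = false;
        }
    }

    @Override
    public void cancel() {
        done = true;
    }
}

// Hypothetical subscriber that stores what it receives and requests nothing itself.
final class CollectingSubscriber implements Flow.Subscriber<Long> {
    final List<Long> received = new ArrayList<>();
    Flow.Subscription subscription;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Long item) { received.add(item); }
    @Override public void onError(Throwable error) { }
    @Override public void onComplete() { completed = true; }
}

final class BackpressureDemo {
    static CollectingSubscriber subscribeTo(long elements) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new BoundedFibonacciPublisher(elements).subscribe(subscriber);
        return subscriber;
    }
}
```

In this sketch, calling BackpressureDemo.subscribeTo(10) delivers nothing until the caller invokes subscription.request(n) on the returned subscriber. Each call then releases at most n values, and completion follows the last value.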
In order to validate a publisher, the Reactive Streams project has published a Technology Compatibility Kit. Let's add reactive-streams-tck to our project's build.gradle:
dependencies {
    // rest removed for brevity
    testCompile group: 'org.reactivestreams',
            name: 'reactive-streams-tck', version: '1.0.2'
}
The Technology Compatibility Kit (TCK) provides a PublisherVerification abstract class that must be extended in order to validate a publisher. It requires the following two methods:
- createPublisher(long): This method must provide an instance of the publisher that can produce the specified number of events
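- createFailedPublisher(): This method must return a publisher that is already in a failed state and signals an error event to its subscriber; returning null skips the tests that need such a publisher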
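To make the second method more concrete, here is a sketch of what a publisher in a failed state might look like. It is an assumption-laden illustration, not code from the TCK. It assumes the JDK 9+ java.util.concurrent.Flow interfaces in place of org.reactivestreams so that it runs without dependencies, and the names FailedPublisher, SignalLog, and FailedPublisherDemo are hypothetical. The key point is that the subscription event is still signalled before the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher that fails every subscriber immediately.
final class FailedPublisher implements Flow.Publisher<Integer> {
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // onSubscribe must precede any other signal, including onError.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* nothing to deliver */ }
            @Override public void cancel() { /* nothing to release */ }
        });
        subscriber.onError(new IllegalStateException("publisher is in a failed state"));
    }
}

// Hypothetical subscriber that logs the name of each signal it receives.
final class SignalLog implements Flow.Subscriber<Integer> {
    final List<String> signals = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
    @Override public void onNext(Integer item) { signals.add("onNext"); }
    @Override public void onError(Throwable error) { signals.add("onError"); }
    @Override public void onComplete() { signals.add("onComplete"); }
}

final class FailedPublisherDemo {
    static List<String> run() {
        SignalLog log = new SignalLog();
        new FailedPublisher().subscribe(log);
        return log.signals;
    }
}
```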
Let's add the following implementation to test our FibonacciPublisher:
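import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;

public class FibonacciPublisherVerifier extends PublisherVerification<Integer> {
    public FibonacciPublisherVerifier() {
        super(new TestEnvironment());
    }

    @Override
    public Publisher<Integer> createFailedPublisher() {
        // No failed publisher is available; null makes the TCK skip those tests.
        return null;
    }

    @Override
    public Publisher<Integer> createPublisher(long elements) {
        // Ignores the requested element count; always emits 50 values.
        return new FibonacciPublisher();
    }
}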
Now, let's run the test case to determine whether we comply with the Reactive Streams publisher specification:

The test run reports around 20 test failures and 16 skipped tests. We could fix each one of them, but the aim here is to understand that even a simple publisher interface is governed by many behavior specifications. Building a custom publisher is therefore rarely worth the effort. As service builders, we can use the Reactor framework instead, which provides publisher implementations capable of publishing any kind of data.