Vert.x adjustments

Along with the transformation of RxJava, other vendors of reactive libraries and frameworks have also started adopting the Reactive Streams specification. Following the specification, Vert.x added a module that provides support for the Reactive Streams API. The following example demonstrates this addition:

...                                                      // (1)
.requestHandler(request -> {                             //

   ReactiveReadStream<Buffer> rrs =                      // (2)
      ReactiveReadStream.readStream();                   //
   HttpServerResponse response = request.response();     //

   Flowable<Buffer> logs = Flowable                      // (3)
      .fromPublisher(logsService.stream())               //
      .map(Buffer::buffer)                               //
      .doOnTerminate(response::end);                     //

   logs.subscribe(rrs);                                  // (4)

   response.setStatusCode(200);                          // (5)
   response.setChunked(true);                            //
   response.putHeader("Content-Type", "text/plain");     //
   response.putHeader("Connection", "keep-alive");       //

   Pump.pump(rrs, response)                              // (6)
       .start();                                         //
})
...

The key is as follows:

  1. This is the request handler declaration. It is a generic handler that receives every request sent to the server.
  2. This is the Subscriber and HTTP response declaration. Here, ReactiveReadStream implements both org.reactivestreams.Subscriber and ReadStream, which allows any Publisher to be used as a data source compatible with the Vert.x API.
  3. This is the processing flow declaration. In this example, we refer to the new Reactive Streams-based LogsService interface and use the Flowable API from RxJava 2.x to write a functional transformation of the elements in the stream.
  4. This is the subscription stage. Once the processing flow is declared, we may subscribe ReactiveReadStream to the Flowable.
  5. This is the response preparation stage.
  6. This is where the response is finally sent to the client. Here, the Pump class drives the backpressure control, pausing the source so that the underlying WriteStream buffer does not get too full.
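
To make point 6 more concrete, the following is a minimal, JDK-only sketch of the pause-and-resume idea behind a pump. It is not the Vert.x implementation: Source, Sink, and pump are hypothetical stand-ins for ReadStream, WriteStream, and Pump, reduced to the few methods needed to show how a full write queue pauses the source until the queue drains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class PumpSketch {

    /** Hypothetical stand-in for a pausable, push-based read stream. */
    static class Source {
        private final Queue<String> pending = new ArrayDeque<>();
        private Consumer<String> handler = item -> { };
        private boolean paused = true;
        int pauseCount = 0;

        Source(List<String> items) { pending.addAll(items); }

        void handler(Consumer<String> h) { this.handler = h; }

        void pause() { paused = true; pauseCount++; }

        void resume() {
            paused = false;
            // Emit until the consumer pauses the source again or the data runs out.
            while (!paused && !pending.isEmpty()) {
                handler.accept(pending.poll());
            }
        }
    }

    /** Hypothetical stand-in for a write stream with a bounded write queue. */
    static class Sink {
        private final int maxQueueSize;
        private final Queue<String> queue = new ArrayDeque<>();
        private Runnable drainHandler;
        final List<String> flushed = new ArrayList<>();
        int maxObservedQueue = 0;

        Sink(int maxQueueSize) { this.maxQueueSize = maxQueueSize; }

        void write(String item) {
            queue.add(item);
            maxObservedQueue = Math.max(maxObservedQueue, queue.size());
        }

        boolean writeQueueFull() { return queue.size() >= maxQueueSize; }

        void drainHandler(Runnable h) { this.drainHandler = h; }

        /** Simulates the network emptying the queue, then signals that there is room again. */
        void flush() {
            flushed.addAll(queue);
            queue.clear();
            Runnable h = drainHandler;
            drainHandler = null;
            if (h != null) {
                h.run();
            }
        }
    }

    /** Copies items from source to sink, pausing the source while the sink is full. */
    static void pump(Source source, Sink sink) {
        source.handler(item -> {
            sink.write(item);
            if (sink.writeQueueFull()) {
                source.pause();                    // stop the producer
                sink.drainHandler(source::resume); // continue once the queue drains
            }
        });
        source.resume();
    }

    public static void main(String[] args) {
        Source source = new Source(List.of("a", "b", "c", "d", "e"));
        Sink sink = new Sink(2);
        pump(source, sink);
        while (sink.flushed.size() < 5) {
            sink.flush();
        }
        System.out.println(sink.flushed + " max queue = " + sink.maxObservedQueue);
    }
}
```

In this sketch, the queue never grows beyond the configured limit of two elements, because the source is paused as soon as the limit is reached and resumed only from the drain callback.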

As we can see, Vert.x does not provide a fluent API for declaring stream-processing pipelines. However, it provides an API that converts any Publisher into a Vert.x ReadStream while keeping the backpressure management from Reactive Streams in place.
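
How such a conversion can preserve backpressure may be easier to see in a small model. The sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, instead of Vert.x itself. AdapterStream and RangePublisher are hypothetical names invented for this illustration; the adapter is only a guess at the general shape of a Subscriber that also exposes handler, pause, and resume, not the actual ReactiveReadStream code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class ReadStreamAdapterSketch {

    /** Hypothetical adapter: a Subscriber that exposes handler/pause/resume. */
    static class AdapterStream<T> implements Flow.Subscriber<T> {
        private Flow.Subscription subscription;
        private Consumer<T> handler = item -> { };
        private boolean paused = true;
        private boolean requested = false; // at most one outstanding request
        boolean completed = false;

        void handler(Consumer<T> h) { this.handler = h; }

        void pause() { paused = true; }

        void resume() { paused = false; requestNext(); }

        // Demand is signalled upstream only while the consumer is not paused.
        private void requestNext() {
            if (!paused && !requested && subscription != null) {
                requested = true;
                subscription.request(1);
            }
        }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestNext(); }

        @Override public void onNext(T item) {
            requested = false;
            handler.accept(item);
            requestNext();
        }

        @Override public void onError(Throwable t) { completed = true; }

        @Override public void onComplete() { completed = true; }
    }

    /** Simplified single-subscriber publisher that emits 0..count-1 strictly on demand. */
    static class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        int emitted = 0;

        RangePublisher(int count) { this.count = count; }

        @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand = 0;
                private boolean draining = false;
                private boolean done = false;

                @Override public void request(long n) {
                    if (n <= 0 || done) return;   // simplified: no error signal for n <= 0
                    demand += n;
                    if (draining) return;         // reentrant call: the outer loop picks it up
                    draining = true;
                    while (demand > 0 && emitted < count && !done) {
                        demand--;
                        subscriber.onNext(emitted++);
                    }
                    draining = false;
                    if (emitted == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() { done = true; }
            });
        }
    }

    public static void main(String[] args) {
        RangePublisher publisher = new RangePublisher(5);
        AdapterStream<Integer> stream = new AdapterStream<>();
        List<Integer> received = new ArrayList<>();
        stream.handler(item -> {
            received.add(item);
            if (received.size() % 2 == 0) stream.pause(); // pretend the consumer is busy
        });
        publisher.subscribe(stream);
        while (!stream.completed) {
            stream.resume();
            System.out.println("received=" + received + ", emitted=" + publisher.emitted);
        }
    }
}
```

Because the adapter requests one element at a time and only while it is not paused, the publisher's emitted counter never runs ahead of what the handler has received. A production adapter would presumably request larger batches and handle errors and cancellation, which this sketch leaves out.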
