
  • PHP Reactive Programming
  • Martin Sikora
  • 2021-07-09 19:06:16

Basic principles of Reactive Extensions

Let's have a look at a very simple example of RxPHP, similar to what we did in the previous chapter, and use it to demonstrate some of the basic principles behind Reactive Extensions.

We won't bother with defining an observer right now and will focus only on Observables and operators:

// rxphp_basics_01.php 
use Rx\Observable; 
$fruits = ['apple', 'banana', 'orange', 'raspberry']; 
 
Observable::fromArray($fruits) // Observable 
    ->map(function($value) { // operator 
        return strlen($value); 
    }) 
    ->filter(function($len) { // operator 
        return $len > 5; 
    }) 
    ->subscribe($observer); // observer 

In this example, we have one Observable, two operators and one observer.

An Observable can be chained with operators. In this example, the operators are map() and filter().

Observables have the subscribe() method that is used by observers to start receiving values at the end of the chain.

We can represent this chain by the following diagram:

Each arrow shows the direction of propagation of items and notifications

We should probably explain the difference between using Observables and just iterating the array.

Observables follow a push model: a value is pushed down the operator chain when it's ready. This is important because it's the Observable that decides when to emit the next value. The internal logic of an Observable can do whatever it needs to (for example, it can run some asynchronous task) and still remain completely hidden from the rest of the chain.

Promises are a similar concept to Observables. However, while a Promise represents a single value that will exist in the future, an Observable represents a stream of values.

On the other hand, iterating the array follows a pull model: we pull one item after another. The important consequence is that the array has to be prepared beforehand (that is, before we start iterating over it).
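The contrast between the two models can be sketched in plain PHP without any library. This is only an illustration: the `$produce` closure and the variable names are hypothetical and do not come from RxPHP.

```php
<?php
// Hypothetical illustration in plain PHP; none of these names come from RxPHP.

$fruits = ['apple', 'banana', 'orange', 'raspberry'];

// Pull model: the consumer drives the loop and asks for each item,
// so the whole array has to exist before iteration starts.
$pulled = [];
foreach ($fruits as $fruit) {
    $pulled[] = strlen($fruit);
}

// Push model: the producer decides when to hand over each value by
// calling the consumer's callback. How and when the values are obtained
// stays hidden inside the producer.
$produce = function (callable $onNext) use ($fruits) {
    foreach ($fruits as $fruit) {
        $onNext(strlen($fruit)); // the producer pushes the value
    }
};

$pushed = [];
$produce(function ($len) use (&$pushed) {
    $pushed[] = $len;
});

print_r($pulled);
print_r($pushed);
```

Both arrays end up with the same lengths. The difference is who is in control: in the second half the consumer only supplies a callback and never touches the source data.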

Another important difference is that Observables behave like a data stream (or data flow). We talked about streams in Chapter 1, Introduction to Reactive Programming. In practice, this means that an Observable knows when it has emitted all its items or when an error has occurred, and it is able to send a proper notification down the chain.

For this reason, Observables can call three different methods on their observers (we'll see how this is implemented later in this chapter when we write a custom operator and a custom Observable):

  • onNext: This method is called when the next item is ready to be emitted. We typically say that "an Observable emits an item".
  • onError: Notification called when an error has occurred. This could be any type of error represented by an instance of the Exception class.
  • onComplete: Notification called when there are no more items to be emitted. In RxPHP's observer interface, the method itself is named onCompleted().

Each Observable can emit zero or more items.

Each Observable can send either one error notification or one complete notification, but never both.
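These rules can be sketched as a small observer that stops accepting notifications once it has seen an error or a complete notification. The `SketchObserver` class below is hypothetical and written in plain PHP. It is not RxPHP's implementation, and only its method names mirror the three notifications.

```php
<?php
// Hypothetical sketch in plain PHP; this is not RxPHP's implementation.
final class SketchObserver
{
    private $stopped = false;
    public $log = [];

    public function onNext($value)
    {
        if (!$this->stopped) {
            $this->log[] = "next:$value";
        }
    }

    public function onError(Exception $e)
    {
        if (!$this->stopped) {
            $this->stopped = true; // terminal: nothing is accepted afterwards
            $this->log[] = 'error:' . $e->getMessage();
        }
    }

    public function onCompleted()
    {
        if (!$this->stopped) {
            $this->stopped = true; // terminal: nothing is accepted afterwards
            $this->log[] = 'completed';
        }
    }
}

$sketch = new SketchObserver();
$sketch->onNext(1);
$sketch->onNext(2);
$sketch->onCompleted();
$sketch->onError(new Exception('too late')); // ignored, already completed
$sketch->onNext(3);                          // ignored, already completed
print_r($sketch->log);
```

Only the two items and the complete notification are recorded. Anything arriving after the terminal notification is dropped.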

This is why the CallbackObserver class we used in Chapter 1, Introduction to Reactive Programming, takes three callables as arguments. These callables are called when the observer receives a next item, an error notification, or a complete notification, respectively. All three callables are optional parameters, so we can omit any of them.

For example, we can make an observer like the following:

use Rx\Observer\CallbackObserver; 
 
$observer = new CallbackObserver( 
    function($value) { 
        echo "Next: $value\n"; 
    }, 
    function(Exception $err) { 
        $msg = $err->getMessage(); 
        echo "Error: $msg\n"; 
    }, 
    function() { 
        echo "Complete\n"; 
    } 
); 

This observer defines all three callables. We can test it on the Observable we defined above and have a look at its output:

$ php rxphp_basics_01.php
Next: 6
Next: 6
Next: 9
Complete

We can see that only three values passed the filter() operator, followed by a proper complete notification at the end.

In RxPHP, every operator that takes a callable as an argument wraps its call internally in a try...catch block. If the callable throws an Exception, that Exception is sent down the chain as an onError notification. Consider the following example:

// rxphp_basics_02.php 
use Rx\Observable; 
$fruits = ['apple', 'banana', 'orange', 'raspberry']; 
Observable::fromArray($fruits) 
    ->map(function($value) { 
        if ($value[0] == 'o') { 
            throw new Exception("It's broken."); 
        } 
        return strlen($value); 
    }) 
    ->filter(function($len) { 
        return $len > 5; 
    }) 
    ->subscribe($observer); 

With the same observer that we defined previously, this example will have the following output:

$ php rxphp_basics_02.php
Next: 6
Error: It's broken.

It's important to see that, once the error occurred, no more items were emitted and no complete notification was sent either. This is because, when the observer received the error, it automatically unsubscribed.
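The idea of wrapping a user callable in try...catch and turning a thrown Exception into an error notification can be sketched in plain PHP. The `sketchMap()` function below is hypothetical and much simpler than RxPHP's real operators. It has no filter step, so the length of 'apple' also reaches the callbacks.

```php
<?php
// Hypothetical sketch in plain PHP; RxPHP's real operators are more involved.

/**
 * Pushes each mapped item to $onNext. If $selector throws, the exception
 * goes to $onError and emission stops; $onCompleted is then never called.
 */
function sketchMap(array $items, callable $selector,
                   callable $onNext, callable $onError, callable $onCompleted)
{
    foreach ($items as $item) {
        try {
            $mapped = $selector($item);
        } catch (Exception $e) {
            $onError($e);
            return; // stop: no more items, no complete notification
        }
        $onNext($mapped);
    }
    $onCompleted();
}

$log = [];
sketchMap(
    ['apple', 'banana', 'orange', 'raspberry'],
    function ($value) {
        if ($value[0] === 'o') {
            throw new Exception("It's broken.");
        }
        return strlen($value);
    },
    function ($len) use (&$log) { $log[] = "Next: $len"; },
    function (Exception $e) use (&$log) { $log[] = 'Error: ' . $e->getMessage(); },
    function () use (&$log) { $log[] = 'Complete'; }
);
print_r($log);
```

The log ends with the error entry. 'raspberry' is never processed and 'Complete' is never recorded, which matches the behavior described above.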

We'll talk more about the process behind subscribing and unsubscribing in Chapter 3, Writing a Reddit Reader with RxPHP, and in Chapter 10, Using Advanced Operators and Techniques in RxPHP.

In Chapter 8, Multicasting in RxPHP and PHP7 pthreads Extension, we'll look more in-depth into what happens inside observers when they receive an error or complete notification.

One last thing before we move on. We said that Observables represent data streams. The great advantage of this is that we can easily combine or split streams, similar to what we saw in Chapter 1, Introduction to Reactive Programming, when talking about the gulp build tool.

Let's have a look at a slightly more advanced example of merging two Observables:

// rxphp_basics_03.php 
use Rx\Observable; 
$fruits = ['apple', 'banana', 'orange', 'raspberry']; 
$vegetables = ['potato', 'carrot']; 
 
Observable::fromArray($fruits) 
    ->map(function($value) { 
        return strlen($value); 
    }) 
    ->filter(function($len) { 
        return $len > 5; 
    }) 
    ->merge(Observable::fromArray($vegetables)) 
    ->subscribe($observer); 

We used the merge() operator to combine the existing Observable with another Observable. Notice that we can add the operator anywhere we want. Since we added it after the filter() operator and before the subscribe() call, the items from the second Observable are going to be emitted right into the observer and will skip the preceding operator chain.

We can represent this chain by the following diagram:

The output for this example looks like the following:

$ php rxphp_basics_03.php
Next: 6
Next: 6
Next: 9
Next: potato
Next: carrot
Complete
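To see why the vegetable names arrive untouched, the same flow can be imitated in plain PHP with two loops feeding one callback. This is a hypothetical sketch that reproduces only the order of this synchronous example. It is not how RxPHP implements merge().

```php
<?php
// Hypothetical sketch in plain PHP; it mimics the order of the synchronous
// example only and is not how RxPHP implements merge().

$fruits = ['apple', 'banana', 'orange', 'raspberry'];
$vegetables = ['potato', 'carrot'];

$received = [];
$receive = function ($value) use (&$received) {
    $received[] = $value;
};

// First source: every item passes through the map and filter steps
// before it reaches the receiving callback.
foreach ($fruits as $fruit) {
    $len = strlen($fruit);   // map step
    if ($len > 5) {          // filter step
        $receive($len);
    }
}

// Second source: joined after the filter, so its items reach the
// callback directly and never see the map or filter steps.
foreach ($vegetables as $vegetable) {
    $receive($vegetable);
}

print_r($received);
```

The fruit names have been replaced by their lengths and filtered, while the vegetable names are delivered as plain strings.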

These principles apply to all Rx implementations. We should now have a basic idea of what working with Observables, observers, and operators in Rx looks like, so we can talk about each of them separately.
