官术网_书友最值得收藏!

  • PHP Reactive Programming
  • Martin Sikora
  • 1070字
  • 2021-07-09 19:06:17

Writing JSONDecodeOperator

We're going to work with calls to remote API's a few times throughout this book, so it would be very handy to have an operator that transforms JSON string responses into their PHP array representations.

This example looks like something that could be easily done with just the map() operator:

// rxphp_06.php  
Rx\Observable::just('{"value":42}') 
    ->map(function($value) { 
        return json_decode($value, true); 
    }) 
    ->subscribe(new DebugSubject()); 

This prints the correct result for sure, as we can see in the following output:

$ php rxphp_06.php
16:39:50 [] onNext: {"value": 42} (array)
16:39:50 [] onCompleted

Well, but what about malformed JSON strings? What happens if we try to decode the following:

Rx\Observable::just('NA') 
    ->map(function($value) { 
        return json_decode($value, true); 
    }) 
    ->subscribe(new DebugSubject()); 

The function json_decode() doesn't throw an exception when trying to process an invalid JSON string; it just returns null:

15:51:06 [] onNext: (NULL)

This is probably not what we want. If the JSON string is invalid, then something is wrong because this situation should never happen and we want to send an onError notification.

If we wanted to know any further information about which error occurred, we'd have to call json_last_error(). So, this is a perfect opportunity to write a custom operator that decodes JSON strings that, if any error occurs, will send an onError.

All operators implement the OperatorInterface and __invoke() method. This so-called "magic" method is supported from PHP 5.3+ and allows the use of objects as functions:

// __invoke.php 
class InvokeExampleClass { 
    public function __invoke($x) { 
        echo strlen($x); 
    } 
} 
$obj = new InvokeExampleClass(); 
$obj('apple'); 
var_dump(is_callable($obj)); 

When class implements __invoke(), it's automatically considered as callable as well:

$ php __invoke.php
int(5)
bool(true)

Writing operators is very similar. A stub for our class will look like the following:

// JSONDecodeOperator.php 
use Rx\ObservableInterface as ObservableI; 
use Rx\ObserverInterface as ObserverI; 
use Rx\SchedulerInterface as SchedulerI; 
use Rx\Operator\OperatorInterface as OperatorI; 
 
class JSONDecodeOperator implements OperatorI { 
    public function __invoke(ObservableI $observable, 
            ObserverI $observer, SchedulerI $scheduler = null) { 
        // ... 
    } 
} 

Method __invoke() takes three arguments and returns a Disposable object. Right now, we'll use just the first two and not worry about the $scheduler:

  • ObservableInterface $observable: This is our input Observable that we'll subscribe to
  • ObserverInterface $observer: This is where we'll emit all output values from this operator

We'll follow almost the same principle as when writing a custom Subject class. We're going to use CallbackObserver to subscribe to the Observable and perform all of our logic:

class JSONDecodeOperator implements OperatorI { 
  public function __invoke(ObservableI $observable, 
      ObserverI $observer, SchedulerI $scheduler = null) { 
 
    $obs = new CallbackObserver( 
      function ($value) use ($observer) { 
        $decoded = json_decode($value, true); 
        if (json_last_error() == JSON_ERROR_NONE) { 
          $observer->onNext($decoded); 
        } else { 
          $msg = json_last_error_msg(); 
          $e = new InvalidArgumentException($msg); 
          $observer->onError($e); 
        } 
      }, 
      function ($error) use ($observer) { 
        $observer->onError($error); 
      }, 
      function () use ($observer) { 
        $observer->onCompleted(); 
      } 
    ); 
 
    return $observable->subscribe($obs, $scheduler); 
  } 
} 

There're a few interesting things to notice:

  • When onError or onComplete notifications occur, we just pass them along without any further logic.
  • The operator can send any signal any time it wants. Inside CallbackObserver class's onNext closure, we check whether any error occurred while decoding the input JSON string coming from the source Observable using json_last_error().
  • The operator has full access to the source Observable.
  • The operator can emit values independently on values from the source Observable.

In order to use our operator, we have to use the Observable::lift(), method which takes a Closure as an argument that needs to return an instance of an operator (this function is a so-called operator factory):

// rxphp_07.php 
Rx\Observable::just('{"value":42}') 
    ->lift(function() { 
        return new JSONDecodeOperator(); 
    }) 
    ->subscribe(new DebugSubject()); 

Using custom operators was significantly simplified in RxPHP 2, but using the lift() method is universal and works in both versions of RxPHP.

Valid JSON string is decoded as expected:

$ php rxphp_07.php
17:58:49 [] onNext: {"value": 42} (array)
17:58:49 [] onCompleted

On the other hand, the same invalid JSON string that we used above doesn't call onNext, but onError instead. It sends this notification with an instance of InvalidArgumentException class and the error message from json_last_error_msg(), as shown in the following output:

17:59:25 onError (InvalidArgumentException): Syntax error

As usual, we're going to reuse this class throughout this book. The next chapter is going to work with remote APIs a lot, so this operator is going to be very handy.

Simplifying propagation of notifications

In the JSONDecodeOperator class, we didn't want to modify either onError nor onComplete notifications and we just passed them along. However, there's an easier way to do this thanks to how PHP works with callables. A valid callable is also an array with two items: an object and a method name.

This means we can rewrite the above CallbackObserver instantiation as follows:

$callbackObserver = new CallbackObserver( 
    function ($value) use ($observer) { 
        // ... 
    }, 
    [$observer, 'onError'], 
    [$observer, 'onCompleted'] 
); 

The functionality is exactly the same. Instead of creating an anonymous function for each notification, we can just pass the callable directly.

Using custom operators in RxPHP 2

In Chapter 1, Introduction to Reactive Programming, we mentioned a magic __call() method. RxPHP 2 uses this method to allow the use of custom operators by auto-discovering them in two namespace formats.

The first option is defining our operator class in the Rx\Operator namespace:

// JSONDecodeOperator.php 
namespace Rx\Operator; 
 
use Rx\ObservableInterface as ObservableI; 
use Rx\ObserverInterface as ObserverI; 
use Rx\Operator\OperatorInterface as OperatorI; 
use Rx\DisposableInterface as DisposableI; 
 
class JSONDecodeOperator implements OperatorI { 
  public function __invoke(ObservableI $observable, 
      ObserverI $observer): DisposableI { 
 
   return $observable->subscribe( 
     function ($value) use ($observer) { 
       $decoded = json_decode($value, true); 
       if (json_last_error() == JSON_ERROR_NONE) { 
         $observer->onNext($decoded); 
       } else { 
         $msg = json_last_error_msg(); 
         $e = new InvalidArgumentException($msg); 
         $observer->onError($e); 
       } 
     }, 
     [$observer, 'onError'], 
     [$observer, 'onCompleted'] 
   ); 
  } 
} 

It's the same JSONDecodeOperator class, just updated for RxPHP 2. Using this operator is, then, very simple:

Observable::just('{"value":42}') 
    ->JSONDecode() 
    ->subscribe(new DebugSubject()); 

Since our operator resides under the Rx\Operator namespace, it's expanded by the __call() method to Rx\Operator\JSONDecodeOperator. This means we don't need to use the lift() method at all.

Another way is to prefix the operator name and namespace with underscores _ which are then merged into a full class name. This means we can put all application specific operators under a custom namespace:

// JSONDecodeOperator.php 
namespace MyApp\Rx\Operator; 
... 
class JSONDecodeOperator implements OperatorI { ... } 

Now we can use the operator as follows:

Observable::just('{"value":42}') 
    ->_MyApp_JSONDecode() 
    ->subscribe(new DebugSubject()); 
主站蜘蛛池模板: 柘城县| 皮山县| 四川省| 静安区| 濉溪县| 宜州市| 手游| 敦煌市| 丹寨县| 房产| 靖宇县| 阳东县| 喀什市| 丰镇市| 阿尔山市| 金秀| 原平市| 永兴县| 奎屯市| 宁海县| 基隆市| 阿拉善右旗| 班玛县| 石嘴山市| 蓝山县| 库伦旗| 德江县| 内丘县| 青浦区| 望奎县| 城口县| 罗源县| 松原市| 丰顺县| 台北市| 五台县| 大连市| 濉溪县| 固安县| 宝清县| 南和县|