We're going to work with calls to remote APIs a few times throughout this book, so it would be very handy to have an operator that transforms JSON string responses into their PHP array representations.
This example looks like something that could be easily done with just the map() operator:
By default, the json_decode() function doesn't throw an exception when given an invalid JSON string; it just returns null:
15:51:06 [] onNext: (NULL)
This is probably not what we want. An invalid JSON string means something has gone wrong, because this situation should never happen, so we want to send an onError notification instead.
To find out which error occurred, we'd have to call json_last_error(). So, this is a perfect opportunity to write a custom operator that decodes JSON strings and sends an onError notification if any error occurs.
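To see why checking the return value alone is not enough, here is a minimal sketch in plain PHP (no RxPHP required). The input strings are made up for illustration; the second one is simply missing its closing brace.

```php
<?php
// Illustrative inputs (assumptions, not taken from a real API response).
$valid   = '{"value":42}';
$invalid = '{"value":42';   // missing closing brace

$decoded = json_decode($valid, true);
var_dump($decoded['value']);                     // int(42)
var_dump(json_last_error() === JSON_ERROR_NONE); // bool(true)

$broken = json_decode($invalid, true);
var_dump($broken);                               // NULL
var_dump(json_last_error() !== JSON_ERROR_NONE); // bool(true)
echo json_last_error_msg(), PHP_EOL;             // human-readable reason

// The literal "null" is valid JSON and also decodes to null, so the return
// value alone cannot tell a failure apart from a legitimate null.
var_dump(json_decode('null', true));             // NULL
var_dump(json_last_error() === JSON_ERROR_NONE); // bool(true)
```

This is why the operator we are about to write relies on json_last_error() rather than on comparing the decoded value with null.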
All operators implement OperatorInterface and its __invoke() method. This so-called "magic" method has been available since PHP 5.3 and lets us call objects as if they were functions:
// __invoke.php
class InvokeExampleClass {
    public function __invoke($x) {
        echo strlen($x);
    }
}
$obj = new InvokeExampleClass();
$obj('apple');
var_dump(is_callable($obj));
When a class implements __invoke(), its instances are automatically considered callable as well:
$ php __invoke.php
5bool(true)
Writing operators is very similar. A stub for our class will look like the following:
// JSONDecodeOperator.php
use Rx\ObservableInterface as ObservableI;
use Rx\ObserverInterface as ObserverI;
use Rx\SchedulerInterface as SchedulerI;
use Rx\Operator\OperatorInterface as OperatorI;

class JSONDecodeOperator implements OperatorI {
    public function __invoke(ObservableI $observable,
            ObserverI $observer, SchedulerI $scheduler = null) {
        // ...
    }
}
The __invoke() method takes three arguments and returns a Disposable object. For now, we'll use just the first two and not worry about $scheduler:
ObservableInterface $observable: This is our input Observable that we'll subscribe to
ObserverInterface $observer: This is where we'll emit all output values from this operator
We'll follow almost the same principle as when writing a custom Subject class. We're going to use CallbackObserver to subscribe to the Observable and perform all of our logic:
// Needed in addition to the imports from the stub above
use Rx\Observer\CallbackObserver;

class JSONDecodeOperator implements OperatorI {
    public function __invoke(ObservableI $observable,
            ObserverI $observer, SchedulerI $scheduler = null) {
        $obs = new CallbackObserver(
            function ($value) use ($observer) {
                $decoded = json_decode($value, true);
                // json_decode() reports failures only via json_last_error()
                if (json_last_error() === JSON_ERROR_NONE) {
                    $observer->onNext($decoded);
                } else {
                    $msg = json_last_error_msg();
                    $e = new InvalidArgumentException($msg);
                    $observer->onError($e);
                }
            },
            // Errors and completion are passed along unchanged
            function ($error) use ($observer) {
                $observer->onError($error);
            },
            function () use ($observer) {
                $observer->onCompleted();
            }
        );
        return $observable->subscribe($obs, $scheduler);
    }
}
There are a few interesting things to notice:
When onError or onCompleted notifications occur, we just pass them along without any further logic.
The operator can send any signal any time it wants. Inside CallbackObserver class's onNext closure, we check whether any error occurred while decoding the input JSON string coming from the source Observable using json_last_error().
The operator has full access to the source Observable.
The operator can emit values independently of the values coming from the source Observable.
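The decision made inside the onNext closure can be tried out without RxPHP. The following is a minimal sketch, assuming a hypothetical helper decodeJsonOrFail() in which two plain callables stand in for $observer->onNext() and $observer->onError(); neither the helper nor the $events log is part of RxPHP.

```php
<?php
// Hypothetical helper (not an RxPHP API): mirrors the branch inside the
// operator's onNext closure, using plain callables instead of an observer.
function decodeJsonOrFail(string $json, callable $onNext, callable $onError): void
{
    $decoded = json_decode($json, true);
    if (json_last_error() === JSON_ERROR_NONE) {
        $onNext($decoded);
    } else {
        $onError(new InvalidArgumentException(json_last_error_msg()));
    }
}

// Record every notification so we can inspect what was emitted.
$events = [];
$onNext = function ($value) use (&$events) {
    $events[] = ['next', $value];
};
$onError = function (Exception $e) use (&$events) {
    $events[] = ['error', get_class($e)];
};

decodeJsonOrFail('{"value":42}', $onNext, $onError); // valid input
decodeJsonOrFail('{"value":42', $onNext, $onError);  // missing closing brace

print_r($events);
```

The first call records a "next" event with the decoded array, while the second records an "error" event carrying an InvalidArgumentException, which is the same split the operator makes between onNext and onError.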
In order to use our operator, we have to use the Observable::lift() method, which takes a Closure as an argument. This Closure needs to return an instance of an operator (it's a so-called operator factory):
On the other hand, the same invalid JSON string that we used above doesn't call onNext, but onError instead. It sends this notification with an instance of InvalidArgumentException class and the error message from json_last_error_msg(), as shown in the following output:
As usual, we're going to reuse this class throughout this book. The next chapter is going to work with remote APIs a lot, so this operator is going to be very handy.
Simplifying propagation of notifications
In the JSONDecodeOperator class, we didn't want to modify either the onError or the onCompleted notifications; we just passed them along. However, there's an easier way to do this thanks to how PHP works with callables. A valid callable is also an array with two items: an object and a method name.
This means we can rewrite the above CallbackObserver instantiation as follows:
$callbackObserver = new CallbackObserver(
    function ($value) use ($observer) {
        // ...
    },
    [$observer, 'onError'],
    [$observer, 'onCompleted']
);
The functionality is exactly the same. Instead of creating an anonymous function for each notification, we can just pass the callable directly.
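The array form of a callable can be checked in isolation. This is a small sketch in plain PHP; the Printer class is a hypothetical stand-in for an observer and exists only for this demonstration.

```php
<?php
// Hypothetical class used only to demonstrate array callables.
class Printer
{
    public $lines = [];

    public function onError($message)
    {
        $this->lines[] = "error: $message";
    }
}

$printer  = new Printer();
$callable = [$printer, 'onError'];  // [object, method name]

var_dump(is_callable($callable));   // bool(true)

// Both forms end up calling Printer::onError() on the same object.
call_user_func($callable, 'first');
$callable('second');

print_r($printer->lines);
```

Because the array refers to the object itself, passing [$observer, 'onError'] forwards each notification to that observer without an extra closure in between.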
Using custom operators in RxPHP 2
In Chapter 1, Introduction to Reactive Programming, we mentioned a magic __call() method. RxPHP 2 uses this method to allow the use of custom operators by auto-discovering them in two namespace formats.
The first option is defining our operator class in the Rx\Operator namespace:
// JSONDecodeOperator.php
namespace Rx\Operator;

use Rx\ObservableInterface as ObservableI;
use Rx\ObserverInterface as ObserverI;
use Rx\Operator\OperatorInterface as OperatorI;
use Rx\DisposableInterface as DisposableI;

class JSONDecodeOperator implements OperatorI {
    public function __invoke(ObservableI $observable,
            ObserverI $observer): DisposableI {
        return $observable->subscribe(
            function ($value) use ($observer) {
                $decoded = json_decode($value, true);
                if (json_last_error() === JSON_ERROR_NONE) {
                    $observer->onNext($decoded);
                } else {
                    $msg = json_last_error_msg();
                    // Leading backslash: we're inside the Rx\Operator
                    // namespace, so the global class must be fully qualified
                    $e = new \InvalidArgumentException($msg);
                    $observer->onError($e);
                }
            },
            [$observer, 'onError'],
            [$observer, 'onCompleted']
        );
    }
}
It's the same JSONDecodeOperator class, just updated for RxPHP 2. Using this operator is, then, very simple:
Since our operator resides under the Rx\Operator namespace, it's expanded by the __call() method to Rx\Operator\JSONDecodeOperator. This means we don't need to use the lift() method at all.
Another way is to prefix the operator name and namespace with underscores (_), which are then merged into a full class name. This means we can put all application-specific operators under a custom namespace: