- PHP Reactive Programming
- Martin Sikora
- 2021-07-09 19:06:16
Basic principles of Reactive Extensions
Let's have a look at a very simple example of RxPHP, similar to what we did in the previous chapter, and use it to demonstrate some of the basic principles behind Reactive Extensions.
We won't bother with defining an observer right now and will focus only on Observables and operators:
// rxphp_basics_01.php
use Rx\Observable;

$fruits = ['apple', 'banana', 'orange', 'raspberry'];

Observable::fromArray($fruits) // Observable
    ->map(function($value) { // operator
        return strlen($value);
    })
    ->filter(function($len) { // operator
        return $len > 5;
    })
    ->subscribe($observer); // observer
In this example, we have one Observable, two operators and one observer.
An Observable can be chained with operators. In this example, the operators are map() and filter().
Observables have the subscribe() method that is used by observers to start receiving values at the end of the chain.
We can represent this chain by the following diagram:

Each arrow shows the direction of propagation of items and notifications
We should probably explain the difference between using Observables and just iterating the array.
Observables are like a push model, where a value is pushed down the operator chain when it's ready. This is very important because it's the Observable that decides when it should emit the next value. The internal logic of Observables can do whatever it needs to (for example, it can run some asynchronous task) and still remain completely hidden.
Promises are a similar concept to Observables. However, while a Promise represents a single value that will exist in the future, an Observable represents a stream of values.
On the other hand, iterating the array is like a pull model. We'd be pulling one item after another. The important consequence is that we'd have to have the array prepared beforehand (that's before we start iterating it).
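The contrast between the two models can be sketched in plain PHP, without RxPHP. This is only an illustration under simplifying assumptions: pushValues() is a hypothetical helper standing in for an Observable and is not part of the RxPHP API.

```php
<?php
// Pull model: the consumer drives the loop, and the whole array must exist first.
$fruits = ['apple', 'banana', 'orange', 'raspberry'];
$pulled = [];
foreach ($fruits as $fruit) {
    $pulled[] = strlen($fruit);
}

// Push model: the producer decides when to hand over each value.
// pushValues() is a hypothetical helper for illustration, not an RxPHP function.
function pushValues(array $source, callable $onNext): void
{
    foreach ($source as $item) {
        // A real Observable could wait for a timer or I/O here
        // before calling the consumer.
        $onNext($item);
    }
}

$pushed = [];
pushValues($fruits, function ($value) use (&$pushed) {
    $pushed[] = strlen($value);
});

print_r($pulled); // both arrays hold 5, 6, 6, 9
print_r($pushed);
```

In the pull version the calling code owns the loop. In the push version the consumer only supplies a callable and has no say in when it is invoked, which is the property the text attributes to Observables.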
Another important difference is that Observables behave like a data stream (or data flow). We talked about streams in Chapter 1, Introduction to Reactive Programming. In practice, this means that an Observable knows when it has emitted all its items, or when an error has occurred, and is able to send a proper notification down the chain.
For this reason, Observables can call three different methods on their observers (we'll see how this is implemented later in this chapter when we write a custom operator and a custom Observable):
- onNext: This method is called when the next item is ready to be emitted. We typically say that "an Observable emits an item".
- onError: Notification called when an error has occurred. This could be any type of error represented by an instance of the Exception class.
- onComplete: Notification called when there are no more items to be emitted.
Each Observable can emit zero or more items.
Each Observable can send one error, or one complete notification; but never both.
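The rules above (zero or more items, then at most one error or one complete notification) can be illustrated with a small plain-PHP sketch. RecordingObserver is a hypothetical class written for this example, and the way it ignores anything after a terminal notification is an assumption made for illustration, not a description of RxPHP's own observer classes.

```php
<?php
// Hypothetical observer for illustration; RxPHP ships its own observer classes.
final class RecordingObserver
{
    public array $log = [];
    private bool $stopped = false;

    public function onNext($value): void
    {
        if (!$this->stopped) {
            $this->log[] = "Next: $value";
        }
    }

    public function onError(Throwable $error): void
    {
        if (!$this->stopped) {
            $this->stopped = true; // terminal: nothing may follow
            $this->log[] = 'Error: ' . $error->getMessage();
        }
    }

    public function onComplete(): void
    {
        if (!$this->stopped) {
            $this->stopped = true; // terminal: nothing may follow
            $this->log[] = 'Complete';
        }
    }
}

$observer = new RecordingObserver();
$observer->onNext(6);
$observer->onNext(9);
$observer->onComplete();
$observer->onError(new Exception('too late')); // ignored, already completed
$observer->onNext(1);                          // ignored as well

echo implode("\n", $observer->log), "\n";
```

The log ends with a single terminal entry, so an error and a complete notification can never both be recorded.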
This is why the CallbackObserver class we used in Chapter 1, Introduction to Reactive Programming, takes three callables as arguments. These callables are called when the observer receives a next item, an error notification, or a complete notification, respectively. All three callables are optional parameters and we can decide to ignore any of them.
For example, we can make an observer like the following:
use Rx\Observer\CallbackObserver;

$observer = new CallbackObserver(
    function($value) {
        echo "Next: $value\n";
    },
    function(Exception $err) {
        $msg = $err->getMessage();
        echo "Error: $msg\n";
    },
    function() {
        echo "Complete\n";
    }
);
This observer defines all three callables. We can test it on the Observable we defined above and have a look at its output:
$ php rxphp_basics_01.php
Next: 6
Next: 6
Next: 9
Complete
We can see that only three values passed the filter() operator, followed by a proper complete notification at the end.
In RxPHP, every operator that takes a callable as an argument wraps its call internally with a try…catch block. If the callable throws an Exception, then this Exception is sent as an onError notification. Consider the following example:
// rxphp_basics_02.php
$fruits = ['apple', 'banana', 'orange', 'raspberry'];

Observable::fromArray($fruits)
    ->map(function($value) {
        if ($value[0] == 'o') {
            throw new Exception("It's broken.");
        }
        return strlen($value);
    })
    ->filter(function($len) {
        return $len > 5;
    })
    ->subscribe($observer);
With the same observer that we defined previously, this example will have the following output:
$ php rxphp_basics_02.php
Next: 6
Error: It's broken.
It's important to see that, once the error occurred, no more items were emitted and there was no complete notification either. This is because, when the observer received the error, it automatically unsubscribed.
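To make the try…catch wrapping more concrete, here is a minimal plain-PHP sketch of the idea. The function mapWithErrorHandling() and its parameters are hypothetical names chosen for this illustration. It is not RxPHP's actual map() implementation, and it leaves out filter(), so the length of 'apple' also reaches the next callable.

```php
<?php
// Hypothetical helper for illustration; this is not how RxPHP's map() is written.
function mapWithErrorHandling(
    array $source,
    callable $selector,
    callable $onNext,
    callable $onError,
    callable $onComplete
): void {
    foreach ($source as $item) {
        try {
            $result = $selector($item);
        } catch (Exception $e) {
            $onError($e); // forward the exception as an error notification
            return;       // stop: no more items and no complete notification
        }
        $onNext($result);
    }
    $onComplete();
}

$log = [];
mapWithErrorHandling(
    ['apple', 'banana', 'orange', 'raspberry'],
    function ($value) {
        if ($value[0] == 'o') {
            throw new Exception("It's broken.");
        }
        return strlen($value);
    },
    function ($len) use (&$log) { $log[] = "Next: $len"; },
    function (Exception $e) use (&$log) { $log[] = 'Error: ' . $e->getMessage(); },
    function () use (&$log) { $log[] = 'Complete'; }
);

echo implode("\n", $log), "\n";
```

Only the user-supplied callable runs inside the try block, so an exception thrown there becomes an error notification, while 'raspberry' is never processed and the complete callable is never reached.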
We'll talk more about the process behind subscribing and unsubscribing in Chapter 3, Writing a Reddit Reader with RxPHP, and in Chapter 10, Using Advanced Operators and Techniques in RxPHP.
In Chapter 8, Multicasting in RxPHP and PHP7 pthreads Extension, we'll look more in-depth into what happens inside observers when they receive an error or complete notification.
One last thing before we move on. We said that Observables represent data streams. The great advantage of this is that we can easily combine or split streams, similar to what we saw in Chapter 1, Introduction to Reactive Programming, when talking about the gulp build tool.
Let's have a look at a slightly more advanced example of merging two Observables:
// rxphp_basics_03.php
$fruits = ['apple', 'banana', 'orange', 'raspberry'];
$vegetables = ['potato', 'carrot'];

Observable::fromArray($fruits)
    ->map(function($value) {
        return strlen($value);
    })
    ->filter(function($len) {
        return $len > 5;
    })
    ->merge(Observable::fromArray($vegetables))
    ->subscribe($observer);
We used the merge() operator to combine the existing Observable with another Observable. Notice that we can add the operator anywhere we want. Since we added it after the filter() operator and before the subscribe() call, the items from the second Observable are going to be emitted right into the observer and will skip the preceding operator chain.
We can represent this chain by the following diagram:

The output for this example looks like the following:
$ php rxphp_basics_03.php
Next: 6
Next: 6
Next: 9
Next: potato
Next: carrot
Complete
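Why the position of merge() matters can be shown with a rough analogy using plain PHP arrays instead of Observables. This is only a sketch: array functions process everything at once and in order, which real streams do not have to do, and the variable names are made up for this example.

```php
<?php
$fruits = ['apple', 'banana', 'orange', 'raspberry'];
$vegetables = ['potato', 'carrot'];

// map() and filter() apply to the fruit branch only.
$lengths = array_map('strlen', $fruits);
$longOnes = array_values(array_filter($lengths, function ($len) {
    return $len > 5;
}));

// Merging after the filter: vegetables bypass both steps above.
$merged = array_merge($longOnes, $vegetables);

// Merging before the map: vegetables go through the same steps as fruits.
$mergedFirst = array_values(array_filter(
    array_map('strlen', array_merge($fruits, $vegetables)),
    function ($len) {
        return $len > 5;
    }
));

print_r($merged);      // 6, 6, 9, 'potato', 'carrot'
print_r($mergedFirst); // 6, 6, 9, 6, 6
```

The first result matches the output above, where the vegetable names arrive untouched. The second shows what would change if the merge came first: the names would be mapped to their lengths and filtered like the fruits.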
These principles apply to all Rx implementations. Now, we should have a basic idea of what working with Observables, observers and operators in Rx looks like and we can talk more about each of them separately.