官术网_书友最值得收藏!

Writing CURLObservable

As we said, we're going to work with API calls and, for this reason, we need a comfortable way of creating HTTP requests. It's probably no surprise that we'll write a custom Observable that downloads a URL and passes it's response to its observers, where we'll decode it from JSON using the operator we created just a couple of lines above.

We're going to use PHP's cURL module, which is a wrapper around libcurl ( https://curl.haxx.se/libcurl/ ) - a C library for transferring data via any protocols imaginable.

We'll start by using plain simple cURL in PHP and we'll see that it supports some sort of asynchronous approach out-of-the-box.

Imperative approach and cURL

If we just wanted to download a single URL, we wouldn't need anything special. However, we want to make this, and all future applications of CURLObservable class, more interactive, so we'll also keep track of the downloading progress.

A plain and simple approach could look like this:

// curl_01.php 
$ch = curl_init(); 
curl_setopt($ch, CURLOPT_URL, "http://google.com"); 
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); 
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, 'progress'); 
curl_setopt($ch, CURLOPT_NOPROGRESS, false); 
curl_setopt($ch, CURLOPT_HEADER, 0); 
$html = curl_exec($ch); 
curl_close($ch); 
 
function progress($res, $downtotal, $down, $uptotal, $up) { 
    if ($download_size > 0) { 
        printf("%.2f\n", $down / $downtotal * 100); 
    } 
    ob_flush(); 
    usleep(100 * 1000); 
} 

We're using CURLOPT_PROGRESSFUNCTION option to set a callback function which is invoked internally by the cURL module. It takes four arguments that help us keep track of how much of the page's total size already has been downloaded.

We probably don't need to show its output because it's pretty obvious.

There's also a small subset of cURL functions that work with multiple cURL handles simultaneously. These are all prefixed with curl_multi_ and are executed by calling curl_multi_exec(). Nonetheless, the curl_multi_exec() function is blocking and the interpreter needs to wait until it finishes.

Implementing cURL into a custom Observable

We've already seen how to write a custom observer, Subject and operator. Now is the right time to write an Observable as well. We want the Observable to emit values when downloading the URL and, at the end, return a complete response. We can distinguish between the two types of messages by checking their type. Progress will always be a double, while response will always be a string.

Let's start with our class synopsis to see how it's going to work and then implement each method separately with a short description:

use Rx\Observable; 
use Rx\ObserverInterface as ObserverI; 
 
class CURLObservable extends Observable { 
    public function __construct($url) {} 
    public function subscribe(ObserverI $obsr, $sched = null) {} 
    private function startDownload() {} 
    private function progress($r, $downtot, $down, $uptot, $up) {} 
} 

Every time we write an Observable, we'll extend the base Rx\Observable class. We could theoretically just implement Rx\ObservableInterface, but, most of the time, we also want to inherit all its internal logic and all existing operators.

The constructor and method startDownload() are going to be very simple. In startDownload(), we start downloading the URL while monitoring its progress.

Please note that this code goes inside the CURLObservable class; we're just trying to keep the code short and easy to read, so we have omitted indentation and class definition in this example:

public function __construct($url) { 
    $this->url = $url; 
} 
 
private function startDownload() { 
    $ch = curl_init(); 
    curl_setopt($ch, CURLOPT_URL, $this->url); 
    curl_setopt($ch, CURLOPT_PROGRESSFUNCTION,[$this,'progress']); 
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); 
    curl_setopt($ch, CURLOPT_NOPROGRESS, false); 
    curl_setopt($ch, CURLOPT_HEADER, 0); 
    curl_setopt($ch, CURLOPT_USERAGENT, 'Mozilla/5.0 ...'); 
    // Disable gzip compression 
    curl_setopt($ch, CURLOPT_ENCODING, 'gzip;q=0,deflate,sdch'); 
    $response = curl_exec($ch); 
    curl_close($ch); 
 
    return $response; 
} 

This is mostly the same as the example using an imperative approach. The only interesting difference is that we're using a callable [$this,'progress'] instead of just a function name, as we did earlier.

The actual emission of values happens inside the progress() method:

private function progress($res, $downtotal, $down, $uptotal, $up){ 
    if ($downtotal > 0) { 
        $percentage = sprintf("%.2f", $down / $downtotal * 100); 
        foreach ($this->observers as $observer) { 
            /** @var ObserverI $observer */ 
            $observer->onNext(floatval($percentage)); 
        } 
    } 
} 

Since we inherited the original Observable, we can make use of its protected property $observers that holds all subscribed observers, as its name suggests. To emit a value to all of them, we can simply iterate the array and call onNext on each observer.

The only method we haven't seen so far is subscribe():

public function subscribe(ObserverI $obsr, $sched = null) { 
    $disp1 = parent::subscribe($obsr, $sched); 
 
    if (null === $sched) { 
        $sched = new ImmediateScheduler(); 
    } 
 
    $disp2 = $sched->schedule(function() use ($obsr, $started) { 
        $response = $this->startDownload(); 
        if ($response) { 
            $obsr->onNext($response); 
            $obsr->onCompleted(); 
        } else { 
            $msg = 'Unable to download ' . $this->url); 
            $obsr->onError(new Exception($msg)); 
        } 
    }); 
 
    return new CompositeDisposable([$disp1, $disp2]); 
} 

This method combines many of the things we've seen in this chapter:

  • We definitely want to keep the original functionality of the Observable, so we'll call its parent implementation. This adds the observer to the array of observers, as mentioned a moment ago.
  • The parent::subscribe() method returns a disposable. That's the object we can use to unsubscribe the observer from this Observable.
  • If we don't specify what Scheduler this Observable should use, it'll fall back to ImmediateScheduler. We've already mentioned ImmediateScheduler when we were talking about Schedulers in general. In RxPHP 2, we'd use Scheduler::getImmediate() instead of directly using the class name.
  • Right after that, we schedule the work (in terms of Schedulers, it's usually referred to as "action") to be executed by the Scheduler. Note that the action itself is a closure.
  • Then, we start downloading the URL. If we subscribe another observer to the same Observable, it'll re-download the same URL again. Download progress is then emitted with frequency according to cURL's internals. We'll talk more about the subscription process in the next chapter.
  • When downloading finishes, we emit the response or an error.
  • At the end of this method, it returns another disposable. This time, it's CompositeDisposable that is used to wrap other disposables. When calling its dispose() method, these wrapped ones are properly disposed as well.

So, that's it. Now we can test our Observable and see what its output is. We can try to grab a list of the most recent questions on www.stackoverflow.com tagged with functional-programming":

$url = 'https://api.stack...&tagged=functional-programming'; 
$observable = new CurlObservable($url); 
$observable->subscribe(new DebugSubject()); 

This prints a couple of numbers and then the response JSON string:

16:17:52 onNext: 21.39 (double)
16:17:52 onNext: 49.19 (double)
16:17:52 onNext: 49.19 (double)
16:17:52 onNext: 76.99 (double)
16:17:52 onNext: 100 (double)
16:17:52 onNext: {"items":[{"tags":["javascript","... (string)
16:17:52 onCompleted

You can see that one value was emitted twice. This is because of the timing and network latency when cURL evaluates the callback, which is nothing unusual. If we didn't want to see repeated values, we could use the distinct() operator that we saw when talking about "marble diagrams".

Now let's combine it with our JSONDecodeOperator. Since we're now interested only in the string response and want to ignore all progress emissions, we'll also use the filter() operator:

// rxphp_curl.php 
$observable 
    ->filter(function($value) { 
        return is_string($value); 
    }) 
    ->lift(function() { 
        return new JSONDecodeOperator(); 
    }) 
    ->subscribe(new DebugSubject(null, 128)); 

This returns part of the response array (for demonstration purposes, we added indentation and made the output a little longer):

$ php rxphp_curl.php 
16:23:55 [] onNext: { 
    "items": [ 
        { 
            "tags": [ 
                "javascript", 
                "functional-programming", 
       ... (array) 
16:23:55 [] onCompleted 

When we used the filter() operator, you might notice that we called it Observable::filter() without necessarily using the lift() method. This is because almost all operators are, in fact, just lift() calls with predefined Closures that return an appropriate operator class. A good question is whether we can write our own shorthand for JSONDecodeOperator when we're already extending the base Observable class. Maybe something like Observable::jsonDecode()?

The answer is yes, we can. However, in RxPHP 1.x, it wouldn't help us a lot. When we chain operators, they return other instances of Observables that aren't under our control. We could theoretically use Observable::jsonDecode() right after creating CurlObservable because we'd know that it's going to be an instance of this class, but chaining it with filter() brings us back to the original Observable that doesn't know any jsonDecode() methods. In particular, the filter() operator returns an instance of Rx\Observable\AnonymousObservable.

Running multiple requests asynchronously

An interesting use case could be to start multiple requests asynchronously. All calls to curl_exec() are blocking, which means that they block the execution context until they're finished.

Unfortunately, this is a very tricky problem that's hard to solve without using any extra PHP modules, such as pthreads, as we'll see much later in Chapter 9Multithreaded and Distributed Computing with pthreads and Gearman.

We can, however, make use of PHP's standard proc_open() to spawn non-blocking subprocesses that can run in parallel and then just ask for their output.

主站蜘蛛池模板: 元江| 黄大仙区| 长岛县| 建水县| 安远县| 娄烦县| 郸城县| 介休市| 商河县| 奇台县| 中宁县| 桐城市| 慈利县| 大庆市| 无为县| 新昌县| 勐海县| 灵璧县| 安达市| 安陆市| 甘洛县| 共和县| 九台市| 乌兰县| 郯城县| 棋牌| 疏勒县| 佳木斯市| 乾安县| 永城市| 芒康县| 凌源市| 邛崃市| 临沧市| 沛县| 当雄县| 江油市| 陆良县| 浠水县| 甘肃省| 辽阳县|