- Hands-On Reactive Programming with Python
- Romain Picard
- 2021-06-24 18:25:25
The create operator
The create operator is the operator most often used to create custom observables. Almost all other factory operators are implemented on top of it. Its marble diagram is shown in the following figure:
Its prototype is as follows:
Observable.create(subscribe)
The subscribe parameter is a function which will be called each time an observer subscribes to the observable. The prototype of the subscribe function is as follows:
subscribe(observer)
Its only argument is the observer that subscribed to the observable. The following code shows a simple way to use it:
from rx import Observable  # RxPY 1.x

def on_subscribe(observer):
    # Emit three items, then signal completion.
    observer.on_next(1)
    observer.on_next(2)
    observer.on_next(3)
    observer.on_completed()

numbers = Observable.create(on_subscribe)
numbers.subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)
The on_subscribe subscription function emits three items to the observer by calling its on_next method. Then it completes the observable by calling the on_completed method of the observer. This subscribe function is used to create the numbers observable. The subscription provides the following result:
item: 1
item: 2
item: 3
completed
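Because the subscription function is called each time an observer subscribes, two subscriptions replay the three items twice. The following is a minimal sketch of that behavior only. MiniObservable and MiniObserver are hypothetical stand-ins written for this illustration so that the block runs without any dependency; they are not RxPY's implementation.

```python
# Hypothetical stand-ins for illustration only; this is not RxPY's implementation.
class MiniObserver:
    def __init__(self, on_next, on_error, on_completed):
        self.on_next = on_next
        self.on_error = on_error
        self.on_completed = on_completed


class MiniObservable:
    def __init__(self, subscribe):
        self._subscribe = subscribe

    @classmethod
    def create(cls, subscribe):
        return cls(subscribe)

    def subscribe(self, on_next, on_error, on_completed):
        # Every subscription invokes the subscription function again.
        self._subscribe(MiniObserver(on_next, on_error, on_completed))


calls = []     # one entry per invocation of on_subscribe
received = []  # everything delivered to the observers


def on_subscribe(observer):
    calls.append("subscribed")
    observer.on_next(1)
    observer.on_next(2)
    observer.on_next(3)
    observer.on_completed()


numbers = MiniObservable.create(on_subscribe)

for name in ("first", "second"):
    numbers.subscribe(
        on_next=lambda i, name=name: received.append((name, i)),
        on_error=lambda e, name=name: received.append((name, "error", e)),
        on_completed=lambda name=name: received.append((name, "completed")),
    )

print(len(calls))  # number of times on_subscribe ran
print(received)
```

Each subscriber gets its own run of on_subscribe, which is why state that belongs to one subscription is best kept inside the subscription function.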
The preceding example was very simple. Let's look at a more realistic example of a very common pattern of the create operator: implementing an observable that reacts to the items of another observable (in other words, an operator). The following example sums items from the source observable as long as they are even. Every time an odd number is received, the current sum is emitted on the output observable and its value is reset to the value of the odd number.
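Before bringing observables into it, the summing rule itself can be sketched as a plain Python generator. The name sum_even_runs is hypothetical and exists only for this illustration. Emitting the remaining sum when the input ends is an assumption here that matches how the operator handles completion.

```python
def sum_even_runs(items):
    """Sum even items; on an odd item, emit the sum and restart from that item."""
    accumulator = 0
    for i in items:
        if i % 2 == 0:
            accumulator += i
        else:
            yield accumulator  # emit the sum collected so far
            accumulator = i    # restart from the odd value
    yield accumulator          # emit what remains when the input ends


result = list(sum_even_runs([2, 2, 4, 5, 2]))
print(result)  # [8, 7]
```

The first value is 2 + 2 + 4, and the second is 5 + 2.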
Let's start with the subscription to this custom operator, shown as follows:
numbers = Observable.from_([2, 2, 4, 5, 2])
sum_even(numbers).subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)
An observable of numbers is created. This observable is provided to the sum_even function, and the resulting observable is subscribed. The skeleton of the sum_even function is as follows:
def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0
        source.subscribe(on_next, on_error, on_completed)
    return Observable.create(on_subscribe)
The preceding code just returns an observable, with the nested on_subscribe subscription function. The on_subscribe function initializes the sum accumulator to 0 and subscribes to the source observable. So, when an observer subscribes to the observable returned by sum_even, on_subscribe is called, and the source observable is also subscribed. This is a chain of subscriptions. Finally, the callbacks of the source observer must be implemented as nested functions of on_subscribe, as follows:
        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            # Forward the error itself to the output observer.
            observer.on_error(e)

        def on_completed():
            nonlocal accumulator
            # Emit the pending sum before completing.
            observer.on_next(accumulator)
            observer.on_completed()
The on_next implementation should be clear. The accumulator is updated with the sum of items when they are even and is reset when they are odd. The value of the accumulator is emitted every time an odd number is received. The error and completion of the source observable are propagated to the observer of the output observable. On completion, the pending sum is emitted first. The complete code is as follows:
from rx import Observable  # RxPY 1.x

def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            # Forward the error itself to the output observer.
            observer.on_error(e)

        def on_completed():
            nonlocal accumulator
            # Emit the pending sum before completing.
            observer.on_next(accumulator)
            observer.on_completed()

        source.subscribe(on_next, on_error, on_completed)
    return Observable.create(on_subscribe)

numbers = Observable.from_([2, 2, 4, 5, 2])
sum_even(numbers).subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)
The preceding code provides the following output:
item: 8
item: 7
completed
The two items received correspond to the sum of 2, 2, 4, and the sum of 5 and 2. The completion is correctly received after these two items.
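The error path can be exercised in the same way. The following sketch feeds sum_even a source that fails partway through and records what the output observer receives. MiniObservable, MiniObserver, and failing_source are hypothetical stand-ins written for this illustration so that the block runs without RxPY; with the real library, Observable.create would take their place.

```python
# Hypothetical stand-ins for illustration only; RxPY's Observable does much more.
class MiniObserver:
    def __init__(self, on_next, on_error, on_completed):
        self.on_next = on_next
        self.on_error = on_error
        self.on_completed = on_completed


class MiniObservable:
    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, on_next, on_error, on_completed):
        self._subscribe(MiniObserver(on_next, on_error, on_completed))


def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            observer.on_error(e)  # forward the error object itself

        def on_completed():
            observer.on_next(accumulator)
            observer.on_completed()

        source.subscribe(on_next, on_error, on_completed)

    return MiniObservable(on_subscribe)


def failing_source(observer):
    # Emits an even item, an odd item, then an error instead of completing.
    observer.on_next(2)
    observer.on_next(3)
    observer.on_error(ValueError("boom"))


events = []
sum_even(MiniObservable(failing_source)).subscribe(
    on_next=lambda i: events.append(("item", i)),
    on_error=lambda e: events.append(("error", str(e))),
    on_completed=lambda: events.append(("completed",)),
)
print(events)  # [('item', 2), ('error', 'boom')]
```

The sum accumulated after the odd item (3) is never emitted, because the source ends with an error rather than a completion.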