RxPy是很是流行的響應式框架Reactive X的Python版本,其實這些版本都是同樣的,只不過是各個語言的實現不一樣而已。所以,若是學會了其中一種,那麼使用其餘的響應式版本也是垂手可得的。以前我就據說過這個框架,最近決定好好研究一下。編程
Reactive X中有幾個核心的概念,先來簡單介紹一下。設計模式
首先是Observable和Observer,它們分別是可觀察對象和觀察者。Observable能夠理解爲一個異步的數據源,會發送一系列的值。Observer則相似於消費者,須要先訂閱Observable,而後才能夠接收到其發射的值。能夠說這組概念是設計模式中的觀察者模式和生產者-消費者模式的綜合體。緩存
另一個很是重要的概念就是操做符了。操做符做用於Observable的數據流上,能夠對其施加各類各樣的操做。更重要的是,操做符還能夠鏈式組合起來。這樣的鏈式函數調用不只將數據和操做分隔開來,並且代碼更加清晰可讀。一旦熟練掌握以後,你就會愛上這種感受的。多線程
在RxJava和其變體中,還有一個比較特殊的概念叫作Single,它是一種只會發射同一個值的Observable,說白了就是單例。固然若是你對Java等語言比較熟悉,那麼單例想必也很熟悉。框架
主體這個概念很是特殊,它既是Observable又是Observer。正是由於這個特色,因此Subject能夠訂閱其餘Observable,也能夠將發射對象給其餘Observer。在某些場景中,Subject會有很大的做用。dom
默認狀況下Reactive X只運行在當前線程下,可是若是有須要的話,也能夠用調度器來讓Reactive X運行在多線程環境下。有不少調度器和對應的操做符,能夠處理多線程場景下的各類要求。異步
先來看看一個最簡單的例子,運行的結果會依次打印這些數字。這裏的of是一個操做符,能夠根據給定的參數建立一個新的Observable。建立以後,就能夠訂閱Observable,三個回調方法在對應的時機執行。一旦Observer訂閱了Observable,就會接收到後續Observable發射的各項值。函數式編程
from rx import of ob = of(1, 2, 34, 5, 6, 7, 7) ob.subscribe( on_next=lambda i: print(f'Received: {i}'), on_error=lambda e: print(f'Error: {e}'), on_completed=lambda: print('Completed') )
這個例子看起來好像很簡單,而且看起來沒什麼用。可是當你瞭解了Rx的一些核心概念,就會理解到這是一個多麼強大的工具。更重要的是,Observable生成數據和訂閱的過程是異步的,若是你熟悉的話,就能夠利用這個特性作不少事情。函數
在RxPy中另外一個很是重要的概念就是操做符了,甚至能夠說操做符就是最重要的一個概念了。幾乎全部的功能均可以經過組合各個操做符來實現。熟練掌握操做符就是學好RxPy的關鍵了。操做符之間也能夠用pipe函數鏈接起來,構成複雜的操做鏈。工具
from rx import of, operators as op import rx ob = of(1, 2, 34, 5, 6, 7, 7) ob.pipe( op.map(lambda i: i ** 2), op.filter(lambda i: i >= 10) ).subscribe(lambda i: print(f'Received: {i}'))
在RxPy中有大量操做符,能夠完成各類各樣的功能。咱們來簡單看看其中一些經常使用的操做符。若是你熟悉Java8的流類庫或者其餘函數式編程類庫的話,應該對這些操做符感到很是親切。
首先是建立Observable的操做符,列舉了一些比較經常使用的建立型操做符。
操做符 | 做用 | |
---|---|---|
just(n) | 只包含1個值的Observable | |
repeated_value(v,n) | 重複n次值爲v的Observable | |
of(a,b,c,d) | 包含全部參數的Observable | |
empty() | 一個空的Observable | |
from_iterable(iter) | 用iterable建立一個Observable | |
generate(0, lambda x: x < 10, lambda x: x + 1) | 用初始值和循環條件生成Observable | |
interval(n) | 以n秒爲間隔定時發送整數序列的Observable |
過濾型操做符的主要做用是對Observable進行篩選和過濾。
操做符 | 做用 |
---|---|
debounce | 按時間間隔過濾,在範圍內的值會被忽略 |
distinct | 忽略重複的值 |
elementAt | 只發射第n位的值 |
filter | 按條件過濾值 |
first/last | 發射首/尾值 |
skip | 跳過前n個值 |
take | 只取前n個值 |
操做符 | 做用 |
---|---|
flatMap | 轉換多個Observable的值並將它們合併爲一個Observable |
groupBy | 對值進行分組,返回多個Observable |
map | 將Observable映射爲另外一個Observable |
scan | 將函數應用到Observable的每一個值上,而後返回後面的值 |
操做符 | 做用 |
---|---|
average | 平均數 |
count | 個數 |
max | 最大值 |
min | 最小值 |
reduce | 將函數應用到每一個值上,而後返回最終的計算結果 |
sum | 求和 |
Subject是一種特殊的對象,它既是Observer又是Observable。不過這個對象通常不太經常使用,可是假如某些用途仍是頗有用的。因此仍是要介紹一下。下面的代碼,由於訂閱的時候第一個值已經發射出去了,因此只會打印訂閱以後才發射的值。
from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject # Subject同時是Observer和Observable print('--------Subject---------') subject = Subject() subject.on_next(1) subject.subscribe(lambda i: print(i)) subject.on_next(2) subject.on_next(3) subject.on_next(4) subject.on_completed() # 2 3 4
另外還有幾個特殊的Subject,下面來介紹一下。
ReplaySubject是一個特殊的Subject,它會記錄全部發射過的值,不論何時訂閱的。因此它能夠用來當作緩存來使用。ReplaySubject還能夠接受一個bufferSize參數,指定能夠緩存的最近數據數,默認狀況下是所有。
下面的代碼和上面的代碼幾乎徹底同樣,可是由於使用了ReplaySubject,因此全部的值都會被打印。固然你們也能夠試試把訂閱語句放到其餘位置,看看輸出是否會產生變化。
# ReplaySubject會緩存全部值,若是指定參數的話只會緩存最近的幾個值 print('--------ReplaySubject---------') subject = ReplaySubject() subject.on_next(1) subject.subscribe(lambda i: print(i)) subject.on_next(2) subject.on_next(3) subject.on_next(4) subject.on_completed() # 1 2 3 4
BehaviorSubject是一個特殊的Subject,它只會記錄最近一次發射的值。並且在建立它的時候,必須指定一個初始值,全部訂閱它的對象均可以接收到這個初始值。固然若是訂閱的晚了,這個初始值一樣會被後面發射的值覆蓋,這一點要注意。
# BehaviorSubject會緩存上次發射的值,除非Observable已經關閉 print('--------BehaviorSubject---------') subject = BehaviorSubject(0) subject.on_next(1) subject.on_next(2) subject.subscribe(lambda i: print(i)) subject.on_next(3) subject.on_next(4) subject.on_completed() # 2 3 4
AsyncSubject是一個特殊的Subject,顧名思義它是一個異步的Subject,它只會在Observer完成的時候發射數據,並且只會發射最後一個數據。所以下面的代碼僅僅會輸出4.假如註釋掉最後一行co_completed調用,那麼什麼也不會輸出。
# AsyncSubject會緩存上次發射的值,並且僅會在Observable關閉後開始發射 print('--------AsyncSubject---------') subject = AsyncSubject() subject.on_next(1) subject.on_next(2) subject.subscribe(lambda i: print(i)) subject.on_next(3) subject.on_next(4) subject.on_completed() # 4
雖然RxPy算是異步的框架,可是其實它默認仍是運行在單個線程之上的,所以若是使用了某些會阻礙線程運行的操做,那麼程序就會卡死。固然針對這些狀況,咱們就可使用其餘的Scheduler來調度任務,保證程序可以高效運行。
下面的例子建立了一個ThreadPoolScheduler,它是基於線程池的調度器。兩個Observable用subscribe_on方法指定了調度器,所以它們會使用不一樣的線程來工做。
import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as op import multiprocessing import time import threading import random def long_work(value): time.sleep(random.randint(5, 20) / 10) return value pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count()) rx.range(5).pipe( op.map(lambda i: long_work(i + 1)), op.subscribe_on(pool_schedular) ).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}')) rx.of(1, 2, 3, 4, 5).pipe( op.map(lambda i: i * 2), op.subscribe_on(pool_schedular) ).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
若是你觀察過各個操做符的API的話,能夠發現大部分操做符都支持可選的Scheduler參數,爲操做符指定一個調度器。若是操做符上指定了調度器的話,會優先使用這個調度器;其次的話,會使用subscribe方法上指定的調度器;若是以上都沒有指定的話,就會使用默認的調度器。
好了,介紹了一些Reactive X的知識以後,下面來看看如何來使用Reactive X。在不少應用場景下,均可以利用Reactive X來抽象數據處理,把概念簡單化。
不少狀況下咱們都須要控制事件的發生間隔,好比有一個按鈕不當心按了好幾回,只但願第一次按鈕生效。這種狀況下可使用debounce操做符,它會過濾Observable,小於指定時間間隔的數據會被過濾掉。debounce操做符會等待一段時間,直到過了間隔時間,纔會發射最後一次的數據。若是想要過濾後面的數據,發送第一次的數據,則要使用throttle_first操做符。
下面的代碼能夠比較好的演示這個操做符,快速按回車鍵發送數據,注意觀察按鍵和數據顯示之間的關係,還能夠把throttle_first操做符換成debounce操做符,而後再看看輸出會發生什麼變化,還能夠徹底註釋掉pipe中的操做符,再看看輸出會有什麼變化。
import rx from rx import operators as op from rx.subject import Subject import datetime # debounce操做符,僅在時間間隔以外的能夠發射 ob = Subject() ob.pipe( op.throttle_first(3) # op.debounce(3) ).subscribe( on_next=lambda i: print(i), on_completed=lambda: print('Completed') ) print('press enter to print, press other key to exit') while True: s = input() if s == '': ob.on_next(datetime.datetime.now().time()) else: ob.on_completed() break
若是須要對一些數據進行操做,那麼一樣有一大堆操做符能夠知足需求。固然這部分功能並非Reactive X獨有的,若是你對Java 8的流類庫有所瞭解,會發現這二者這方面的功能幾乎是徹底同樣的。
下面是個簡單的例子,將兩個數據源結合起來,而後找出來其中全部的偶數。
import rx from rx import operators as op from rx.subject import Subject import datetime # 操做數據流 some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8) some_data2 = rx.from_iterable(range(10, 20)) some_data.pipe( op.merge(some_data2), op.filter(lambda i: i % 2 == 0), # op.map(lambda i: i * 2) ).subscribe(lambda i: print(i))
再或者一個利用reduce的簡單例子,求1-100的整數和。
import rx from rx import operators as op from rx.subject import Subject import datetime rx.range(1, 101).pipe( op.reduce(lambda acc, i: acc + i, 0) ).subscribe(lambda i: print(i))