Python響應式類庫RxPy簡介

RxPy是很是流行的響應式框架Reactive X的Python版本,其實這些版本都是同樣的,只不過是各個語言的實現不一樣而已。所以,若是學會了其中一種,那麼使用其餘的響應式版本也是垂手可得的。以前我就據說過這個框架,最近決定好好研究一下。編程

基本概念

Reactive X中有幾個核心的概念,先來簡單介紹一下。設計模式

Observable和Observer(可觀察對象和觀察者)

首先是Observable和Observer,它們分別是可觀察對象和觀察者。Observable能夠理解爲一個異步的數據源,會發送一系列的值。Observer則相似於消費者,須要先訂閱Observable,而後才能夠接收到其發射的值。能夠說這組概念是設計模式中的觀察者模式和生產者-消費者模式的綜合體。緩存

Operator(操做符)

另一個很是重要的概念就是操做符了。操做符做用於Observable的數據流上,能夠對其施加各類各樣的操做。更重要的是,操做符還能夠鏈式組合起來。這樣的鏈式函數調用不只將數據和操做分隔開來,並且代碼更加清晰可讀。一旦熟練掌握以後,你就會愛上這種感受的。多線程

Single(單例)

在RxJava和其變體中,還有一個比較特殊的概念叫作Single,它是一種只會發射同一個值的Observable,說白了就是單例。固然若是你對Java等語言比較熟悉,那麼單例想必也很熟悉。框架

Subject(主體)

主體這個概念很是特殊,它既是Observable又是Observer。正是由於這個特色,因此Subject能夠訂閱其餘Observable,也能夠將發射對象給其餘Observer。在某些場景中,Subject會有很大的做用。dom

Scheduler(調度器)

默認狀況下Reactive X只運行在當前線程下,可是若是有須要的話,也能夠用調度器來讓Reactive X運行在多線程環境下。有不少調度器和對應的操做符,能夠處理多線程場景下的各類要求。異步

Observer和Observable

先來看看一個最簡單的例子,運行的結果會依次打印這些數字。這裏的of是一個操做符,能夠根據給定的參數建立一個新的Observable。建立以後,就能夠訂閱Observable,三個回調方法在對應的時機執行。一旦Observer訂閱了Observable,就會接收到後續Observable發射的各項值。函數式編程

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

這個例子看起來好像很簡單,而且看起來沒什麼用。可是當你瞭解了Rx的一些核心概念,就會理解到這是一個多麼強大的工具。更重要的是,Observable生成數據和訂閱的過程是異步的,若是你熟悉的話,就能夠利用這個特性作不少事情。函數

操做符

在RxPy中另外一個很是重要的概念就是操做符了,甚至能夠說操做符就是最重要的一個概念了。幾乎全部的功能均可以經過組合各個操做符來實現。熟練掌握操做符就是學好RxPy的關鍵了。操做符之間也能夠用pipe函數鏈接起來,構成複雜的操做鏈。工具

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操做符,能夠完成各類各樣的功能。咱們來簡單看看其中一些經常使用的操做符。若是你熟悉Java8的流類庫或者其餘函數式編程類庫的話,應該對這些操做符感到很是親切。

建立型操做符

首先是建立Observable的操做符,列舉了一些比較經常使用的建立型操做符。

操做符 做用
just(n) 只包含1個值的Observable
repeated_value(v,n) 重複n次值爲v的Observable
of(a,b,c,d) 包含全部參數的Observable
empty() 一個空的Observable
from_iterable(iter) 用iterable建立一個Observable
generate(0, lambda x: x < 10, lambda x: x + 1) 用初始值和循環條件生成Observable
interval(n) 以n秒爲間隔定時發送整數序列的Observable

過濾型操做符

過濾型操做符的主要做用是對Observable進行篩選和過濾。

操做符 做用
debounce 按時間間隔過濾,在範圍內的值會被忽略
distinct 忽略重複的值
elementAt 只發射第n位的值
filter 按條件過濾值
first/last 發射首/尾值
skip 跳過前n個值
take 只取前n個值

轉換型操做符

操做符 做用
flatMap 轉換多個Observable的值並將它們合併爲一個Observable
groupBy 對值進行分組,返回多個Observable
map 將Observable映射爲另外一個Observable
scan 將函數應用到Observable的每一個值上,而後返回後面的值

算術操做符

操做符 做用
average 平均數
count 個數
max 最大值
min 最小值
reduce 將函數應用到每一個值上,而後返回最終的計算結果
sum 求和

Subject

Subject是一種特殊的對象,它既是Observer又是Observable。不過這個對象通常不太經常使用,可是假如某些用途仍是頗有用的。因此仍是要介紹一下。下面的代碼,由於訂閱的時候第一個值已經發射出去了,因此只會打印訂閱以後才發射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同時是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外還有幾個特殊的Subject,下面來介紹一下。

ReplaySubject

ReplaySubject是一個特殊的Subject,它會記錄全部發射過的值,不論何時訂閱的。因此它能夠用來當作緩存來使用。ReplaySubject還能夠接受一個bufferSize參數,指定能夠緩存的最近數據數,默認狀況下是所有。

下面的代碼和上面的代碼幾乎徹底同樣,可是由於使用了ReplaySubject,因此全部的值都會被打印。固然你們也能夠試試把訂閱語句放到其餘位置,看看輸出是否會產生變化。

# ReplaySubject會緩存全部值,若是指定參數的話只會緩存最近的幾個值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

BehaviorSubject

BehaviorSubject是一個特殊的Subject,它只會記錄最近一次發射的值。並且在建立它的時候,必須指定一個初始值,全部訂閱它的對象均可以接收到這個初始值。固然若是訂閱的晚了,這個初始值一樣會被後面發射的值覆蓋,這一點要注意。

# BehaviorSubject會緩存上次發射的值,除非Observable已經關閉
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

AsyncSubject

AsyncSubject是一個特殊的Subject,顧名思義它是一個異步的Subject,它只會在Observer完成的時候發射數據,並且只會發射最後一個數據。所以下面的代碼僅僅會輸出4.假如註釋掉最後一行co_completed調用,那麼什麼也不會輸出。

# AsyncSubject會緩存上次發射的值,並且僅會在Observable關閉後開始發射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

Scheduler

雖然RxPy算是異步的框架,可是其實它默認仍是運行在單個線程之上的,所以若是使用了某些會阻礙線程運行的操做,那麼程序就會卡死。固然針對這些狀況,咱們就可使用其餘的Scheduler來調度任務,保證程序可以高效運行。

下面的例子建立了一個ThreadPoolScheduler,它是基於線程池的調度器。兩個Observable用subscribe_on方法指定了調度器,所以它們會使用不一樣的線程來工做。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

若是你觀察過各個操做符的API的話,能夠發現大部分操做符都支持可選的Scheduler參數,爲操做符指定一個調度器。若是操做符上指定了調度器的話,會優先使用這個調度器;其次的話,會使用subscribe方法上指定的調度器;若是以上都沒有指定的話,就會使用默認的調度器。

應用場景

好了,介紹了一些Reactive X的知識以後,下面來看看如何來使用Reactive X。在不少應用場景下,均可以利用Reactive X來抽象數據處理,把概念簡單化。

防止重複發送

不少狀況下咱們都須要控制事件的發生間隔,好比有一個按鈕不當心按了好幾回,只但願第一次按鈕生效。這種狀況下可使用debounce操做符,它會過濾Observable,小於指定時間間隔的數據會被過濾掉。debounce操做符會等待一段時間,直到過了間隔時間,纔會發射最後一次的數據。若是想要過濾後面的數據,發送第一次的數據,則要使用throttle_first操做符。

下面的代碼能夠比較好的演示這個操做符,快速按回車鍵發送數據,注意觀察按鍵和數據顯示之間的關係,還能夠把throttle_first操做符換成debounce操做符,而後再看看輸出會發生什麼變化,還能夠徹底註釋掉pipe中的操做符,再看看輸出會有什麼變化。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操做符,僅在時間間隔以外的能夠發射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

操做數據流

若是須要對一些數據進行操做,那麼一樣有一大堆操做符能夠知足需求。固然這部分功能並非Reactive X獨有的,若是你對Java 8的流類庫有所瞭解,會發現這二者這方面的功能幾乎是徹底同樣的。

下面是個簡單的例子,將兩個數據源結合起來,而後找出來其中全部的偶數。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操做數據流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一個利用reduce的簡單例子,求1-100的整數和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))
相關文章
相關標籤/搜索