利用python yielding建立協程將異步編程同步化

轉自:http://www.jackyshen.com/2015/05/21/async-operations-in-form-of-sync-programming-with-python-yielding/

目錄

  • 回顧同步與異步編程
  • 回顧多線程編程
  • yield與協程
  • 異步編程同步化

回顧同步與異步編程

同步編程即線性化編程,代碼按照既定順序執行,上一條語句執行完纔會執行下一條,不然就一直等在那裏。
可是許多實際操做都是CPU 密集型任務和 IO 密集型任務,好比網絡請求,此時不能讓這些任務阻塞主線程的工做,因而就會採用異步編程。html

異步的標準元素就是回調函數(Callback, 後來衍生出Promise/Deferred概念),主線程發起一個異步任務,讓其本身到一邊去工做,當其完成後,會經過執行預先指定的回調函數完成後續任務,而後返回主線程。在異步任務執行過程當中,主線程無需等待和阻塞,能夠繼續處理其餘任務。python

下例你們並不陌生,是jQuery標準發送http異步請求的方式。程序員

1
2
3
4
5
6
7
$.ajax({
url:"/echo/json/",
success: function(response)
{
console.info(response.name);
}
});

而併發的核心思想在於,大的任務能夠分解成一系列的子任務,後者能夠被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。ajax

回顧多線程編程

當主線程發起異步任務,這個任務跑到哪裏去工做了呢?這就說到多線程(包括多進程)編程,一個主線程能夠主動建立多個子線程,而後將任務交給子線程,每一個子線程擁有本身的堆棧空間。操做系統能夠經過分時的方式讓同一個CPU輪流調度各個線程,編程人員無需關心操做系統是如何工做的。編程

可是若是須要在多個線程之間通訊,則須要編程人員本身寫代碼來控制線程之間的協做(利用鎖或信號量)以及通訊(利用管道、隊列等)。json

經典的Producer-Consumer問題

這個問題說的是有兩方進行通訊和協做,一方只負責生產內容,另外一方只負責消費內容。消費者並不知道,也無需知道生產者什麼時候生產,只是當有內容生產出來負責消費便可,沒有內容時就等待。這是一個經典的異步問題。網絡

Threading/Queue方案

傳統的解決方案便是採用多線程來實現,生產者和消費者分別處於不一樣的線程或進程中,由操做系統進行調度。來看一篇經典的多線程教程中的例子,是否是很像Java風格?—囉嗦。多線程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import threading
import time
import logging
import random
import Queue

logging.basicConfig(level=logging.DEBUG,
format='(%(threadName)-9s) %(message)s',)

BUF_SIZE = 10
q = Queue.Queue(BUF_SIZE)

class ProducerThread(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
super(ProducerThread,self).__init__()
self.target = target
self.name = name

def run(self):
while True:
if not q.full():
item = random.randint(1,10)
q.put(item)
logging.debug('Putting ' + str(item)
+ ' : ' + str(q.qsize()) + ' items in queue')
time.sleep(random.random())
return

class ConsumerThread(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
super(ConsumerThread,self).__init__()
self.target = target
self.name = name
return

def run(self):
while True:
if not q.empty():
item = q.get()
logging.debug('Getting ' + str(item)
+ ' : ' + str(q.qsize()) + ' items in queue')
time.sleep(random.random())
return

if __name__ == '__main__':

p = ProducerThread(name='producer')
c = ConsumerThread(name='consumer')

p.start()
time.sleep(2)
c.start()
time.sleep(2)

MessageQueue方案

基於多線程方案,這個問題已經演變成消息中介模式(有些公司喜歡稱之爲」郵局」),有各類的商業MQ方案能夠直接使用。併發

這裏以RabbitMQ開源方案爲例,Producer一方向名爲隊列中發送」Hello World!」內容,而Consumer一方則監聽隊列,當有內容進入隊列時,就執行callback函數來收取並處理內容。發送與收取的動做是異步執行的,互不干擾。app

1
2
3
4
5
6
7
8
9
10
11
12
13
14
###### Producer ########

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
####### Consumer ########

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)

channel.basic_consume(callback,
queue='hello',
no_ack=True)

channel.start_consuming()

yield與協程

何爲協程(Coroutine)及yield

python採用了GIL(Global Interpretor Lock,全局解釋器鎖),默認全部任務都是在同一進程中執行的。(固然,能夠藉助多進程多線程來實現並行化。)咱們調用一個普通的Python函數時,通常是從函數的第一行代碼開始執行,結束於return語句、異常或者函數結束(能夠看做隱式的返回None)。一旦函數將控制權交還給調用者,就意味着所有結束。函數中作的全部工做以及保存在局部變量中的數據都將丟失。再次調用這個函數時,一切都將從頭建立。

所謂協程(Coroutine)就是在同一進程/線程中,利用生成器(generator)來」同時」執行多個函數(routine)。

Python的中yield關鍵字與Coroutine說的是一件事情,先看看yield的基本用法。

任何包含yield關鍵字的函數都會自動成爲生成器(generator)對象,裏面的代碼通常是一個有限或無限循環結構,每當第一次調用該函數時,會執行到yield代碼爲止並返回本次迭代結果,yield指令起到的是return關鍵字的做用。而後函數的堆棧會自動凍結(freeze)在這一行。當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。經過這種方式,迭代器能夠實現無限序列和惰性求值。

看一個用生成器來計算100之內斐波那契數列的例子。咱們先用普通遞歸方式來進行計算。

1
2
3
4
a = b = 1
while a < 100:
a, b = b, a + b
print a,

再來用yield和生成器來計算斐波那契數列,該函數造成一個無限循環的生成器,由函數調用者顯式地控制迭代次數。

1
2
3
4
5
6
7
8
9
10
11
12
13
def fibonacci():
a = b = 1
yield a
yield b
while True:
a, b = b, a+b
yield b

num = 0
fib = fibonacci()
while num < 100:
num = next(fib)
print num,

總而言之,生成器(以及yield語句)最初的引入是爲了讓程序員能夠更簡單的編寫用來產生值的序列的代碼。 之前,要實現相似隨機數生成器的東西,須要實現一個類或者一個模塊,在生成數據的同時保持對每次調用之間狀態的跟蹤。引入生成器以後,這變得很是簡單。

  • yield則像是generator函數的返回結果
  • yield惟一所作的另外一件事就是保存一個generator函數的狀態
  • generator就是一個特殊類型的迭代器(iterator)
  • 和迭代器類似,咱們能夠經過使用next()來從generator中獲取下一個值
  • 經過隱式地調用next()來忽略一些值

用yield實現協程調度的原理

咱們如今利用yield關鍵字會自動凍結函數堆棧的特性,想象一下,假如如今有兩個函數f1()和f2(),各自包含yield語句,見下例。主線程先啓動f1(), 當f1()執行到yield的時候,暫時返回。這時主線程能夠將執行權交給f2(),執行到f2()的yield後,能夠再將執行權交給f1(),從而實現了在同一線程中交錯執行f1()和f2()。f1()與f2()就是協同執行的程序,故名協程。

咱們嘗試用yield創建協程,來解決Producer-Consumer問題。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding: utf-8 -*-
import random

def get_data():
"""返回0到9之間的3個隨機數,模擬異步操做"""
return random.sample(range(10), 3)

def consume():
"""顯示每次傳入的整數列表的動態平均值"""
running_sum = 0
data_items_seen = 0

while True:
print('Waiting to consume')
data = yield
data_items_seen += len(data)
running_sum += sum(data)
print('Consumed, the running average is {}'.format(running_sum / float(data_items_seen)))

def produce(consumer):
"""產生序列集合,傳遞給消費函數(consumer)"""
while True:
data = get_data()
print('Produced {}'.format(data))
consumer.send(data)
yield

if __name__ == '__main__':
consumer = consume()
consumer.send(None)
producer = produce(consumer)

for _ in range(10):
print('Producing...')
next(producer)

下圖將控制流形象化,以說明上下文切換如何發生。

 

在任什麼時候刻,只有一個協程在運行。

異步編程同步化

再也不須要回調

看一下Python官方的例子,利用一個@gen.coroutine裝飾器來簡化代碼編寫,本來調用-回調兩段邏輯,如今被放在了一塊兒,yield充當了回調的入口。這就是異步編程同步化

原始的回調編程模式:

1
2
3
4
5
6
7
8
9
10
class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch("http://example.com",
callback=self.on_fetch)

def on_fetch(self, response):
do_something_with_response(response)
self.render("template.html")

 

同步化編程後的結果:

1
2
3
4
5
6
7
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

關於這個裝飾器的實現方式,能夠參見http://my.oschina.net/u/877348/blog/184058

Gevent與Greenlet庫

看了上述代碼,你是否是以爲利用協程就能夠將併發編程所有同步化了?錯!
仔細想一想,即便用了協程,同一時間仍然只能有一段代碼獲得執行,此時若是有同步的I/O任務,則仍會存在阻塞想象。除非…除非將I/O任務自動併發掉,纔有可能真正利用協程來將大量異步併發任務同步化!注意這裏的http_client是異步網絡庫,非同步阻塞庫。通常是須要回調,但利用協程對get()函數同步化之後,當執行到yield時,至關於發出了多個網絡請求,而後掛起這個get()函數,其餘協程將獲得調度。當異步網絡請求都已返回且協程調度有空閒時,會調用get.send(),繼續這個協程,以同步化編程的方式繼續完成原先放在回調函數中的邏輯。上例中網絡請求若是採用普通的urllib.urlopen()就不行了。

慢着,若是urllib.urlopen()可以異步執行,那不就好了?

這就是Greenlet庫所作的,它是以C擴展模塊形式接入Python的輕量級協程,將一些本來同步運行的網絡庫以mockey_patch的方式進行了重寫。Greenlets所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

而Gevent庫則是基於Greenlet,實現了協程調度功能。將多個函數spawn爲協程,而後join到一塊兒,如此簡單!

看一個Gevent的官方例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
response = urllib2.urlopen('http://json-time.appspot.com/time.json')
result = response.read()
json_result = json.loads(result)
datetime = json_result['datetime']

print('Process %s: %s' % (pid, datetime))
return json_result['datetime']

def synchronous():
for i in range(1,10):
fetch(i)

def asynchronous():
threads = []
for i in range(1,10):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

multiprocessing.dummy.ThreadPool庫

實現異步編程同步化還有一個方法,就是利用的map()函數。這個函數咱們並不陌生,它能夠在一個序列上實現某個函數之間的映射。

1
results = map(urllib2.urlopen, ['http://www.yahoo.com', 'http://www.reddit.com'])

上述代碼對會依次訪問每一個url,不過由於只有一個進程,後一個urlopen仍然須要等待前一個urlopen完成後纔會進行,仍然是一種串行的方式。可是,只要藉助正確的庫,map()也能夠輕鬆實現並行化操做,那就是multiprocessing庫。

這個庫以及其不爲人知的子庫multiprocessing.dummy,一個用於多進程,一個用於多線程。後者提供改良的map()函數,能夠自動將多個異步任務,分配到多個線程上,編程人員無需關注,也就天然地把異步編程轉爲了同步編程的風格。IO 密集型任務選擇multiprocessing.dummy,CPU 密集型任務選擇multiprocessing。

前述那個教科書式的例子,能夠改寫爲

1
2
3
4
5
6
7
8
9
10
import urllib2 
from multiprocessing.dummy import Pool ThreadPool
urls = [ 'http://www.python.org', 'http://www.python.org/about/', 'http://www.python.org/doc/', 'http://www.python.org/download/']
# Make the Pool of workers
pool = ThreadPool()
# Open the urls in their own threads and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()

 

關於map()函數和yield關鍵字的解釋,請參考 @申導 的另外一篇文章《Python函數式編程》

相關文章
相關標籤/搜索