經過本文你會知道Python裏面何時用yield最合適。本文不會給你講生成器是什麼,因此你須要先了解Python的yield,再來看本文。python
多年之前,當我剛剛開始學習Python協程的時候,我看到絕大多數的文章都舉了一個生產者-消費者的例子,用來表示在生產者內部能夠隨時調用消費者,達到和多線程相同的效果。這裏憑記憶簡單還原一下當年我看到的代碼:服務器
import time
def consumer():
product = None
while True:
if product is not None:
print('consumer: {}'.format(product))
product = yield None
def producer():
c = consumer()
next(c)
for i in range(10):
c.send(i)
start = time.time()
producer()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
複製代碼
運行效果以下圖所示。多線程
這些文章的說法,就像統一好了口徑同樣,說這樣寫能夠減小線程切換開銷,從而大大提升程序的運行效率。可是當年我始終想不明白,這種寫法與直接調用函數有什麼區別,以下圖所示。app
直到後來我須要操做Kafka的時候,我明白了使用yield的好處。函數
爲了便於理解,我會把實際場景作一些簡化,以方便說明事件的產生髮展和解決過程。事件的原由是我須要把一些信息寫入到Kafka中,個人代碼一開始是這樣的:學習
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product):
with topic.get_producer(delivery_reports=True) as producer:
producer.produce(str(product).encode())
def feed():
for i in range(10):
consumer(i)
start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
複製代碼
這段代碼的運行效果以下圖所示。spa
寫入10條數據須要100秒,這樣的龜速顯然是有問題的。問題就出在這一句代碼:線程
with topic.get_producer(delivery_reports=True) as producer
複製代碼
得到Kafka生產者對象是一個很是耗費時間的過程,每獲取一次都須要10秒鐘才能完成。因此寫入10個數據就獲取十次生產者對象。這消耗的100秒主要就是在獲取生產者對象,而真正寫入數據的時間短到能夠忽略不計。設計
因爲生產者對象是能夠複用的,因而我對代碼做了一些修改:3d
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
products = []
def consumer(product_list):
with topic.get_producer(delivery_reports=True) as producer:
for product in product_list:
producer.produce(str(product).encode())
def feed():
for i in range(10):
products.append(i)
consumer(products)
start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
複製代碼
首先把全部數據存放在一個列表中,最後再一次性給consumer函數。在一個Kafka生產者對象中展開列表,再把數據一條一條塞入Kafka。這樣因爲只須要獲取一次生產者對象,因此須要耗費的時間大大縮短,以下圖所示。
這種寫法在數據量小的時候是沒有問題的,但數據量一旦大起來,若是所有先放在一個列表裏面的話,服務器內存就爆了。
因而我又修改了代碼。每100條數據保存一次,並清空暫存的列表:
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product_list):
with topic.get_producer(delivery_reports=True) as producer:
for product in product_list:
producer.produce(str(product).encode())
def feed():
products = []
for i in range(1003):
products.append(i)
if len(products) >= 100:
consumer(products)
products = []
if products:
consumer(products)
start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
複製代碼
因爲最後一輪循環可能沒法湊夠100條數據,因此feed
函數裏面,循環結束之後還須要判斷products
列表是否爲空,若是不爲空,還要再消費一次。這樣的寫法,在上面這段代碼中,一共1003條數據,每100條數據獲取一次生產者對象,那麼須要獲取11次生產者對象,耗時至少爲110秒。
顯然,要解決這個問題,最直接的辦法就是減小獲取Kafka生產者對象的次數並最大限度複用生產者對象。若是讀者觸類旁通的能力比較強,那麼根據開關文件的兩種寫法:
# 寫法一
with open('test.txt', 'w', encoding='utf-8') as f:
f.write('xxx')
# 寫法二
f = open('test.txt', 'w', encoding='utf-8')
f.write('xxx')
f.close()
複製代碼
能夠推測出獲取Kafka生產者對象的另外一種寫法:
# 寫法二
producer = topic.get_producer(delivery_reports=True)
producer.produce(b'xxxx')
producer.close()
複製代碼
這樣一來,只要獲取一次生產者對象並把它做爲全局變量就能夠一直使用了。
然而,pykafka的官方文檔中使用的是第一種寫法,經過上下文管理器with
來得到生產者對象。暫且不論第二種方式是否會報錯,只從寫法上來講,第二種方式必須要手動關閉對象。開發者常常會出現開了忘記關的狀況,從而致使不少問題。並且若是中間出現了異常,使用上下文管理器的第一種方式會自動關閉生產者對象,但第二種方式仍然須要開發者手動關閉。
可是若是使用第一種方式,怎麼能在一個上下文裏面接收生產者傳進來的數據呢?這個時候纔是yield派上用場的時候。
首先須要明白,使用yield之後,函數就變成了一個生成器。生成器與普通函數的不一樣之處能夠經過下面兩段代碼來進行說明:
def funciton(i):
print('進入')
print(i)
print('結束')
for i in range(5):
funciton(i)
複製代碼
運行效果以下圖所示。
函數在被調用的時候,函數會從裏面的第一行代碼一直運行到某個return
或者函數的最後一行纔會退出。
而生成器能夠從中間開始運行,從中間跳出。例以下面的代碼:
def generator():
print('進入')
i = None
while True:
if i is not None:
print(i)
print('跳出')
i = yield None
g = generator()
next(g)
for i in range(5):
g.send(i)
複製代碼
運行效果以下圖所示。
從圖中能夠看到,進入
只打印了一次。代碼運行到i = yield None
後就跳到外面,外面的數據能夠經過g.send(i)
的形式傳進生成器,生成器內部拿到外面傳進來的數據之後繼續執行下一輪while
循環,打印出被傳進來的內容,而後到i = yield None
的時候又跳出。如此反覆。
因此回到最開始的Kafka問題。若是把with topic.get_producer(delivery_reports=True) as producer
寫在上面這一段代碼的print('進入')
這個位置上,那豈不是隻須要獲取一次Kafka生產者對象,而後就能夠一直使用了?
根據這個邏輯,設計以下代碼:
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer():
with topic.get_producer(delivery_reports=True) as producer:
print('init finished..')
next_data = ''
while True:
if next_data:
producer.produce(str(next_data).encode())
next_data = yield True
def feed():
c = consumer()
next(c)
for i in range(1000):
c.send(i)
start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
複製代碼
這一次直接插入1000條數據,總共只須要10秒鐘,相比於每插入一次都獲取一次Kafka生產者對象的方法,效率提升了1000倍。運行效果以下圖所示。
讀者若是仔細對比第一段代碼和最後一段代碼,就會發現他們本質上是一回事。可是第一段代碼,也就是網上不少人講yield的時候舉的生產者-消費者的例子之因此會讓人以爲毫無用處,就在於他們的消費者幾乎就是秒運行,這樣看不出和函數調用的差異。而我最後這一段代碼,它的消費者分紅兩個部分,第一部分是獲取Kafka生產者對象,這個過程很是耗時;第二部分是把數據經過Kafka生產者對象插入Kafka,這一部分運行速度極快。在這種狀況下,使用生成器把這個消費者代碼分開,讓耗時長的部分只運行一次,讓耗時短的反覆運行,這樣就能體現出生成器的優點。
個人公衆號:未聞Code