[若是代碼顯示有問題,請點擊閱讀原文]服務器
經過本文你會知道Python裏面何時用yield最合適。本文不會給你講生成器是什麼,因此你須要先了解Python的yield,再來看本文。多線程
疑惑app
多年之前,當我剛剛開始學習Python協程的時候,我看到絕大多數的文章都舉了一個生產者-消費者的例子,用來表示在生產者內部能夠隨時調用消費者,達到和多線程相同的效果。這裏憑記憶簡單還原一下當年我看到的代碼:ide
import time def consumer(): product = None while True: if product is not None: print('consumer: {}'.format(product)) product = yield None def producer(): c = consumer() next(c) for i in range(10): c.send(i) start = time.time() producer() end = time.time() print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
運行效果以下圖所示。函數
這些文章的說法,就像統一好了口徑同樣,說這樣寫能夠減小線程切換開銷,從而大大提升程序的運行效率。可是當年我始終想不明白,這種寫法與直接調用函數有什麼區別,以下圖所示。學習
直到後來我須要操做Kafka的時候,我明白了使用yield的好處。線程
探索設計
爲了便於理解,我會把實際場景作一些簡化,以方便說明事件的產生髮展和解決過程。事件的原由是我須要把一些信息寫入到Kafka中,個人代碼一開始是這樣的:3d
import time from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(product): with topic.get_producer(delivery_reports=True) as producer: producer.produce(str(product).encode()) def feed(): for i in range(10): consumer(i) start = time.time() feed() end = time.time() print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
這段代碼的運行效果以下圖所示。
code
寫入10條數據須要100秒,這樣的龜速顯然是有問題的。問題就出在這一句代碼:
with topic.get_producer(delivery_reports=True) as producer
得到Kafka生產者對象是一個很是耗費時間的過程,每獲取一次都須要10秒鐘才能完成。因此寫入10個數據就獲取十次生產者對象。這消耗的100秒主要就是在獲取生產者對象,而真正寫入數據的時間短到能夠忽略不計。
因爲生產者對象是能夠複用的,因而我對代碼做了一些修改:
import timefrom pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] products = [] def consumer(product_list): with topic.get_producer(delivery_reports=True) as producer: for product in product_list: producer.produce(str(product).encode()) def feed(): for i in range(10): products.append(i) consumer(products) start = time.time() feed() end = time.time() print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
首先把全部數據存放在一個列表中,最後再一次性給consumer函數。在一個Kafka生產者對象中展開列表,再把數據一條一條塞入Kafka。這樣因爲只須要獲取一次生產者對象,因此須要耗費的時間大大縮短,以下圖所示。
這種寫法在數據量小的時候是沒有問題的,但數據量一旦大起來,若是所有先放在一個列表裏面的話,服務器內存就爆了。
因而我又修改了代碼。每100條數據保存一次,並清空暫存的列表:
import timefrom pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(product_list): with topic.get_producer(delivery_reports=True) as producer: for product in product_list: producer.produce(str(product).encode()) def feed(): products = [] for i in range(1003): products.append(i) if len(products) >= 100: consumer(products) products = [] if products: consumer(products) start = time.time() feed() end = time.time() print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
因爲最後一輪循環可能沒法湊夠100條數據,因此feed函數裏面,循環結束之後還須要判斷products列表是否爲空,若是不爲空,還要再消費一次。這樣的寫法,在上面這段代碼中,一共1003條數據,每100條數據獲取一次生產者對象,那麼須要獲取11次生產者對象,耗時至少爲110秒。
顯然,要解決這個問題,最直接的辦法就是減小獲取Kafka生產者對象的次數並最大限度複用生產者對象。若是讀者觸類旁通的能力比較強,那麼根據開關文件的兩種寫法:
# 寫法一 with open('test.txt', 'w', encoding='utf-8') as f: f.write('xxx') # 寫法二 f = open('test.txt', 'w', encoding='utf-8') f.write('xxx') f.close()
能夠推測出獲取Kafka生產者對象的另外一種寫法:
# 寫法二 producer = topic.get_producer(delivery_reports=True) producer.produce(b'xxxx') producer.close()
這樣一來,只要獲取一次生產者對象並把它做爲全局變量就能夠一直使用了。
然而,pykafka的官方文檔中使用的是第一種寫法,經過上下文管理器with來得到生產者對象。暫且不論第二種方式是否會報錯,只從寫法上來講,第二種方式必須要手動關閉對象。開發者常常會出現開了忘記關的狀況,從而致使不少問題。並且若是中間出現了異常,使用上下文管理器的第一種方式會自動關閉生產者對象,但第二種方式仍然須要開發者手動關閉。
函數VS生成器
可是若是使用第一種方式,怎麼能在一個上下文裏面接收生產者傳進來的數據呢?這個時候纔是yield派上用場的時候。
首先須要明白,使用yield之後,函數就變成了一個生成器。生成器與普通函數的不一樣之處能夠經過下面兩段代碼來進行說明:
def funciton(i): print('進入') print(i) print('結束') for i in range(5): funciton(i)
運行效果以下圖所示。
函數在被調用的時候,函數會從裏面的第一行代碼一直運行到某個return或者函數的最後一行纔會退出。
而生成器能夠從中間開始運行,從中間跳出。例以下面的代碼:
def generator(): print('進入') i = None while True: if i is not None: print(i) print('跳出') i = yield None g = generator() next(g) for i in range(5): g.send(i)
運行效果以下圖所示。
從圖中能夠看到,進入只打印了一次。代碼運行到i = yield None後就跳到外面,外面的數據能夠經過g.send(i)的形式傳進生成器,生成器內部拿到外面傳進來的數據之後繼續執行下一輪while循環,打印出被傳進來的內容,而後到i = yield None的時候又跳出。如此反覆。
因此回到最開始的Kafka問題。若是把with topic.get_producer(delivery_reports=True) as producer寫在上面這一段代碼的print('進入')這個位置上,那豈不是隻須要獲取一次Kafka生產者對象,而後就能夠一直使用了?
根據這個邏輯,設計以下代碼:
import timefrom pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(): with topic.get_producer(delivery_reports=True) as producer: print('init finished..') next_data = '' while True: if next_data: producer.produce(str(next_data).encode()) next_data = yield True def feed(): c = consumer() next(c) for i in range(1000): c.send(i) start = time.time() feed() end = time.time() print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')
這一次直接插入1000條數據,總共只須要10秒鐘,相比於每插入一次都獲取一次Kafka生產者對象的方法,效率提升了1000倍。運行效果以下圖所示。
後記``
讀者若是仔細對比第一段代碼和最後一段代碼,就會發現他們本質上是一回事。可是第一段代碼,也就是網上不少人講yield的時候舉的生產者-消費者的例子之因此會讓人以爲毫無用處,就在於他們的消費者幾乎就是秒運行,這樣看不出和函數調用的差異。而我最後這一段代碼,它的消費者分紅兩個部分,第一部分是獲取Kafka生產者對象,這個過程很是耗時;第二部分是把數據經過Kafka生產者對象插入Kafka,這一部分運行速度極快。在這種狀況下,使用生成器把這個消費者代碼分開,讓耗時長的部分只運行一次,讓耗時短的反覆運行,這樣就能體現出生成器的優點。