使用生成器把Kafka寫入速度提升1000倍

使用生成器把Kafka寫入速度提升1000倍

[若是代碼顯示有問題,請點擊閱讀原文]服務器

經過本文你會知道Python裏面何時用yield最合適。本文不會給你講生成器是什麼,因此你須要先了解Python的yield,再來看本文。多線程

疑惑app


多年之前,當我剛剛開始學習Python協程的時候,我看到絕大多數的文章都舉了一個生產者-消費者的例子,用來表示在生產者內部能夠隨時調用消費者,達到和多線程相同的效果。這裏憑記憶簡單還原一下當年我看到的代碼:ide

import time
def consumer():
    product = None
    while True:
        if product is not None:
            print('consumer: {}'.format(product))
        product = yield None

def producer():
    c = consumer()
    next(c)
    for i in range(10):
        c.send(i)

start = time.time()
producer()
end = time.time()

print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')

運行效果以下圖所示。函數

使用生成器把Kafka寫入速度提升1000倍

這些文章的說法,就像統一好了口徑同樣,說這樣寫能夠減小線程切換開銷,從而大大提升程序的運行效率。可是當年我始終想不明白,這種寫法與直接調用函數有什麼區別,以下圖所示。學習

使用生成器把Kafka寫入速度提升1000倍

直到後來我須要操做Kafka的時候,我明白了使用yield的好處。線程

探索設計


爲了便於理解,我會把實際場景作一些簡化,以方便說明事件的產生髮展和解決過程。事件的原由是我須要把一些信息寫入到Kafka中,個人代碼一開始是這樣的:3d

import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product):
    with topic.get_producer(delivery_reports=True) as producer:
        producer.produce(str(product).encode())
def feed():
    for i in range(10):
        consumer(i)

start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')

這段代碼的運行效果以下圖所示。
使用生成器把Kafka寫入速度提升1000倍code

寫入10條數據須要100秒,這樣的龜速顯然是有問題的。問題就出在這一句代碼:

with topic.get_producer(delivery_reports=True) as producer
得到Kafka生產者對象是一個很是耗費時間的過程,每獲取一次都須要10秒鐘才能完成。因此寫入10個數據就獲取十次生產者對象。這消耗的100秒主要就是在獲取生產者對象,而真正寫入數據的時間短到能夠忽略不計。

因爲生產者對象是能夠複用的,因而我對代碼做了一些修改:

import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
products = []
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    for i in range(10):
        products.append(i)
    consumer(products)

start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')

首先把全部數據存放在一個列表中,最後再一次性給consumer函數。在一個Kafka生產者對象中展開列表,再把數據一條一條塞入Kafka。這樣因爲只須要獲取一次生產者對象,因此須要耗費的時間大大縮短,以下圖所示。
使用生成器把Kafka寫入速度提升1000倍

這種寫法在數據量小的時候是沒有問題的,但數據量一旦大起來,若是所有先放在一個列表裏面的話,服務器內存就爆了。

因而我又修改了代碼。每100條數據保存一次,並清空暫存的列表:

import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    products = []
    for i in range(1003):
        products.append(i)
        if len(products) >= 100:
            consumer(products)
            products = []
    if products:
        consumer(products)

start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')

因爲最後一輪循環可能沒法湊夠100條數據,因此feed函數裏面,循環結束之後還須要判斷products列表是否爲空,若是不爲空,還要再消費一次。這樣的寫法,在上面這段代碼中,一共1003條數據,每100條數據獲取一次生產者對象,那麼須要獲取11次生產者對象,耗時至少爲110秒。

顯然,要解決這個問題,最直接的辦法就是減小獲取Kafka生產者對象的次數並最大限度複用生產者對象。若是讀者觸類旁通的能力比較強,那麼根據開關文件的兩種寫法:

# 寫法一

with open('test.txt', 'w', encoding='utf-8') as f:
    f.write('xxx')
# 寫法二

f = open('test.txt', 'w', encoding='utf-8')
f.write('xxx')
f.close()

能夠推測出獲取Kafka生產者對象的另外一種寫法:

# 寫法二

producer = topic.get_producer(delivery_reports=True)
producer.produce(b'xxxx')
producer.close()

這樣一來,只要獲取一次生產者對象並把它做爲全局變量就能夠一直使用了。

然而,pykafka的官方文檔中使用的是第一種寫法,經過上下文管理器with來得到生產者對象。暫且不論第二種方式是否會報錯,只從寫法上來講,第二種方式必須要手動關閉對象。開發者常常會出現開了忘記關的狀況,從而致使不少問題。並且若是中間出現了異常,使用上下文管理器的第一種方式會自動關閉生產者對象,但第二種方式仍然須要開發者手動關閉。

函數VS生成器


可是若是使用第一種方式,怎麼能在一個上下文裏面接收生產者傳進來的數據呢?這個時候纔是yield派上用場的時候。

首先須要明白,使用yield之後,函數就變成了一個生成器。生成器與普通函數的不一樣之處能夠經過下面兩段代碼來進行說明:

def funciton(i):
    print('進入')
    print(i)
    print('結束')
for i in range(5):
    funciton(i)

運行效果以下圖所示。
使用生成器把Kafka寫入速度提升1000倍

函數在被調用的時候,函數會從裏面的第一行代碼一直運行到某個return或者函數的最後一行纔會退出。

而生成器能夠從中間開始運行,從中間跳出。例以下面的代碼:

def generator():
    print('進入')
    i = None
    while True:
        if i is not None:
            print(i)
        print('跳出')
        i = yield None

g = generator()
next(g)
for i in range(5):
    g.send(i)

運行效果以下圖所示。
使用生成器把Kafka寫入速度提升1000倍

從圖中能夠看到,進入只打印了一次。代碼運行到i = yield None後就跳到外面,外面的數據能夠經過g.send(i)的形式傳進生成器,生成器內部拿到外面傳進來的數據之後繼續執行下一輪while循環,打印出被傳進來的內容,而後到i = yield None的時候又跳出。如此反覆。

因此回到最開始的Kafka問題。若是把with topic.get_producer(delivery_reports=True) as producer寫在上面這一段代碼的print('進入')這個位置上,那豈不是隻須要獲取一次Kafka生產者對象,而後就能夠一直使用了?

根據這個邏輯,設計以下代碼:

import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer():
    with topic.get_producer(delivery_reports=True) as producer:
        print('init finished..')
        next_data = ''
        while True:
            if next_data:
                producer.produce(str(next_data).encode())
            next_data = yield True

def feed():
    c = consumer()
    next(c)
    for i in range(1000):
        c.send(i)

start = time.time()
feed()
end = time.time()
print(f'直到把全部數據塞入Kafka,一共耗時:{end - start}秒')

這一次直接插入1000條數據,總共只須要10秒鐘,相比於每插入一次都獲取一次Kafka生產者對象的方法,效率提升了1000倍。運行效果以下圖所示。
使用生成器把Kafka寫入速度提升1000倍

後記``

讀者若是仔細對比第一段代碼和最後一段代碼,就會發現他們本質上是一回事。可是第一段代碼,也就是網上不少人講yield的時候舉的生產者-消費者的例子之因此會讓人以爲毫無用處,就在於他們的消費者幾乎就是秒運行,這樣看不出和函數調用的差異。而我最後這一段代碼,它的消費者分紅兩個部分,第一部分是獲取Kafka生產者對象,這個過程很是耗時;第二部分是把數據經過Kafka生產者對象插入Kafka,這一部分運行速度極快。在這種狀況下,使用生成器把這個消費者代碼分開,讓耗時長的部分只運行一次,讓耗時短的反覆運行,這樣就能體現出生成器的優點。

相關文章
相關標籤/搜索