翻譯:《實用的Python編程》06_03_Producers_consumers

目錄 | [上一節 (6.2 自定義迭代)]() | [下一節 (6.4 生成器表達式)]()python

6.3 生產者,消費者和管道

生成器在設置各類生產者/消費者問題(producer/consumer problems)和數據流管道(pipeline)中很是有用。本節將對此進行討論。git

生產者消費者問題

生成器與各類形式的 生產者消費者 問題密切相關。github

# Producer
def follow(f):
    ...
    while True:
        ...
        yield line        # Produces value in `line` below
        ...

# Consumer
for line in follow(f):    # Consumes value from `yield` above
    ...

yield 語句生成給 for 語句消費的值。函數

生成器管道

你可使用生成器的這方面特性來設置進程管道(相似於 Unix 管道(pipe))。翻譯

producerprocessingprocessingconsumer日誌

進程管道包括初始的數據生產者、中間的處理階段、最後的消費者。code

producerprocessingprocessingconsumer進程

def producer():
    ...
    yield item
    ...

一般狀況下,生產者是一個生成器,儘管也能夠是其它的序列列表。yield 將數據輸入管道。ip

producerprocessingprocessingconsumerget

def consumer(s):
    for item in s:
        ...

消費者是一個 for 循環,獲取數據(譯註:items)並對數據執行某些操做。

producerprocessingprocessingconsumer

def processing(s):
    for item in s:
        ...
        yield newitem
        ...

中間的處理階段同時消費和生產數據。它們可能修改數據流,也可能篩選數據流(丟棄數據)。

producerprocessingprocessingconsumer

def producer():
    ...
    yield item          # yields the item that is received by the `processing`
    ...

def processing(s):
    for item in s:      # Comes from the `producer`
        ...
        yield newitem   # yields a new item
        ...

def consumer(s):
    for item in s:      # Comes from the `processing`
        ...

設置管道的代碼以下:

a = producer()
b = processing(a)
c = consumer(b)

你會發現數據逐漸地流向不一樣的函數。

練習

對於本練習,stocksim.py 程序仍須要在後臺運行。而且,你將使用到上一節練習(譯註:練習 6.7)編寫的 follow() 函數。

練習 6.8:建立一個簡單的管道

讓咱們來看看管道的思想。請建立下面這個函數:

>>> def filematch(lines, substr):
        for line in lines:
            if substr in line:
                yield line

>>>

filematch() 函數除了再也不打開文件,幾乎與上一節練習的第一個生成器示例徹底相同——僅僅對做爲參數給出的行序列進行操做。如今,請嘗試以下操做:

>>> from follow import follow
>>> lines = follow('Data/stocklog.csv')
>>> ibm = filematch(lines, 'IBM')
>>> for line in ibm:
        print(line)

... wait for output ...

雖然輸出可能須要必定時間纔會出現,可是,最後你必定會看到包含 IBM 數據的行。

練習 6.9:建立一個複雜的管道

經過執行更多操做來進一步理解管道的思想。

>>> from follow import follow
>>> import csv
>>> lines = follow('Data/stocklog.csv')
>>> rows = csv.reader(lines)
>>> for row in rows:
        print(row)

['BA', '98.35', '6/11/2007', '09:41.07', '0.16', '98.25', '98.35', '98.31', '158148']
['AA', '39.63', '6/11/2007', '09:41.07', '-0.03', '39.67', '39.63', '39.31', '270224']
['XOM', '82.45', '6/11/2007', '09:41.07', '-0.23', '82.68', '82.64', '82.41', '748062']
['PG', '62.95', '6/11/2007', '09:41.08', '-0.12', '62.80', '62.97', '62.61', '454327']
...

這很是有趣。你在這裏能夠看到, follow() 函數的輸出被傳遞到 csv.reader()函數,而且,咱們如今獲得了一系列拆分的行。

練習 6.10:建立更多管道組件

讓咱們把這樣的思想擴展到更大的管道中。首先,建立 ticker.py 文件,而後在 ticker.py 文件裏面建立一個函數,像上面同樣讀取 CSV 文件:

# ticker.py

from follow import follow
import csv

def parse_stock_data(lines):
    rows = csv.reader(lines)
    return rows

if __name__ == '__main__':
    lines = follow('Data/stocklog.csv')
    rows = parse_stock_data(lines)
    for row in rows:
        print(row)

接着,建立一個選擇特定列的新函數:

# ticker.py
...
def select_columns(rows, indices):
    for row in rows:
        yield [row[index] for index in indices]
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    return rows

再次運行程序,你應該能夠看到輸出縮小以下:

['BA', '98.35', '0.16']
['AA', '39.63', '-0.03']
['XOM', '82.45','-0.23']
['PG', '62.95', '-0.12']
...

再接着,建立一個生成器函數以轉換數據類型並構建字典。示例:

# ticker.py
...

def convert_types(rows, types):
    for row in rows:
        yield [func(val) for func, val in zip(types, row)]

def make_dicts(rows, headers):
    for row in rows:
        yield dict(zip(headers, row))
...
def parse_stock_data(lines):
    rows = csv.reader(lines)
    rows = select_columns(rows, [0, 1, 4])
    rows = convert_types(rows, [str, float, float])
    rows = make_dicts(rows, ['name', 'price', 'change'])
    return rows
...

再次運行程序,你應該可以看到像下面這樣的字典流:

{ 'name':'BA', 'price':98.35, 'change':0.16 }
{ 'name':'AA', 'price':39.63, 'change':-0.03 }
{ 'name':'XOM', 'price':82.45, 'change': -0.23 }
{ 'name':'PG', 'price':62.95, 'change':-0.12 }
...

練習 6.11:篩選數據

建立一個篩選數據的函數。示例:

# ticker.py
...

def filter_symbols(rows, names):
    for row in rows:
        if row['name'] in names:
            yield row

使用該函數能夠篩選出投資組合中的股票:

import report
portfolio = report.read_portfolio('Data/portfolio.csv')
rows = parse_stock_data(follow('Data/stocklog.csv'))
rows = filter_symbols(rows, portfolio)
for row in rows:
    print(row)

練習 6.12:整合全部的代碼

請在 ticker.py 文件中編寫函數 ticker(portfile, logfile, fmt) ,該函數根據給定的投資組合、日誌文件和表格格式建立實時的股票報價器。示例:

>>> from ticker import ticker
>>> ticker('Data/portfolio.csv', 'Data/stocklog.csv', 'txt')
      Name      Price     Change
---------- ---------- ----------
        GE      37.14      -0.18
      MSFT      29.96      -0.09
       CAT      78.03      -0.49
        AA      39.34      -0.32
...

>>> ticker('Data/portfolio.csv', 'Data/stocklog.csv', 'csv')
Name,Price,Change
IBM,102.79,-0.28
CAT,78.04,-0.48
AA,39.35,-0.31
CAT,78.05,-0.47
...

討論

心得體會:你能夠建立各類生成器函數,並把它們連接在一塊兒執行涉及數據流的管道處理。另外,你能夠建立一個函數,把一系列的管道階段打包到一個單獨的函數中調用(例如 parse_stock_data() 函數)。

目錄 | [上一節 (6.2 自定義迭代)]() | [下一節 (6.4 生成器表達式)]()

注:完整翻譯見 https://github.com/codists/practical-python-zh

相關文章
相關標籤/搜索