ipyparallel WordCount實現

          i pyparallel 之中,能夠利用多個engine同時運行一個任務來加快處理的速度。在ipyparallel之中,集羣被抽象爲view,包括direct_view和balanced_view。其中,direct_view是全部的engine的抽象,固然也能夠自行指定由哪些engine構成,而balanced_view是多個engine通過負載均衡以後,抽象出來的由「單一」engine構成的view。利用ipyparallel並行化的基本思路是將要處理的數據首先進行切分,而後分佈到每個engine上,而後將最終的處理結果合併,獲得最終的結果,其思路和mapreduce相似。
         下面是一個ipyparallel的並行化wordcount實現,主要思路是:首先讀取文件中的句子。利用dview的scatter方法將全部的句子切分紅n塊發送到每個engine上,正好每個engine一個。而後在每個engine上對切分以後的句子統計詞頻,最後歸併全部engine處理以後的結果。
#!/usr/bin/env python
# coding: utf-8

import time

from itertools import repeat
from ipyparallel import Client, Reference
from urllib import urlretrieve
#對text進行wordcount處理
def wordfreq(text):
    """Return a dictionary of words and word counts in a string."""
    freqs = {}
    for word in text.split():
        lword = word.lower()
        freqs[lword] = freqs.get(lword, 0) + 1
    return freqs
#輸出詞頻前n個的單詞以及其出現的次數
def print_wordfreq(freqs, n=10):
    """Print the n most common words and counts in the freqs dict."""

    words, counts = freqs.keys(), freqs.values()
    items = zip(counts, words)
    items.sort(reverse=True)
    for (count, word) in items[:n]:
        print(word, count)

#自行實現的並行版本的word_freq,對若干行句子進行處理,返回詞,出現次數 鍵值對
def myword_freq(texts):
    freqs = {}
    for str in texts:
        for word in str.split():
            lword = word.lower()
            freqs[lword] = freqs.get(lword, 0) + 1
    return freqs
#自行實現的並行版本的wordfreq,首先將texts[]分散傳送至每個engine,而後在每個engine上執行程序myword_freq,返回求出的詞 詞頻鍵值對
def myPwordfreq(view,lines):
    #將文本平均分佈在每個engine上
    view.scatter('texts',lines,flatten=True)
    ar=view.apply(myword_freq,Reference('texts'))
    freqs_list=ar.get()
    #歸併最終的處理結果 reduce it!
    word_set=set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs=dict(zip(word_set,repeat(0)))
    for f in freqs_list:
        for word,count in f.items():
            freqs[word]+=count
    return freqs

if __name__ == '__main__':
    # Create a Client and View
    rc = Client()

    dview = rc[:]
    # Run the serial version
    print("Serial word frequency count:")
    text = open('lines.txt').read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate"%(toc-tic))
    # The parallel version
    print("\nParallel word frequency count:")
    lines=text.splitlines()
    tic=time.time()
    pfreqs=myPwordfreq(dview,lines)
    toc=time.time()
    print_wordfreq(pfreqs)
    print("Took %.3f s to calculate"%(toc-tic))
相關文章
相關標籤/搜索