Spark Streaming Test

Testing Spark Streaming locally with Python

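1. Start the server side with nc -lk 9999, then launch network_wordcount.py with the host and port as arguments (for example: spark-submit network_wordcount.py localhost 9999). The terminal prints the results of the stream once per second. When running locally, give Spark at least two threads (for example --master local[2]), because the socket receiver occupies one of them.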

2. Type data into the server side (the nc terminal), and the client terminal displays the word counts.
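If nc is not available, a small Python server can play the same role. This is a minimal sketch under stated assumptions, not part of the original setup: make_server and send_lines are hypothetical helper names, and unlike nc -lk (where -k keeps listening after a client disconnects) it serves a single client and then stops. Each line is sent UTF-8 encoded and terminated by a newline, which is the format socketTextStream reads.

```python
import socket
import time


def make_server(host="localhost", port=9999):
    """Hypothetical helper: open a listening TCP socket, like `nc -l <port>`."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts on the same port while testing
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    return srv


def send_lines(srv, lines, interval=1.0):
    """Hypothetical helper: accept one client (e.g. the Spark receiver) and
    send each line as UTF-8 text ending in a newline, pausing between lines."""
    conn, _addr = srv.accept()
    with conn:
        for line in lines:
            conn.sendall((line.rstrip("\n") + "\n").encode("utf-8"))
            time.sleep(interval)
    # Leaving the `with` block closes the connection, so the client sees EOF
```

For an interactive session similar to nc, call send_lines(make_server(), sys.stdin, interval=0) after import sys, start network_wordcount.py, and type lines into the terminal.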

 

The complete code is as follows:

from __future__ import print_function

import sys 

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
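    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # Batch interval of 1 second: each batch holds the lines received during that second
    ssc = StreamingContext(sc, 1)

    # Receive newline-delimited text from the TCP source started with nc -lk 9999
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # Split lines into words, pair each word with 1, then sum the counts per word within each batch
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    # Runs once on the driver while the job is being set up, not once per batch
    print("log test")

    # Print the first elements (10 by default) of each batch's counts to the terminal
    counts.pprint()
    ssc.start()             # Start receiving and processing data
    ssc.awaitTermination()  # Block until the streaming job is stopped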
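To see what a single one-second batch computes without starting Spark, the pipeline can be modelled in plain Python. This is an illustrative sketch, not Spark code: word_count_batch is a hypothetical name, and a list of strings stands in for the lines one batch would receive.

```python
def word_count_batch(lines):
    """Plain-Python model of one micro-batch of network_wordcount.py.

    Mirrors flatMap(split(" ")) -> map((word, 1)) -> reduceByKey(a + b),
    but runs on an in-memory list instead of a DStream.
    """
    # flatMap: split every line on single spaces and flatten the result
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with the count 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the 1s for each distinct word
    counts = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    return counts


# Each batch is counted on its own: reduceByKey keeps no state between batches
batches = [["hello world", "hello spark"], ["spark streaming"]]
for batch in batches:
    print(word_count_batch(batch))
```

One detail this makes visible: split(" ") turns two consecutive spaces into an empty-string "word" (for example "a  b" counts "" once). Using line.split() with no argument in the Spark code would skip those empty strings.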