Prerequisites:
1. Spark is already installed. I'm using Spark 2.2.0.
2. A Python environment is available; I'm using Python 3.6.
1. Install py4j
With pip, run the following command:
pip install py4j
With conda, run the following command:
conda install py4j
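Either way, a quick sanity check (my own addition, not in the original post) is to import py4j from the interpreter you just installed it into and print its version; if the import fails, the package landed in a different environment:

import py4j  # fails here if py4j is not on this interpreter's path
print(py4j.__version__)  # Spark 2.2.0 ships py4j 0.10.x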
2. Create a project in PyCharm.
During project creation, choose the Python interpreter. Once the project is open, click Run -> Edit Configurations -> Environment variables.
Add PYTHONPATH and SPARK_HOME, where PYTHONPATH is the python directory under the Spark installation path and SPARK_HOME is the Spark installation directory.
Then click OK, return to the first page, click Apply, then OK.
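As an alternative to clicking through the dialog, the same two settings can be made at the top of the script itself. The sketch below is my own addition, and /usr/local/spark is an assumed install path; substitute your own:

import os
import sys

SPARK_HOME = "/usr/local/spark"  # assumption: replace with your Spark install directory
os.environ["SPARK_HOME"] = SPARK_HOME

# Equivalent of the PYTHONPATH entry: make pyspark and its bundled py4j importable
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.4-src.zip"))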
3. Click Preferences -> Project Structure -> Add Content Root.
Add py4j-0.10.4-src.zip and pyspark.zip, found in the lib folder under the python directory of the Spark installation path. Then Apply, OK.
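Before moving on to streaming, you can confirm the whole setup with a tiny batch job (this smoke test is my own addition, not part of the original walkthrough):

from pyspark import SparkContext

sc = SparkContext("local[2]", "SetupCheck")
counts = (sc.parallelize(["a b a", "d d"])          # two small in-memory "lines"
            .flatMap(lambda line: line.split(" "))  # split each line into words
            .map(lambda word: (word, 1))            # pair each word with a count of 1
            .reduceByKey(lambda x, y: x + y))       # sum the counts per word
print(counts.collect())  # e.g. [('a', 2), ('b', 1), ('d', 2)]; order may vary
sc.stop()

If this prints the expected pairs, then pyspark, py4j, and the environment variables are all wired up correctly.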
4. Write a PySpark word count to test the setup. I'm using a PySpark Streaming program here.
The code is as follows:
WordCount.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
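One detail worth noting about the code above: the master must be local[n] with n larger than the number of receivers, because the socket receiver permanently occupies one thread. With local or local[1], the receiver would leave no thread for processing, and no counts would ever print.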
First, run the following command in a terminal:
$ nc -lk 9999
Then right-click the script in PyCharm and run it. Next, in the terminal above, type words separated by spaces.
I typed the following:
a b a d d d d
Then press Enter. You should see output like the following in PyCharm:
-------------------------------------------
Time: 2017-12-17 22:06:19
-------------------------------------------
('b', 1)
('d', 4)
('a', 2)
With that, the setup is complete.