Installing Spark on Windows and Debugging a TopN Example with PyCharm / IDEA

First, a note on the environment used in this article: Windows 8.1 + Spark 1.6.0 + Python 2.7 + JDK 8. Spark on Windows is not picky about the Windows or Python version, but it is extremely picky about the Spark version; Spark 1.6.1, for example, simply fails to run.

1. Install the JDK

The first step in installing Spark is installing a JDK (on Windows just as on Linux), since Spark needs a JDK to run. Download the JDK from the Oracle website; I chose the 8u74 Windows x64 build, but pick whichever suits your needs. I won't cover the JDK installation itself: it is just a matter of clicking Next and choosing an install path.

For the details of setting the JDK path, refer to my "hadoop on windows" series, which I won't repeat here:

http://my.oschina.net/leejun2005/blog?catalog=3609138
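For reference, a minimal sketch of the variables involved (the install path below is only an example; use whatever you chose during installation):

JAVA_HOME=C:\Program Files\Java\jdk1.8.0_74
PATH=%PATH%;%JAVA_HOME%\bin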

2. Install Spark

Download Spark from the official Apache Spark™ website and choose spark-1.6.0-bin-hadoop2.6.tgz.

Add the Spark environment variables, appending the following to PATH:
%SPARK_HOME%\bin
%SPARK_HOME%\sbin
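Note that %SPARK_HOME% itself must be defined first. A minimal sketch, assuming the archive was unpacked to D:\ (adjust the path to your own location):

SPARK_HOME=D:\spark-1.6.0-bin-hadoop2.6
PATH=%PATH%;%SPARK_HOME%\bin;%SPARK_HOME%\sbin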

With that, the Spark setup on Windows is complete!!!

Watch out, there is a pitfall here:

Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Although Spark supports standalone mode and does not depend on Hadoop, on Windows it still needs Hadoop's winutils.exe. So you need a winutils.exe matching Hadoop 2.6: Google "hadoop.dll 2.6" or download it here (hadoop.dll and winutils.exe; GitHub hosts builds for all versions), then copy the downloaded files into Hadoop's bin directory (create the directory if it doesn't exist, and set the HADOOP_HOME and PATH environment variables accordingly).
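A sketch of the resulting layout and variables, assuming everything goes under D:\hadoop (the path is only an example):

D:\hadoop\bin\winutils.exe
D:\hadoop\bin\hadoop.dll
HADOOP_HOME=D:\hadoop
PATH=%PATH%;%HADOOP_HOME%\bin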

3. Set Up a PySpark Development Environment

Spark supports Scala, Python, and Java. Since I am fonder of Python than of Scala, the development environment here is Python.
Now set up the Python environment:

Either 2.7 or 3.5 works; I'll skip the Python installation itself. Once it is installed, add a PYTHONPATH entry to your environment variables. This step is important:
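A typical value for Spark 1.6.0 looks like the line below; this is a sketch, and the exact py4j zip name varies with the Spark build, so check what is actually under %SPARK_HOME%\python\lib:

PYTHONPATH=%SPARK_HOME%\python;%SPARK_HOME%\python\lib\py4j-0.9-src.zip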

If everything is configured correctly, open the IDE that ships with Python (IDLE), type in the following code, and wait for the message that the connection succeeded:

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My First App")
sc = SparkContext(conf=conf)

You can also start a master and a worker by hand to test:

spark-class.cmd org.apache.spark.deploy.master.Master
spark-class.cmd org.apache.spark.deploy.worker.Worker spark://localhost:7077
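Once both are up, the standalone master's web UI (http://localhost:8080 by default) should list the registered worker, and you can point setMaster at spark://localhost:7077 instead of "local".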

4. Using Spark to Find the Top 10 Most Common Passwords in the CSDN Password Dump
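Each line of the leaked www.csdn.net.sql file has the form account # password # email, so splitting on "#" and taking field 1 yields the password. The script below counts how often each password occurs, takes the ten most frequent, and prints each one with its share of the total: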

# coding=utf-8
# exercises utf-8 handling
from __future__ import division
import decimal
from pyspark import SparkConf, SparkContext, StorageLevel

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

conf = SparkConf().setMaster("local").setAppName("CSDN_PASSWD_Top10")
sc = SparkContext(conf=conf)

# raw string so the backslashes in the Windows path are taken literally
file_rdd = sc.textFile(r"H:\mysql\csdn_database\www.csdn.net.sql")
# field 1 of each "#"-separated line is the password; cache the (passwd, 1)
# pairs since the RDD is reused by both count() and reduceByKey()
passwds = file_rdd.map(lambda line: line.split("#")[1].strip()).map(lambda passwd: (passwd, 1)).persist(
    storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
passwd_nums = passwds.count()
top10_passwd = passwds.reduceByKey(lambda a, b: a + b).sortBy(lambda item: item[1], ascending=False).take(10)
for item in top10_passwd:
    print item[0] + "\t" + str(item[1]) + "\t" + str(round(decimal.Decimal(item[1] / passwd_nums), 4) * 100) + "%"


# 123456789       235037  3.66%
# 12345678        212761  3.31%
# 11111111        76348   1.19%
# dearbook        46053   0.72%
# 00000000        34953   0.54%
# 123123123       20010   0.31%
# 1234567890      17794   0.28%
# 88888888        15033   0.23%
# 111111111       6995    0.11%
# 147258369       5966    0.09%

# Finally, sanity-check the numbers:
# Data size: 6.5 million records; PySpark took 3 min 54 s. Machine: i5 CPU, 4 GB RAM, 250 GB SSD, Windows 8.1, Python 2.7.11
# awk -F"#" 'a[$2]++{for(i in a)print i"\t"a[i]}' www.csdn.net.sql|sort -k2nr|head -10
# far too slow under Cygwin: still no result after half an hour, killed it
# grep -F '# 123456789 #' www.csdn.net.sql|wc -l
# 235012
# awk -F'#' '{print $2}' www.csdn.net.sql|grep -P '\s+123456789\s+'|wc -l
# 235033
# awk -F'#' '{print $2}' www.csdn.net.sql|grep -E '^123456789$'|wc -l
# 0
# awk -F'#' '{print $2}' www.csdn.net.sql|grep -E ' 123456789$'|wc -l
# 5
# awk -F'#' '{print $2}' www.csdn.net.sql|grep -E '^123456789 '|wc -l
# 0

5. Scala Shell Version
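(Note: this transcript uses the spark session object and the Dataset API, which only appeared in Spark 2.0, so it assumes a newer spark-shell than the 1.6.0 installed above.)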

The code is as follows:

C:\Users\username>spark-shell

scala> val textFile = spark.read.textFile("C:\\Users\\username\\Desktop\\parse_slow_log.py")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count()
res0: Long = 156

scala> textFile.first()
res1: String = # encoding: utf-8

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.filter(line => line.contains("Spark")).count()
res2: Long = 0

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res3: Int = 27

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

scala> wordCounts.collect()
res4: Array[(String, Long)] = Array((self.slowlog,1), (import,3), (False,,1), (file_name,,1), (flag_word,3), (MySQL,1), (else,1), (*,2), (slowlog,1), (default=script_path),1), (0,4), ("",2), (-d,1), (__auther,1), (for,5...
scala>

6. Setting Up a Spark Development Environment with IDEA and Maven

Full tutorial: http://www.javashuo.com/article/p-cpmewjcd-dt.html

Setting up a Spark development environment with IntelliJ IDEA: http://www.javashuo.com/article/p-scerltda-nb.html

Refer:

[1] Getting Started with Spark (Python and Scala versions)

http://my.oschina.net/leejun2005/blog/411605

[2] Spark Streaming Programming Guide

http://spark.apache.org/docs/latest/streaming-programming-guide.html
