版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。java
下載Anaconda3 Linux版本python
Anaconda3-5.3.1-Linux-x86_64.sh
複製代碼
安裝Anaconda3apache
bash Anaconda3-5.3.1-Linux-x86_64.sh -b
複製代碼
環境變量配置PYSPARK_DRIVER_PYTHON以及PYSPARK_PYTHON配置瀏覽器
export SCALA_HOME=/usr/local/install/scala-2.11.8
export JAVA_HOME=/usr/lib/java/jdk1.8.0_45
export HADOOP_HOME=/usr/local/install/hadoop-2.7.3
export SPARK_HOME=/usr/local/install/spark-2.3.0-bin-hadoop2.7
export FLINK_HOME=/usr/local/install/flink-1.6.1
export ANACONDA_PATH=/root/anaconda3
export PYSPARK_DRIVER_PYTHON=$ANACONDA_PATH/bin/ipython
export PYSPARK_PYTHON=$ANACONDA_PATH/bin/python
export JRE_HOME=${JAVA_HOME}/jre
export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH
export PATH=/root/anaconda3/bin:$PATH
複製代碼
啓動Saprkbash
啓動jupyter notebook服務器
老版本
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark
將來版本
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=`jupyter notebook --allow-root` pyspark
複製代碼
jupyter遠程訪問app
jupyter notebook --generate-config
vi ~/.jupyter/jupyter_notebook_config.py
c.NotebookApp.ip = '*' # 容許訪問此服務器的 IP,星號表示任意 IP
c.NotebookApp.open_browser = False # 運行時不打開本機瀏覽器
c.NotebookApp.port = 12035 # 使用的端口,隨意設置
c.NotebookApp.enable_mathjax = True # 啓用 MathJax
c.NotebookApp.allow_remote_access = True
複製代碼
jupyter NoteBook開發界面dom
lines=sc.textFile("/LICENSE")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.count()
243
counts.first()
(' Apache License', 1)
複製代碼
Standalone模式啓動函數
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" MASTER=spark://SparkMaster:7077 pyspark
複製代碼
val intRDD=sc.parallelize(List(1,2,3))
intRDD.collect
Array[Int] = Array(1, 2, 3)
複製代碼
python基礎RDD操做oop
#parallelize
intRDD=sc.parallelize([1,2,3])
intRDD.collect()
[1, 2, 3]
StringRDD=sc.parallelize(["Apple","Orange"])
StringRDD.collect()
['Apple', 'Orange']
#具名函數
def addOne(x):
return x+1
intRDD.map(addOne).collect()
#匿名函數
intRDD=sc.parallelize([1,2,3])
intRDD.map(lambda x:x+1).collect()
[2, 3, 4]
#過濾器
intRDD.filter(lambda x:1< x and x<5).collect()
[2, 3]
#in
stringRDD =sc.parallelize(["apple","blue"])
stringRDD.filter(lambda x:"apple" in x).collect()
['apple']
#distinct
intRDD=sc.parallelize([1,2,3,2,7])
intRDD.distinct().collect()
[1, 2, 3, 7]
#randomSplit
sRDD=intRDD.randomSplit([0.4,0.6])
sRDD[0].collect()
[1, 2]
#groupBy
group=intRDD.groupBy(lambda x:"even" if(x%2==0) else "odd").collect()
print(group)
[('odd', <pyspark.resultiterable.ResultIterable object at 0x7f2186897978>), ('even', <pyspark.resultiterable.ResultIterable object at 0x7f21868978d0>)]
print (sorted(group[0][1]))
[1, 3, 7]
print (sorted(group[1][1]))
[2, 2]
複製代碼
python多個RDD轉換操做
intRDD1=sc.parallelize(["apple","blue"])
intRDD2=sc.parallelize([1,2])
intRDD3=sc.parallelize(["apple","blue"])
#合併運算
intRDD1.union(intRDD2).union(intRDD3).collect()
['apple', 'blue', 1, 2, 'apple', 'blue']
#交集運算
intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])
intRDD1.intersection(intRDD2).collect()
[5]
intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])
intRDD1.subtract(intRDD2).collect()
[2, 3, 1]
intRDD1.first()
intRDD1.take(3)
intRDD1.takeOrdered(3)
[1, 2, 3]
intRDD1.takeOrdered(3,lambda x:-x)
[5, 5, 3]
複製代碼
Python RDD基於Key-Value轉換
kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])
kvRDD1.collect()
[(3, 4), [3, 6], [5, 6], [1, 2]]
kvRDD1.keys().collect()
[3, 3, 5, 1]
kvRDD1.values().collect()
[4, 6, 6, 2]
kvRDD1.filter(lambda keyvalue :keyvalue[0]<5).collect()
[(3, 4), [3, 6], [1, 2]]
kvRDD1.mapValues(lambda x:x*x).collect()
[(3, 16), (3, 36), (5, 36), (1, 4)]
kvRDD1.sortByKey(ascending=False).collect()
[[1, 2], (3, 4), [3, 6], [5, 6]]
kvRDD1.reduceByKey(lambda x,y:x+y).collect()
[(3, 10), (5, 6), (1, 2)]
複製代碼
Python 多個RDD 轉換操做
#join
kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])
kvRDD2=sc.parallelize([(3,8)])
kvRDD1.join(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
#左鏈接
kvRDD1.leftOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]
#右鏈接
kvRDD1.rightOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
#去除掉相同的key
kvRDD1.subtractByKey(kvRDD2).collect()
[(5, 6), (1, 2)]
kvRDD1.countByKey()
defaultdict(int, {3: 2, 5: 1, 1: 1})
#建立字典,對於Key=3的以value=6爲輸出
KV1=kvRDD1.collectAsMap()
{3: 6, 5: 6, 1: 2}
KV1[3]
6
kvRDD1.lookup(3)
[4, 6]
複製代碼
Python 的廣播變量
kvFruit = sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")])
FruitMap=kvFruit.collectAsMap()
print(FruitMap)
#廣播
broadcastFruitMap=sc.broadcast(FruitMap)
print(broadcastFruitMap.value)
{1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
#取出廣播
fruitIds =sc.parallelize([2,4,3,1])
fruitNames =fruitIds.map(lambda x:broadcastFruitMap.value[x]).collect()
print ("水果名稱" +str(fruitNames))
水果名稱['orange', 'grape', 'banana', 'apple']
複製代碼
Python 的累加器
intRDD=sc.parallelize([1,2,3])
total=sc.accumulator(0.0)
num=sc.accumulator(0)
intRDD.foreach(lambda i:[total.add(i),num.add(1)])
avg=total.value/num.value
print (str(total.value )+" "+ str(num.value) + " "+ str(avg))
6.0 3 2.0
複製代碼
Python持久化操做
intRDD=sc.parallelize([1,2,3])
intRDD.persist()
intRDD.is_cached
#沒有執行成功
intRDD.persist(StorageLevel.MEMORY_AND_DISK)
複製代碼
python 綜合案例
textFile=sc.textFile("/LICENSE")
stringRDD = textFile.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)
print(stringRDD.take(10))
stringRDD.saveAsTextFile("/pythonWordCount")
[('', 1445), ('Apache', 6), ('License', 9), ('Version', 2), ('2.0,', 1), ('January', 1), ('2004', 1), ('http://www.apache.org/licenses/', 1), ('TERMS', 2), ('AND', 3)]
複製代碼
經過Python技術棧與Spark大數據數據平臺整合,咱們將實現python生態最完善的計算和可視化體系。
秦凱新 於深圳 201812132319