Integrating the Python Stack with the Spark Big Data Platform in Practice -- A Hands-On Big Data ML Sample-Set Case Study

Copyright notice: This column is the author's (Qin Kaixin) summary and distillation of day-to-day work, built from cases drawn out of real business environments, together with tuning advice and cluster capacity-planning guidance for commercial applications. Please keep following this blog. QQ email: 1120746959@qq.com; feel free to get in touch for academic exchange.

1 Integrating the Python Stack with the Spark Big Data Platform

  • Download the Anaconda3 installer for Linux

    Anaconda3-5.3.1-Linux-x86_64.sh
  • Install Anaconda3

    bash Anaconda3-5.3.1-Linux-x86_64.sh -b 
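  A quick way to confirm the install worked, assuming the default target directory that `-b` batch mode uses ($HOME/anaconda3):

    ~/anaconda3/bin/python --version   # should report a Python 3.x from Anaconda
    ~/anaconda3/bin/conda --version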
  • Configure the PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON environment variables

    export SCALA_HOME=/usr/local/install/scala-2.11.8
    export JAVA_HOME=/usr/lib/java/jdk1.8.0_45
    export HADOOP_HOME=/usr/local/install/hadoop-2.7.3
    export SPARK_HOME=/usr/local/install/spark-2.3.0-bin-hadoop2.7
    export FLINK_HOME=/usr/local/install/flink-1.6.1

    export ANACONDA_PATH=/root/anaconda3
    export PYSPARK_DRIVER_PYTHON=$ANACONDA_PATH/bin/ipython
    export PYSPARK_PYTHON=$ANACONDA_PATH/bin/python

    export JRE_HOME=${JAVA_HOME}/jre
    export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export PATH=${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH
    export PATH=/root/anaconda3/bin:$PATH
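  For the variables to take effect in the current shell, reload the profile they were added to (assuming ~/.bashrc):

    source ~/.bashrc
    echo $PYSPARK_PYTHON   # should print /root/anaconda3/bin/python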
  • Start Spark (a minimal sketch follows)

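  A minimal sketch of bringing the cluster up, assuming a standalone layout where the workers are listed in $SPARK_HOME/conf/slaves:

    $SPARK_HOME/sbin/start-all.sh   # start the standalone master and workers
    pyspark                         # local PySpark shell for a quick smoke test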
  • Start the Jupyter Notebook server

    # older versions (ipython notebook)
    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark

    # newer versions: `ipython notebook` is deprecated, so drive through jupyter instead
    PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark

  • Remote access to Jupyter

    jupyter notebook --generate-config

    vi ~/.jupyter/jupyter_notebook_config.py
    c.NotebookApp.ip = '*'                    # IPs allowed to reach the server; '*' means any IP
    c.NotebookApp.open_browser = False        # do not open a local browser on startup
    c.NotebookApp.port = 12035                # port to serve on; pick any free port
    c.NotebookApp.enable_mathjax = True       # enable MathJax rendering
    c.NotebookApp.allow_remote_access = True
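  Since the server is now reachable from any IP, it is prudent to require a login as well; a sketch assuming Jupyter 5.0+, which stores a hashed password in ~/.jupyter/jupyter_notebook_config.json:

    jupyter notebook password   # prompts for a password and hashes it into the config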
  • The Jupyter Notebook development UI

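  A first cell worth running to confirm the notebook is actually wired to Spark; `sc` is the SparkContext that the pyspark launcher injects into the session:

    # run in a notebook cell: verify the PySpark session
    print(sc.version)   # Spark version, e.g. 2.3.0
    print(sc.master)    # e.g. local[*] or spark://SparkMaster:7077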
  • Debugging a Spark program

    lines = sc.textFile("/LICENSE")
    pairs = lines.map(lambda s: (s, 1))
    counts = pairs.reduceByKey(lambda a, b: a + b)

    counts.count()
    243

    counts.first()
    ('                                 Apache License', 1)
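  Building on the line-count pairs above, a small follow-up that pulls out the most frequent lines by passing a negated count as the ordering key:

    # ten most frequent lines in the file
    counts.takeOrdered(10, key=lambda kv: -kv[1])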
  • Launching in standalone mode

    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" MASTER=spark://SparkMaster:7077 pyspark
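  The same thing can be expressed without the MASTER environment variable, using pyspark's own flag with the identical cluster URL:

    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark --master spark://SparkMaster:7077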

2 Spark Transformation Operations

2.1 Scala operations

    val intRDD = sc.parallelize(List(1, 2, 3))
    intRDD.collect
    Array[Int] = Array(1, 2, 3)

2.2 Python operations

  • Basic Python RDD operations (a note on materializing groupBy results follows the block)

    # parallelize
    intRDD = sc.parallelize([1, 2, 3])
    intRDD.collect()
    [1, 2, 3]

    stringRDD = sc.parallelize(["Apple", "Orange"])
    stringRDD.collect()
    ['Apple', 'Orange']

    # named function
    def addOne(x):
        return x + 1
    intRDD.map(addOne).collect()
    [2, 3, 4]

    # anonymous (lambda) function
    intRDD = sc.parallelize([1, 2, 3])
    intRDD.map(lambda x: x + 1).collect()
    [2, 3, 4]

    # filter
    intRDD.filter(lambda x: 1 < x and x < 5).collect()
    [2, 3]

    # substring test with `in`
    stringRDD = sc.parallelize(["apple", "blue"])
    stringRDD.filter(lambda x: "apple" in x).collect()
    ['apple']

    # distinct
    intRDD = sc.parallelize([1, 2, 3, 2, 7])
    intRDD.distinct().collect()
    [1, 2, 3, 7]

    # randomSplit -- the split is random, so the contents vary from run to run
    sRDD = intRDD.randomSplit([0.4, 0.6])
    sRDD[0].collect()
    [1, 2]

    # groupBy
    group = intRDD.groupBy(lambda x: "even" if (x % 2 == 0) else "odd").collect()
    print(group)
    [('odd', <pyspark.resultiterable.ResultIterable object at 0x7f2186897978>), ('even', <pyspark.resultiterable.ResultIterable object at 0x7f21868978d0>)]

    print(sorted(group[0][1]))
    [1, 3, 7]

    print(sorted(group[1][1]))
    [2, 2]
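  The groupBy output above prints opaque ResultIterable objects; mapValues(list) materializes each group so the result can be read directly. A minimal sketch (group order may vary between runs):

    intRDD = sc.parallelize([1, 2, 3, 2, 7])
    intRDD.groupBy(lambda x: "even" if x % 2 == 0 else "odd") \
          .mapValues(list).collect()
    # e.g. [('odd', [1, 3, 7]), ('even', [2, 2])]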
  • Transformations across multiple RDDs in Python (a cartesian sketch follows the block)

    # note: despite the names, these first two hold strings and ints respectively
    intRDD1 = sc.parallelize(["apple", "blue"])
    intRDD2 = sc.parallelize([1, 2])
    intRDD3 = sc.parallelize(["apple", "blue"])

    # union
    intRDD1.union(intRDD2).union(intRDD3).collect()
    ['apple', 'blue', 1, 2, 'apple', 'blue']

    # intersection
    intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
    intRDD2 = sc.parallelize([5, 6])
    intRDD3 = sc.parallelize([2, 7])
    intRDD1.intersection(intRDD2).collect()
    [5]

    # subtract (set difference)
    intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
    intRDD2 = sc.parallelize([5, 6])
    intRDD3 = sc.parallelize([2, 7])
    intRDD1.subtract(intRDD2).collect()
    [2, 3, 1]

    intRDD1.first()
    3
    intRDD1.take(3)
    [3, 1, 2]

    intRDD1.takeOrdered(3)
    [1, 2, 3]

    # descending order via a negated key
    intRDD1.takeOrdered(3, lambda x: -x)
    [5, 5, 3]
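  One more multi-RDD transformation worth knowing next to union/intersection/subtract: cartesian pairs every element of one RDD with every element of another. A sketch on the same data (output order may vary):

    intRDD2 = sc.parallelize([5, 6])
    intRDD3 = sc.parallelize([2, 7])
    intRDD2.cartesian(intRDD3).collect()
    # [(5, 2), (5, 7), (6, 2), (6, 7)]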
  • Key-value RDD transformations in Python (a per-key average sketch follows the block)

    kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
    kvRDD1.collect()
    [(3, 4), (3, 6), (5, 6), (1, 2)]

    kvRDD1.keys().collect()
    [3, 3, 5, 1]

    kvRDD1.values().collect()
    [4, 6, 6, 2]

    # filter on the key
    kvRDD1.filter(lambda keyvalue: keyvalue[0] < 5).collect()
    [(3, 4), (3, 6), (1, 2)]

    # square each value, leaving keys untouched
    kvRDD1.mapValues(lambda x: x * x).collect()
    [(3, 16), (3, 36), (5, 36), (1, 4)]

    # descending by key
    kvRDD1.sortByKey(ascending=False).collect()
    [(5, 6), (3, 4), (3, 6), (1, 2)]

    # sum the values for each key
    kvRDD1.reduceByKey(lambda x, y: x + y).collect()
    [(3, 10), (5, 6), (1, 2)]
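  A common pattern combining mapValues and reduceByKey from above: a per-key average computed in one pass by carrying (sum, count) pairs. A sketch on the same kvRDD1 data (key order may vary):

    kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
    sums = kvRDD1.mapValues(lambda v: (v, 1)) \
                 .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    sums.mapValues(lambda p: p[0] / p[1]).collect()
    # e.g. [(3, 5.0), (5, 6.0), (1, 2.0)]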
  • Joins across multiple key-value RDDs in Python (a fullOuterJoin sketch follows the block)

    # join
    kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
    kvRDD2 = sc.parallelize([(3, 8)])
    kvRDD1.join(kvRDD2).collect()
    [(3, (4, 8)), (3, (6, 8))]

    # left outer join
    kvRDD1.leftOuterJoin(kvRDD2).collect()
    [(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]

    # right outer join
    kvRDD1.rightOuterJoin(kvRDD2).collect()
    [(3, (4, 8)), (3, (6, 8))]

    # drop pairs whose key also appears in kvRDD2
    kvRDD1.subtractByKey(kvRDD2).collect()
    [(5, 6), (1, 2)]

    kvRDD1.countByKey()
    defaultdict(int, {3: 2, 5: 1, 1: 1})

    # collectAsMap builds a dict; for the duplicate key 3, the later value (6) wins
    KV1 = kvRDD1.collectAsMap()
    {3: 6, 5: 6, 1: 2}
    KV1[3]
    6

    # lookup returns every value stored under the key
    kvRDD1.lookup(3)
    [4, 6]
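  For completeness next to the left and right joins above, fullOuterJoin keeps keys from both sides and pads the missing side with None. A sketch on the same data:

    kvRDD1.fullOuterJoin(kvRDD2).collect()
    # [(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]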
  • Broadcast variables in Python (releasing a broadcast is sketched after the block)

    kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")])
    FruitMap = kvFruit.collectAsMap()

    print(FruitMap)

    # broadcast the lookup table to the executors
    broadcastFruitMap = sc.broadcast(FruitMap)
    print(broadcastFruitMap.value)
    {1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}

    # read the broadcast value inside a task
    fruitIds = sc.parallelize([2, 4, 3, 1])
    fruitNames = fruitIds.map(lambda x: broadcastFruitMap.value[x]).collect()
    print("Fruit names: " + str(fruitNames))

    Fruit names: ['orange', 'grape', 'banana', 'apple']
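  Once a broadcast value is no longer needed, its cached copies can be released; a minimal sketch using the Broadcast API:

    # drop the cached copies on the executors; unlike destroy(), the variable
    # can still be re-sent automatically if a later task uses it again
    broadcastFruitMap.unpersist()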
  • Accumulators in Python (an aggregate() alternative follows the block)

    intRDD = sc.parallelize([1, 2, 3])

    total = sc.accumulator(0.0)
    num = sc.accumulator(0)

    # foreach is an action, so each element updates the accumulators exactly once
    intRDD.foreach(lambda i: [total.add(i), num.add(1)])
    avg = total.value / num.value
    print(str(total.value) + " " + str(num.value) + " " + str(avg))

    6.0 3 2.0
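  The same average can be computed without accumulators, which is often safer because accumulator updates made inside transformations (rather than actions, as above) can be re-applied on task retries. A self-contained alternative with aggregate():

    intRDD = sc.parallelize([1, 2, 3])
    total, num = intRDD.aggregate(
        (0.0, 0),
        lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold one value into (sum, count)
        lambda a, b: (a[0] + b[0], a[1] + b[1]))   # merge per-partition results
    print(total / num)   # 2.0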
  • Persistence in Python (inspecting the storage level follows the block)

    from pyspark import StorageLevel

    intRDD = sc.parallelize([1, 2, 3])
    intRDD.persist()          # MEMORY_ONLY by default

    intRDD.is_cached
    True

    # the original attempt failed: StorageLevel was not imported, and a persisted
    # RDD's storage level cannot be changed, so unpersist before re-persisting
    intRDD.unpersist()
    intRDD.persist(StorageLevel.MEMORY_AND_DISK)
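  To check which level is actually in effect, getStorageLevel() can be queried at any time:

    print(intRDD.getStorageLevel())
    # e.g. StorageLevel(True, True, False, False, 1) -- disk and memory, serialized, 1x replicated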
  • A complete Python example: word count (a small refinement follows the block)

    textFile = sc.textFile("/LICENSE")

    stringRDD = textFile.flatMap(lambda line: line.split(" ")) \
                        .map(lambda x: (x, 1)) \
                        .reduceByKey(lambda x, y: x + y)
    print(stringRDD.take(10))
    stringRDD.saveAsTextFile("/pythonWordCount")

    [('', 1445), ('Apache', 6), ('License', 9), ('Version', 2), ('2.0,', 1), ('January', 1), ('2004', 1), ('http://www.apache.org/licenses/', 1), ('TERMS', 2), ('AND', 3)]
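  The raw output above is dominated by empty tokens produced by splitting runs of spaces; a small refinement that drops them and returns the ten most frequent real words:

    stringRDD.filter(lambda kv: kv[0] != "") \
             .takeOrdered(10, key=lambda kv: -kv[1])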

3 Summary

By integrating the Python stack with the Spark big data platform, we gain access to the Python ecosystem, the most complete one available for computation and visualization.

Qin Kaixin, Shenzhen, 2018-12-13 23:19
