python wordCount

1、

如下環境在pycharm中運行java

確保jdk,hadoop,spark,scala 軟件在電腦端安裝完畢,環境搭建python

pycharm 環境配置es6

 環境變量配置:apache

PYTHONUNBUFFERED=1;  -u 不緩衝stdin、stdout和stderr 默認是緩衝的。同PYTHONUNBUFFERED=1 不用buffer的意思數組

PYTHONPATH=C:\spark-2.2.0-bin-hadoop2.7\python;jvm

SPARK_HOME=C:\spark-2.2.0-bin-hadoop2.7\分佈式

from pyspark import SparkContext,SparkConf

conf = SparkConf().setMaster("local").setAppName("liujinjie")
sc = SparkContext(conf=conf)
textFile = sc.textFile("D:\hello.txt")
wordCount = textFile.flatMap(lambda line: line.split(",")).map(lambda word:(word,1)).reduceByKey(lambda a,b :a + b)
wordCount.foreach(print)

 其中flatMap,Map,reduceByKey 含義講解 link函數

 2、oop

scala 代碼spa

collect: 收集一個彈性分佈式數據集的全部元素到一個數組中,這樣便於咱們觀察,畢竟分佈式數據集比較抽象。Spark的collect方法,是Action類型的一個算子,會從遠程集羣拉取數據到driver端。最後,將大量數據
 聚集到一個driver節點上,將數據用數組存放,佔用了jvm堆內存,很是用意形成內存溢出,只用做小型數據的觀察。
    val arr = res.collect();

import org.apache.spark.{SparkContext, SparkConf}
object WordCount {
  def main(args: Array[String]) {
    /*
    建立Spark配置對象SparkConf,設置Spark程序運行時的配置信息,
    SetMaster 來設置程序要連接的Spark集羣Master的URL,若是設定爲local 則表明在本地運行,
    setAppName 設定應用程序名稱,在程序運行監控界面可看到名稱
    */
  val conf = new SparkConf().setMaster("local").setAppName("testRdd")
  val sc = new SparkContext(conf) // 等價與  val sc = new SparkContext("local","testRdd")
    /**
    * 建立SparkContext對象,SparkContext是全部程序功能的惟一入口,不管用Scala、Java、Python、R等都必需要有個SparkContext
    * SparkContext核心做用是初始化spark應用程序所需核心組件,包含DAGScheduler,TaskScheduler,SchedulerBackend
    * 還負責 spark 程序往master 註冊程序等, 是整個應用程序最重要的一個對象, 經過傳入conf  定製spark運行的具體參數跟配置信息
    */
    /**
      * 3.根據具體數據的來源(HDFS,HBase,Local,FS,DB,S3等)經過SparkContext來建立RDD;
      * RDD的建立基本有三種方式:根據外部的數據來源(例如HDFS)、根據Scala集合、由其餘的RDD操做;
      * 數據會被RDD劃分紅爲一系列的Partitions,分配到每一個Partition的數據屬於一個Task的處理範疇;
      */
    /**
      * 4.對初始的RDD進行Transformation級別的處理,例如map,filter等高階函數的變成,來進行具體的數據計算
      * 4.1.將每一行的字符串拆分紅單個單詞
      */
    //對每一行的字符串進行拆分並把全部行的拆分結果經過flat合併成一個大的集合
    val lines = sc.textFile("d://hello.txt")
    val words = lines.flatMap(_.split(","))
    val pairs = words.map{word =>(word,1)}
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.foreach(pair => println(pair._1 + ":" + pair._2))
    sc.stop()

//  data.flatMap(_.split(","))//下劃線是佔位符,flatMap是對行操做的方法,對讀入的數據進行分割
//    .collect()//將分佈式的RDD返回一個單機的scala array,在這個數組上運用scala的函數操做,並返回結果到驅動程序
//    .foreach(println)//循環打印
  }
}
scala> List(1, 2, 3, 4, 5, 6) collect { case i if i % 2 == 0 => i * i }
res0: List[Int] = List(4, 16, 36)


scala> List(1, 2, 3, 4, 5, 6) collect { case i if i % 2 == 0 => i+1 }
res1: List[Int] = List(3, 5, 7)


scala> List(1, 2, 3, 4, 5, 6) collect { case i  => i+1 }
res2: List[Int] = List(2, 3, 4, 5, 6, 7)


scala> List(1, 2, 3, 4, 5, 6) collect { case i  =>(i, i+1 )}
res3: List[(Int, Int)] = List((1,2), (2,3), (3,4), (4,5), (5,6), (6,7))


scala> List(1, 2, 7, 4, 9, 6) collect { case i  =>(i, i+1 )}
res4: List[(Int, Int)] = List((1,2), (2,3), (7,8), (4,5), (9,10), (6,7))


scala> List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i }
<console>:1: error: ')' expected but ';' found.
       List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i }
                                                        ^
<console>:1: error: ';' expected but ')' found.
       List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i }
                                                          ^

scala> List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i }
<console>:1: error: ')' expected but ';' found.
       List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i }
                                                        ^
<console>:1: error: ';' expected but ')' found.
       List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i }
                                                             ^

scala> List(1, 2, 3, 4, 5, 6) collect { case i if {i=i+1;true}=> i * i }
<console>:8: error: reassignment to val
              List(1, 2, 3, 4, 5, 6) collect { case i if {i=i+1;true}=> i * i }
                                                           ^
scala> List(1, 2, 3, 4, 5, 6) collect { case i if {i+1;true}=> i * i }
res6: List[Int] = List(1, 4, 9, 16, 25, 36)
相關文章
相關標籤/搜索