java,scala,python,spark,hadoop,local模式測試代碼,idea,jdk1.8,scla2.11,python2.7,spark-2.4.3-bin-hadoop2.7

/*scala test*/
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
  def main(args: Array[String]): Unit = {
    println("start...")
    /**
      * 第一步:建立Spark的配置對象SparkConf,設置Spark程序運行時的配置信息,
      * 例如說經過設置setMaster來設置程序要連接的Spark集羣的Master的URL,
      * 若是設置爲local,則表明Spark程序在本地運行。
      */
    val conf = new SparkConf //建立SparkConf對象
    conf.setAppName("wordCount") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱
    conf.setMaster("local") //此時,程序在本地運行,不須要安裝Spark集羣

    /**
      * 第二步:建立SparkContext對象
      * SparkContext是Spark程序全部功能的惟一入口,不管是採用scala、java、Python,R等都
      * 必須有一個SparkContext。SparkContext核心做用:初始化Spark應用程序運行所須要的核心組件,包括
      * DAGScheduler,TaskScheduler、SchedulerBackend同時還會負責Spark程序往Master註冊程序等。
      * SparkContext是這個Spark程序中最爲相當重要的一個對象。
      */
    val sc = new SparkContext(conf)

    /**
      * 第三步:根據具體的數據源(HDFS、HBase、Local FS、DB、S3等)經過SparkContext建立RDD。
      * RDD的建立方式有三種:根據外部的數據源(HDFS)、根據Scala集合、其餘的RDD操做。數據會被RDD劃分紅一系列的
      * Partitions,分配到每一個Partition的數據屬於一個Task的處理範疇
      */
    val lines = sc.textFile("D://data//2.txt", 1)//

    /**
      * 第四步:對初始化的RDD進行Transformation級別處理,例如map、filter等高階函數等的編程,來進行具體的數據計算。
      */
    /**
      * 4.一、對每一行的字符串拆分紅單個的單詞
      */
    val words = lines.flatMap { line => line.split(" ") } //對每一行的字符串進行單詞拆分並把全部行的拆分結果經過flat合併成爲一

    /**
      * 4.二、在單詞拆分的基礎上對每一個單詞實例計數爲1,也就是word => (word,1)
      */
    val pairs = words.map { word => (word, 1) }

    /**
      * 4.三、在每一個單詞實例計數爲1基礎之上統計每一個單詞在文件中出現的總次數
      */
    val wordCounts = pairs.reduceByKey(_+_) //對相同的key,進行value的累計

    wordCounts.foreach(map => println(map._1 +":"+ map._2))

    sc.stop()

    println("end...")
  }
}
/*python test*/
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

lines = sc.parallelize(["pandas", "cat", "i like pandas"])
word = lines.filter(lambda s: "pandas" in s)

print(word.count())

 

/*java test*/
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 用java語言開發spark程序
 * 第一個學習程序 wordcount
 * @author 18521
 *
 */
public class wordCountLocal {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        // 1 建立一個sparkconf 對象並配置
        // 使用setMaster 能夠設置spark集羣能夠連接集羣的URL,若是設置local 表明在本地運行而不是在集羣運行
        SparkConf conf = new SparkConf()
                .setAppName("wordCountLocal")
                .setMaster("local");

        // 2 建立javasparkContext對象
        // sparkcontext 是一個入口,主要做用就是初始化spark應用程序所需的一些核心組件,例如調度器,task,
        // 還會註冊spark,sparkMaster結點上註冊。反正就是spake應用中最重要的對象
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 3 對輸入源建立一個出事RDD
        // 元素就是輸入源文件中的一行
        JavaRDD<String> lines = sc.textFile("D://data/2.txt");
        // 4 把輸入源拆分紅一個一個的單詞
        // 引用一個RDD 都會建立一個function 類(比較簡單的話就是一個匿名內部類)
        // FlatMapFunction 有連個參數輸入和輸出
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;


            public Iterable<String> call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                return Arrays.asList(arg0.split(" "));
            }
        });
        // 5 須要將每個單詞映射爲(單詞,1) 後面才能夠更具單詞key 對後面value 1 進行累加從而達到計數的功能
        JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() {

            /**
             * 每個單詞都映射成(單詞,1)
             */
            private static final long serialVersionUID = 1L;


            public Tuple2<String, Integer> call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(arg0, 1);
            }
        });
        // 6 以單詞作爲key 統計單詞出現的次數,用reducebykey 算子,對每個key對於的value進行操做
        JavaPairRDD<String, Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() {


            public Integer call(Integer arg0, Integer arg1) throws Exception {
                // TODO Auto-generated method stub
                return arg0 + arg1;
            }
        });

        // 7 已經經過spark 的幾個算子 flatMap,mapToPair,reduceByKey 已經統計出每個結點中的單詞出現的次數
        // 這中操做叫作transformation,可是在一開始的RDD是把文件拆分打散到不一樣的結點中的,因此後面還須要操做action 進行集合
        // 9 action 操做經過foreach 來遍歷全部最後一個RDD生成的元素
        wordcount.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            @Override
            public void call(Tuple2<String, Integer> arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0._1 + " 出現了:" + arg0._2 + "次");
            }
        });
        sc.close();
    }
}
相關文章
相關標籤/搜索