/*scala test*/ import org.apache.spark.SparkConf import org.apache.spark.SparkContext object WordCount { def main(args: Array[String]): Unit = { println("start...") /** * 第一步:建立Spark的配置對象SparkConf,設置Spark程序運行時的配置信息, * 例如說經過設置setMaster來設置程序要連接的Spark集羣的Master的URL, * 若是設置爲local,則表明Spark程序在本地運行。 */ val conf = new SparkConf //建立SparkConf對象 conf.setAppName("wordCount") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱 conf.setMaster("local") //此時,程序在本地運行,不須要安裝Spark集羣 /** * 第二步:建立SparkContext對象 * SparkContext是Spark程序全部功能的惟一入口,不管是採用scala、java、Python,R等都 * 必須有一個SparkContext。SparkContext核心做用:初始化Spark應用程序運行所須要的核心組件,包括 * DAGScheduler,TaskScheduler、SchedulerBackend同時還會負責Spark程序往Master註冊程序等。 * SparkContext是這個Spark程序中最爲相當重要的一個對象。 */ val sc = new SparkContext(conf) /** * 第三步:根據具體的數據源(HDFS、HBase、Local FS、DB、S3等)經過SparkContext建立RDD。 * RDD的建立方式有三種:根據外部的數據源(HDFS)、根據Scala集合、其餘的RDD操做。數據會被RDD劃分紅一系列的 * Partitions,分配到每一個Partition的數據屬於一個Task的處理範疇 */ val lines = sc.textFile("D://data//2.txt", 1)// /** * 第四步:對初始化的RDD進行Transformation級別處理,例如map、filter等高階函數等的編程,來進行具體的數據計算。 */ /** * 4.一、對每一行的字符串拆分紅單個的單詞 */ val words = lines.flatMap { line => line.split(" ") } //對每一行的字符串進行單詞拆分並把全部行的拆分結果經過flat合併成爲一 /** * 4.二、在單詞拆分的基礎上對每一個單詞實例計數爲1,也就是word => (word,1) */ val pairs = words.map { word => (word, 1) } /** * 4.三、在每一個單詞實例計數爲1基礎之上統計每一個單詞在文件中出現的總次數 */ val wordCounts = pairs.reduceByKey(_+_) //對相同的key,進行value的累計 wordCounts.foreach(map => println(map._1 +":"+ map._2)) sc.stop() println("end...") } }
/*python test*/ from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf = conf) lines = sc.parallelize(["pandas", "cat", "i like pandas"]) word = lines.filter(lambda s: "pandas" in s) print(word.count())
/*java test*/ import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 用java語言開發spark程序 * 第一個學習程序 wordcount * @author 18521 * */ public class wordCountLocal { public static void main(String[] args) { // TODO Auto-generated method stub // 1 建立一個sparkconf 對象並配置 // 使用setMaster 能夠設置spark集羣能夠連接集羣的URL,若是設置local 表明在本地運行而不是在集羣運行 SparkConf conf = new SparkConf() .setAppName("wordCountLocal") .setMaster("local"); // 2 建立javasparkContext對象 // sparkcontext 是一個入口,主要做用就是初始化spark應用程序所需的一些核心組件,例如調度器,task, // 還會註冊spark,sparkMaster結點上註冊。反正就是spake應用中最重要的對象 JavaSparkContext sc = new JavaSparkContext(conf); // 3 對輸入源建立一個出事RDD // 元素就是輸入源文件中的一行 JavaRDD<String> lines = sc.textFile("D://data/2.txt"); // 4 把輸入源拆分紅一個一個的單詞 // 引用一個RDD 都會建立一個function 類(比較簡單的話就是一個匿名內部類) // FlatMapFunction 有連個參數輸入和輸出 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; public Iterable<String> call(String arg0) throws Exception { // TODO Auto-generated method stub return Arrays.asList(arg0.split(" ")); } }); // 5 須要將每個單詞映射爲(單詞,1) 後面才能夠更具單詞key 對後面value 1 進行累加從而達到計數的功能 JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * 每個單詞都映射成(單詞,1) */ private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(arg0, 1); } }); // 6 以單詞作爲key 統計單詞出現的次數,用reducebykey 算子,對每個key對於的value進行操做 JavaPairRDD<String, Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer arg0, Integer arg1) throws Exception { // TODO Auto-generated method stub return arg0 + arg1; } }); // 7 已經經過spark 的幾個算子 flatMap,mapToPair,reduceByKey 已經統計出每個結點中的單詞出現的次數 // 這中操做叫作transformation,可是在一開始的RDD是把文件拆分打散到不一樣的結點中的,因此後面還須要操做action 進行集合 // 9 action 操做經過foreach 來遍歷全部最後一個RDD生成的元素 wordcount.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0._1 + " 出現了:" + arg0._2 + "次"); } }); sc.close(); } }