Calculate pi
[root@mini1 bin]# ./run-example SparkPi
[root@mini1 bin]# ./run-example SparkPi 10
[root@mini1 bin]# ./run-example SparkPi 1000
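The bundled SparkPi example estimates pi with a Monte Carlo method; the number passed on the command line (10, 1000) is how many slices/tasks the job is split into. A minimal sketch of the same idea (not the shipped source, just the technique):

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

// Monte Carlo estimate of pi: throw random points into the unit square
// and count how many land inside the quarter circle
object PiSketch extends App {
  val slices = if (args.length > 0) args(0).toInt else 2      // same role as SparkPi's argument
  val sc = new SparkContext(new SparkConf().setAppName("PiSketch"))
  val n = 100000L * slices
  val hits = sc.parallelize(1L to n, slices).map { _ =>
    val x = random * 2 - 1
    val y = random * 2 - 1
    if (x * x + y * y <= 1) 1 else 0
  }.reduce(_ + _)
  println(s"Pi is roughly ${4.0 * hits / n}")
  sc.stop()
}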
Two ways to run spark-shell:
1. Run spark-shell directly
Tasks run on a single machine via multiple threads; only one process, called submit, runs.
2. Run spark-shell --master spark://mini1:7077
Tasks run on the cluster: submit runs on the master and the executors run on the workers.
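Either way, once the shell is up you can confirm which mode you are in from the scala> prompt:

// typed at the scala> prompt of spark-shell
sc.master               // "local[*]" for the single-machine shell, "spark://mini1:7077" for the cluster shell
sc.defaultParallelism   // default number of parallel tasks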
Start spark-shell
[root@mini1 bin]# ./spark-shell
Start HDFS:
hadoop/sbin/start-dfs.sh
Compute wordcount
sc.textFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
Sort in ascending or descending order
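sortBy's second argument controls the direction (true, the default, is ascending; false is descending). For example, in the shell:

val counts = sc.textFile("/root/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.sortBy(_._2).collect          // ascending by count
counts.sortBy(_._2, false).collect   // descending by count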
MapReduce-style execution flow
Read the data from HDFS
Upload the file: hdfs dfs -put words.txt /
sc.textFile("hdfs://mini1:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
Writing wordcount with the Spark API
Run locally
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2019/6/11.
  */
object WordCount extends App {
  // Create the conf; set the application name and run mode. local[2] runs 2 threads and produces two result files.
  val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
  // Create the SparkContext
  val sc = new SparkContext(conf)
  val file: RDD[String] = sc.textFile("hdfs://mini1:9000/words.txt")
  val words: RDD[String] = file.flatMap(_.split(" "))             // flatten: split each line into individual words
  val tuple: RDD[(String, Int)] = words.map((_, 1))               // turn each word into (word, 1)
  val result: RDD[(String, Int)] = tuple.reduceByKey(_ + _)       // aggregate by summing values with the same key
  val resultSort: RDD[(String, Int)] = result.sortBy(_._2, false) // sort by count, descending
  // result.collect() // print at the command line
  resultSort.foreach(println)
}
Run on the cluster
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2019/6/11.
  */
object WordCount {
  def main(args: Array[String]) {
    // Create the conf and set the application name; no setMaster here, spark-submit supplies the master.
    // .setMaster("local[1]") would instead use 1 thread to simulate Spark locally.
    val conf = new SparkConf().setAppName("wordcount")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    val file: RDD[String] = sc.textFile(args(0))                    // input path, e.g. hdfs://mini1:9000/words.txt
    val words: RDD[String] = file.flatMap(_.split(" "))             // flatten: split each line into individual words
    val tuple: RDD[(String, Int)] = words.map((_, 1))               // turn each word into (word, 1)
    val result: RDD[(String, Int)] = tuple.reduceByKey(_ + _)       // aggregate by summing values with the same key
    val resultSort: RDD[(String, Int)] = result.sortBy(_._2, false) // sort by count, descending
    resultSort.saveAsTextFile(args(1))                              // output path, e.g. hdfs://mini1:9000/ceshi/wordcountcluster
  }
}
Package the jar
Upload the jar to any machine that has Spark installed
I uploaded it to the bin directory
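The original- prefix on the jar name below is what the maven-shade-plugin leaves behind, so the project was presumably built with Maven. If you package with sbt instead, a minimal build.sbt might look like the sketch below; the Scala/Spark versions are assumptions and must match your cluster.

name := "spark_6_01"
version := "1.0-SNAPSHOT"
scalaVersion := "2.11.12"   // assumption: a Scala version matching the cluster's Spark build

// "provided": the cluster supplies Spark at runtime, so it is not bundled into the jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0" % "provided"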
Submit
[root@mini1 bin]# ./spark-submit --help
# At first I added these two parameters, which made the job fail every time with connection timeouts; I even went to ask teacher Chuxia about it
[root@mini1 bin]# ./spark-submit --master spark://mini1:7077 --class com.cyf.WordCount --executor-memory 200M --total-executor-cores 1 original-spark_6_01-1.0-SNAPSHOT.jar hdfs://mini1:9000/words.txt hdfs://mini1:9000/ceshi/wordcountcluster
[root@mini1 bin]# ./spark-submit --master spark://mini1:7077 --class com.cyf.WordCount original-spark_6_01-1.0-SNAPSHOT.jar hdfs://mini1:9000/words.txt hdfs://mini1:9000/ceshi/wordcountcluster
Running with the two parameters above kept producing connection timeout errors.
After removing them, the job ran successfully.
python
wo.py
#!/usr/bin/python
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("aaa").setMaster("spark://mini1:7077")
sc = SparkContext(conf=conf)

# count occurrences of each name and write the result to HDFS
data = ["tom", "lilei", "tom", "lilei", "wangsf"]
sc.parallelize(data) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .saveAsTextFile("hdfs://mini1:9000/ceshi/python2")
Upload and run
[root@mini1 bin]# ./spark-submit wo.py