大數據學習——spark學習

計算圓周率python

[root@mini1 bin]# ./run-example SparkPi

 

 

 

[root@mini1 bin]# ./run-example SparkPi 10

 

[root@mini1 bin]# ./run-example SparkPi 1000

 

運行spark-shell的兩種方式:shell

1直接運行spark-shell apache

  單機經過多線程跑任務,只運行一個進程叫submitapi

2運行spark-shell --master spark://mini1:7077多線程

  將任務運行在集羣中,運行submit在master上,運行executor在worker上oop

 

 

啓動spa

[root@mini1 bin]# ./spark-shell 命令行

 

hdfs線程

hadoop/sbin/start-dfs.sh3d

 

計算wordcount

 sc.textFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

 

升序,降序排列

 

mapReduce執行流程

 

 

從hdfs採集數據

上傳文件 hdfs dfs -put words.txt / 

sc.textFile("hdfs://mini1:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect

 

 

經過spark的api寫wordcount 

本地運行

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2019/6/11.
  */
object WordCount extends App {
  //建立conf,設置應用的名字和運行的方式,local[2]運行2線程,產生兩個文件結果

  val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")

  //建立sparkcontext
  val sc = new SparkContext(conf)


  val file: RDD[String] = sc.textFile("hdfs://mini1:9000/words.txt")
  val words: RDD[String] = file.flatMap(_.split(" "))
  //壓平,分割每一行數據爲每一個單詞
  val tuple: RDD[(String, Int)] = words.map((_, 1))
  //將單詞轉換爲(單詞,1)
  val result: RDD[(String, Int)] = tuple.reduceByKey(_ + _)
  //將相同的key進行彙總聚合
  val resultSort: RDD[(String, Int)] = result.sortBy(_._2, false) //排序
  //  result.collect() //在命令行打印
  resultSort.foreach(println)


}

 

集羣運行

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2019/6/11.
  */
object WordCount {
  def main(args: Array[String]) {

    //建立conf,設置應用的名字和運行的方式,local[2]運行2線程,產生兩個文件結果
    //.setMaster("local[1]")採用1個線程,在本地模擬spark運行模式
    val conf = new SparkConf().setAppName("wordcount")

    //建立sparkcontext
    val sc = new SparkContext(conf)


    val file: RDD[String] = sc.textFile("hdfs://mini1:9000/words.txt")
    val words: RDD[String] = file.flatMap(_.split(" "))
    //壓平,分割每一行數據爲每一個單詞
    val tuple: RDD[(String, Int)] = words.map((_, 1))
    //將單詞轉換爲(單詞,1)
    val result: RDD[(String, Int)] = tuple.reduceByKey(_ + _)
    //將相同的key進行彙總聚合
    val resultSort: RDD[(String, Int)] = result.sortBy(_._2, false) //排序
    resultSort.saveAsTextFile(args(1))

  }


}

打包

 

把該代碼包傳到任意一臺裝有spark的機器上

 

我上傳到了bin下

 

 提交

[root@mini1 bin]# ./spark-submit --help
#開始加了這兩個參數 致使一直運行失敗,連接超時,還去問了初夏老師
[root@mini1 bin]# ./spark-submit --master spark://mini1:7077--class com.cyf.WordCount --executor-memory 200M --total-executor-cores 1 original-spark_6_01-1.0-SNAPSHOT.jar hdfs://mini1:9000/words.txt hdfs://mini1:9000/ceshi/wordcountcluster
 
 
[root@mini1 bin]#./spark-submit --master spark://mini1:7077 --class com.cyf.WordCount  original-spark_6_01-1.0-SNAPSHOT.jar hdfs://mini1:9000/words.txt hdfs://mini1:9000/ceshi/wordcountcluster

 

開始加上邊兩個參數運行,一直報鏈接超時的錯誤

 

後來把參數去掉,運行成功了

 

 

 

 

 

 

 

 

 

python

wo.py

#!/usr/bin/python

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("aaa").setMaster("spark://mini1:7077")
sc = SparkContext(conf=conf)
data = ["tom", "lilei", "tom", "lilei", "wangsf"]
rdd = sc.parallelize(data).map(lambda x: (x, 1 )).reduceByKey(lambda a, b: a + b).saveAsTextFile("hdfs://mini1:9000/ceshi/python2")

 上傳,運行

[root@mini1 bin]# ./spark-submit wo.py

 

 

 

 

 

相關文章
相關標籤/搜索