Spark基本命令

1、spark所在目錄

cd usr/local/spark

2、啓動spark

/usr/local/spark/sbin/start-all.sh

啓動Hadoop以及Spark:html

bash ./starths.sh

瀏覽器查看:python

172.16.31.17:8080

中止Hadoop以及Sparksql

bash ./stophs.sh

3、基礎使用

參考連接:https://www.cnblogs.com/dasn/articles/5644919.htmlshell

1.運行Spark示例(SparkPi)

在 ./examples/src/main 目錄下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等語言的版本。apache

咱們能夠先運行一個示例程序 SparkPi(即計算 π 的近似值),執行以下命令:編程

./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

執行時會輸出很是多的運行信息,輸出結果不容易找到,能夠經過 grep 命令進行過濾(命令中的 2>&1 能夠將全部的信息都輸出到 stdout 中,不然因爲輸出日誌的性質,仍是會輸出到屏幕中)json

Python 版本的 SparkPi 則須要經過 spark-submit 運行:瀏覽器

./bin/spark-submit examples/src/main/python/pi.py

2.經過Spark-shell進行交互分析

Spark Shell 支持 Scala 和 Python緩存

Scala 運行於 Java 平臺(JVM,Java 虛擬機),併兼容現有的 Java 程序。bash

Scala 是 Spark 的主要編程語言,若是僅僅是寫 Spark 應用,並不是必定要用 Scala,用 Java、Python 都是能夠的。 使用 Scala 的優點是開發效率更高,代碼更精簡,而且能夠經過 Spark Shell 進行交互式實時查詢,方便排查問題。

2.1 啓動Spark Shell

./bin/spark-shell

2.2 基礎操做

Spark 的主要抽象是分佈式的元素集合(distributed collection of items),稱爲RDD(Resilient Distributed Dataset,彈性分佈式數據集),它可被分發到集羣各個節點上,進行並行操做。RDDs 能夠經過 Hadoop InputFormats 建立(如 HDFS),或者從其餘 RDDs 轉化而來。

2.2.1 RDD建立方式

參考連接:

https://blog.csdn.net/sundujing/article/details/51397209 http://www.javashuo.com/article/p-nimgstgx-go.html https://www.iteblog.com/archives/1512.html https://www.jianshu.com/p/a7503b26cb73

(1)使用本地文件建立RDD:

val textFile = sc.textFile("file:///usr/local/spark/README.md")

代碼中經過 「file://」 前綴指定讀取本地文件。

(2)使用hdfs文件建立RDD:

val textfile = sc.textFile("/winnie/htest/test01.txt")

Spark shell 默認是讀取 HDFS 中的文件,須要先上傳文件到 HDFS 中,不然會有「org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/local/spark/README.md」的錯誤。

(3)使用集合建立RDD:

從集合中建立RDD主要有下面兩個方法:makeRDD 和 parallelize

makeRDD示例:

sc.makeRDD(1 to 50)

sc.makeRDD(Array("1", "2", "3"))

sc.makeRDD(List(1,3,5))

parallelize示例:

同上

2.2.2 RDD兩種操做

參考連接: https://blog.csdn.net/HANLIPENGHANLIPENG/article/details/53508746

(1)RDDs支持兩種類型的操做:

actions: 執行操做,在數據集上運行計算後返回值

transformations: 轉化操做,從現有數據集建立一個新的數據集

轉化操做並不會當即執行,而是到了執行操做纔會被執行

轉化操做:

map()          參數是函數,函數應用於RDD每個元素,返回值是新的RDD 
flatMap()      參數是函數,函數應用於RDD每個元素,將元素數據進行拆分,變成迭代器,返回值是新的RDD 
filter()       參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD 
distinct()     沒有參數,將RDD裏的元素進行去重操做 
union()        參數是RDD,生成包含兩個RDD全部元素的新RDD 
intersection() 參數是RDD,求出兩個RDD的共同元素 
subtract()     參數是RDD,將原RDD裏和參數RDD裏相同的元素去掉 
cartesian()    參數是RDD,求兩個RDD的笛卡兒積

行動操做:

collect()       返回RDD全部元素 
count()         RDD裏元素個數,對於文本文件,爲總行數
first()			RRD中的第一個item,對於文本文件,爲首行內容 
countByValue()  各元素在RDD中出現次數 
reduce()        並行整合全部RDD數據,例如求和操做 
fold(0)(func)   和reduce功能同樣,不過fold帶有初始值 
aggregate(0)(seqOp,combop) 和reduce功能同樣,可是返回的RDD數據類型和原RDD不同 
foreach(func)   對RDD每一個元素都是使用特定函數

行動操做每次的調用是不會存儲前面的計算結果的,若想要存儲前面的操做結果,要把須要緩存中間結果的RDD調用cache(),cache()方法是把中間結果緩存到內存中,也能夠指定緩存到磁盤中(也能夠只用persisit())

(2)實例

textFile.count()
textFile.first()

val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 篩選出包含 Spark 的行
linesWithSpark.count() // 統計行數

action 和 transformation 能夠用鏈式操做的方式結合使用,使代碼更爲簡潔:

textFile.filter(line => line.contains("Spark")).count()

找到包含單詞最多的那一行內容共有幾個單詞:

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

代碼首先將每一行內容 map 爲一個整數,這將建立一個新的 RDD,並在這個 RDD 中執行 reduce 操做,找到最大的數。map()、reduce() 中的參數是 Scala 的函數字面量(function literals,也稱爲閉包 closures),而且能夠使用語言特徵或 Scala/Java 的庫。

Hadoop MapReduce 是常見的數據流模式,在 Spark 中一樣能夠實現(下面這個例子也就是 WordCount):

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  // 實現單詞統計
wordCounts.collect() // 輸出單詞統計結果

2.2.3 Spark shell退出

參考連接:http://www.javashuo.com/article/p-hwerenoj-db.html

:quit

3.Spark SQL 和 DataFrames

Spark SQL 是 Spark 內嵌的模塊,用於結構化數據。

在 Spark 程序中能夠使用 SQL 查詢語句或 DataFrame API。

DataFrames 和 SQL 提供了通用的方式來鏈接多種數據源,支持 Hive、Avro、Parquet、ORC、JSON、和 JDBC,而且能夠在多種數據源之間執行 join 操做。

3.1 Spark SQL 基本操做

Spark SQL 的功能是經過 SQLContext 類來使用的,而建立 SQLContext 是經過 SparkContext 建立的。

在 Spark shell 啓動時,輸出日誌的最後有這麼幾條信息:

這些信息代表 SparkContent 和 SQLContext 都已經初始化好了,可經過對應的 sc、sqlContext 變量直接進行訪問。

使用 SQLContext 能夠從現有的 RDD 或數據源建立 DataFrames。

經過 Spark 提供的 JSON 格式的數據源文件 ./examples/src/main/resources/people.json 來進行演示,該數據源內容以下:


【補充:xshell上傳文件】

rz

【補充:xshell下載文件】

sz
相關文章
相關標籤/搜索