The Spark framework is written in Scala and runs on the Java Virtual Machine (JVM). Client applications can be written in Python, Java, Scala, or R.
Visit http://spark.apache.org/downloads.html and download one of the pre-built packages.
Open a terminal, change into the directory containing the downloaded Spark archive, and extract it.
You can use the following commands:
cd ~
tar -xf spark-2.2.2-bin-hadoop2.7.tgz -C /opt/module/
cd /opt/module/spark-2.2.2-bin-hadoop2.7
ls
Note: in the tar command, the x flag tells tar to extract, and the f flag specifies the name of the archive file.
README.md: brief instructions for getting started with Spark
bin: a set of executables for interacting with Spark in various ways (such as the shells used below)
core, streaming, python, ...: source code for Spark's main components
examples: Spark example programs you can read and run, which are very helpful for learning the Spark API
./bin/run-example SparkPi 10
./bin/spark-shell --master local[2] # The --master option specifies the run mode: local runs locally with one thread; local[N] runs locally with N threads. (A programmatic way to set the master is sketched after these commands.)
./bin/pyspark --master local[2]
./bin/sparkR --master local[2]
# spark-submit accepts applications written in several languages
./bin/spark-submit examples/src/main/python/pi.py 10
./bin/spark-submit examples/src/main/r/dataframe.R
...
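The --master setting is not limited to the command line. When you build a SparkSession yourself, as the standalone applications later in this post do, the same value can be set programmatically. A minimal sketch, where the application name is an arbitrary example:

/* MasterExample.scala (sketch) */
import org.apache.spark.sql.SparkSession

object MasterExample {
  def main(args: Array[String]): Unit = {
    // Equivalent of passing --master local[2] on the command line:
    // run locally with 2 worker threads.
    val spark = SparkSession.builder
      .appName("MasterExample")
      .master("local[2]")
      .getOrCreate()

    println(spark.sparkContext.master) // prints local[2]
    spark.stop()
  }
}

When an application is launched through spark-submit, the --master flag normally takes care of this, so setting it in code is mainly convenient for local testing.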
Use the spark-shell script for interactive analysis.
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

// Use the filter transformation to return a subset of the original Dataset
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

// Chain a transformation and an action together
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
// Use Dataset transformations and actions to find the line with the most words
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
// Count the number of occurrences of each word
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

scala> wordCounts.collect()
res6: Array[(String, Long)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
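The standalone applications later in this post call .cache() on a Dataset without further explanation. Briefly, Spark can pull a Dataset into an in-memory cache, which pays off when the same data is queried repeatedly. A small follow-on sketch in the shell (the res numbers and printed values are illustrative):

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count() // first count materializes the cache
res8: Long = 15

scala> linesWithSpark.count() // subsequent counts read from the cache
res9: Long = 15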
Use the pyspark script for interactive analysis.
>>> textFile = spark.read.text("README.md")

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

# Use filter to return a subset of the rows
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

# Chain a transformation and an action together
>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
# Find the line with the most words
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

# Count the number of occurrences of each word
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
Besides running interactively, Spark can also be linked into standalone programs written in Java, Scala, or Python.
The main difference between a standalone application and the shell is that the application has to initialize its own SparkSession; the shell creates one for you as the spark variable.
Count the number of lines containing "a" and the number of lines containing "b".
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
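The spark-submit command below loads a jar from target/scala-2.11/simple-project_2.11-1.0.jar, which implies the project is packaged with sbt (with SimpleApp.scala placed under src/main/scala/). A minimal build.sbt along these lines should work; the exact Scala and Spark versions are assumptions chosen to match the paths used in this post:

// build.sbt (sketch): minimal sbt definition for the SimpleApp example
name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.12" // assumed; any 2.11.x matches the scala-2.11 path below

// spark-sql provides SparkSession and the Dataset API used by SimpleApp
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"

Running sbt package from the project root then produces the jar that spark-submit expects.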
Run the application:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
Count the number of lines containing "a" and the number of lines containing "b".
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}
Run the application:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
Count the number of lines containing "a" and the number of lines containing "b".
Add the following to the setup.py script:

install_requires=[
    'pyspark=={site.SPARK_VERSION}'
]
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder().appName(appName).master(master).getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
Run the application:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23