spark官方教程包括如下幾個模塊java
spark主要使用的分佈式抽象集合工具是Dataset。Dataset建立方式有從Haddop InputForamtes建立,或者從其餘DataSet轉換而來。sql
Hadoop InputFormats方式:apache
scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]
對於DataSet,咱們能夠直接獲取數據,好比調用action方法,或者經過transform方法轉換成一個新的數值;編程
action操做安全
scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark
tranform操做app
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
transform+action操做機器學習
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
更多數據集操做分佈式
Dataset action和transformations能被適用於更多更復雜的操做。好比獲取一個文件中最多單詞的行 所包含的單詞數目函數
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
先用一個map,把每行的單詞數統計出來。再用一個reduce,把 最多單詞的行 所包含的單詞數目返回回去。對於reduce中的操做,咱們能夠使用數學庫中的max函數代替工具
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
Caching
spark支持把數據集推送獲得集羣範圍的內存中。在數據常常須要訪問的時候這個函數時頗有用。
spark應用例子
/* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } }
程序很簡單,只是對spark的readme文件,獲取包含字母a的行數和字母b的行數。