Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.html
Spark是一個快速且多功能的集羣計算系統。它爲多種不一樣語言提供高級API,和支持通常執行圖的優化引擎。它也有豐富的高級工具集,Spark SQL進行結構化數據的處理,MLib處理機器學習,GraphX進行圖處理,以及Spark Streaming流計算。java
它的主要組件有:算法
SparkCoresql
SparkSQLshell
SparkStreamingexpress
MLlibapache
GraphX編程
BlinkDBjson
Tachyon數組
返回一個包含數據集前n個元素的數組
編寫代碼
object WordCountDemo { def main(args: Array[String]): Unit = { //建立Spark配置對象 val conf = new SparkConf().setMaster("local").setAppName("MyApp") //經過conf建立sc val sc = new SparkContext(conf) //讀取文件 val rdd1 = sc.textFile("/Users/README.md") //計算 val rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_) //打印 rdd2.take(10).foreach(println) } }
public class WordCountJavaDemo { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("myapp").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> rdd1 = sc.textFile("/Users/README.md"); JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { List<String> list = new ArrayList<>(); String[] arr = s.split(" "); for (String ss : arr) { list.add(ss); } return list.iterator(); } }); JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); List<Tuple2<String, Integer>> list = rdd4.collect(); for (Tuple2<String, Integer> t : list) { System.out.println(t._1() + " " + t._2()); } } }
RDD會在多個節點上存儲,就和hdfs的分佈式道理是同樣的。hdfs文件被切分爲多個block存儲在各個節點上,而RDD是被切分爲多個partition。不一樣的partition可能在不一樣的節點上。
一、Driver 分發task,在分發以前,會調用RDD的方法,獲取partition的位置。 將task的計算結果,拉回到Driver端 Driver是一個JVM進程
二、Worker
圖中stage2的並行度是4,也就是有4個task。
寬依賴
父RDD與子RDD,partition的關係是一對多,就是寬依賴。寬依賴於shuffle對應。
窄依賴
父RDD與子RDD,partition的關係是一對一或多對一,就是窄依賴。
特色:懶執行
(1)map
map的輸入變換函數應用於RDD中全部元素
(2)flatMap
flatMap與map區別在於map爲「映射」,而flatMap「先映射,後扁平化」,map對每一次(func)都產生一個元素,返回一個對象,而flatMap多一步就是將全部對象合併爲一個對象。
(3)flatMapValues
每一個元素的Value被輸入函數映射爲一系列的值,而後這些值再與原RDD中的Key組成一系列新的KV對。
代碼
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) def f(x): return x x.flatMapValues(f).collect()
打印結果
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
filter
過濾操做,知足filter內function函數爲true的RDD內全部元素組成一個新的數據集。
(4)groupByKey
主要做用是將相同的全部的鍵值對分組到一個集合序列當中,其順序是不肯定的。
(5)reduceByKey
與groupByKey相似,卻有不一樣。如(a,1), (a,2), (b,1), (b,2)。groupByKey產生中間結果爲( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey爲(a,3), (b,3)。
reduceByKey主要做用是聚合,groupByKey主要做用是分組。
(6)take
特色:當即觸發執行
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
SparkSQL是Spark的一個用來處理結構化數據的模塊。使用相似SQL的方式訪問Hadoop,實現MR計算。
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
Dataset是分佈式數據集合。
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
(1)建立DataFrames
數據
{"id":"1","name":"zhangsan","age":"12"} {"id":"2","name":"lisi","age":"12"} {"id":"3","name":"wangwu","age":"12"}
代碼
object SparkSqlDemo { def main(args: Array[String]): Unit = { //建立Spark配置對象 val conf = new SparkConf().setMaster("local[4]").setAppName("MyApp"); val spark = SparkSession .builder() .appName("Spark SQL basic example") .config(conf) .getOrCreate() val df = spark.read.json("/Users/opensource/dev-problem/source/people_sample_json.json"); df.show() } }
(2)查詢
val df = spark.read.json("/Users/fangzhijie/opensource/dev-problem/source/people_sample_json.json"); df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people WHERE name = 'zhangsan'") sqlDF.show()
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
(1)簡單使用
object SparkStreamingDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") //建立Spark流上下文 val ssc = new StreamingContext(conf, Seconds(1)) //建立Socket文本流 val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() //啓動 ssc.start() //等待結束 ssc.awaitTermination() // Wait for the computation to terminate } }
使用shell命令監聽端口,輸入待計算內容
$ nc -lk 9999
SparkStreaming的編程抽象是離散化流(DStream),它是一個RDD序列,每一個RDD表明數據流中一個時間片內的數據。
Spark Quick Start Spark32個經常使用算子總結 SparkSQL Guide SparkSQL官方文檔 《Spark快速大數據分析》 SparkStream官方文檔
原文出處:https://www.cnblogs.com/fonxian/p/11887518.html