Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
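In short, Spark is a fast, general-purpose cluster computing system. It offers high-level APIs for several languages and an optimized engine that supports general execution graphs. Its higher-level tools cover structured data processing (Spark SQL), machine learning (MLlib), graph processing (GraphX) and stream computation (Spark Streaming).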
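import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val conf = new SparkConf().setMaster("local").setAppName("MyApp")
    // Create the SparkContext (sc) from conf
    val sc = new SparkContext(conf)
    // Read the input file as an RDD of lines
    val rdd1 = sc.textFile("/Users/README.md")
    // Split lines into words, pair each word with 1, then sum the 1s per word
    val rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // Print the first 10 (word, count) pairs
    rdd2.take(10).foreach(println)
    // Release cluster resources
    sc.stop()
  }
}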
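import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class WordCountJavaDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("myapp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd1 = sc.textFile("/Users/README.md");
        // flatMap: split each line into words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                List<String> list = new ArrayList<>();
                String[] arr = s.split(" ");
                for (String ss : arr) {
                    list.add(ss);
                }
                return list.iterator();
            }
        });
        // mapToPair: turn each word into (word, 1)
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // reduceByKey: add up the counts for each word
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Bring the results back to the driver and print them
        List<Tuple2<String, Integer>> list = rdd4.collect();
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t._1() + " " + t._2());
        }
        sc.stop();
    }
}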
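Both programs chain the same three transformations. The following is a minimal sketch in plain Python. It is not Spark and it is not distributed, because it works on an in-memory list. It models what flatMap, map and reduceByKey(_ + _) compute on simple input where words are separated by single spaces. The function name word_count is hypothetical.

```python
from itertools import chain


def word_count(lines):
    """Plain-Python model of flatMap -> map -> reduceByKey(_ + _); not Spark."""
    # flatMap: each line becomes several words, flattened into one sequence
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: pair every word with the count 1
    pairs = ((word, 1) for word in words)
    # reduceByKey(_ + _): sum the 1s for each distinct word
    counts = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    return counts


if __name__ == "__main__":
    print(word_count(["to be or", "not to be"]))
    # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```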
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
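The call above can be modelled in a few lines of plain Python. This sketch uses a local list of (key, value) pairs in place of an RDD, and flat_map_values is a hypothetical name. It applies f to each value only, emits one output pair for every element f returns, and copies the key unchanged.

```python
def flat_map_values(pairs, f):
    """Plain-Python model of RDD.flatMapValues; keys are never passed to f."""
    # One (key, item) pair per item produced by f(value)
    return [(key, item) for key, value in pairs for item in f(value)]


data = [("a", ["x", "y", "z"]), ("b", ["p", "r"])]
print(flat_map_values(data, lambda v: v))
# [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
```

A function other than the identity, such as str.split, shows why this is useful. A single (key, text) record fans out into one (key, word) record per word, and the key is preserved.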
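reduceByKey is similar to groupByKey, but the two are not the same. Take the pairs (a,1), (a,2), (b,1), (b,2). groupByKey gathers all values for each key, producing (a, [1, 2]) and (b, [1, 2]). reduceByKey instead merges the values for each key with the given function (here addition), producing (a,3) and (b,3). Because reduceByKey also combines values inside each partition before the shuffle, it usually sends less data across the network than a groupByKey followed by a reduction.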
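To make the difference concrete, the plain-Python sketch below models both operations over data that has already been split into partitions. The partitions are ordinary lists here, which is an assumption of the model and not how real Spark partitions work. group_by_key and reduce_by_key are hypothetical names. Each function also returns the number of records that would cross the shuffle.

```python
import operator
from collections import defaultdict


def group_by_key(partitions):
    """Model of groupByKey: every record is shuffled, then values are listed per key."""
    shuffled = [record for part in partitions for record in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return dict(groups), len(shuffled)


def reduce_by_key(partitions, func):
    """Model of reduceByKey: combine inside each partition first, then merge partials."""
    partials = []
    for part in partitions:
        local = {}  # map-side combine within one partition
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.extend(local.items())
    merged = {}  # reduce side: merge the per-partition partial results
    for key, value in partials:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(partials)


partitions = [[("a", 1), ("a", 2)], [("b", 1), ("b", 2)]]
print(group_by_key(partitions))                 # ({'a': [1, 2], 'b': [1, 2]}, 4)
print(reduce_by_key(partitions, operator.add))  # ({'a': 3, 'b': 3}, 2)
```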
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
{"id":"1","name":"zhangsan","age":"12"}
{"id":"2","name":"lisi","age":"12"}
{"id":"3","name":"wangwu","age":"12"}
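The sample file uses the JSON Lines format: each line holds one complete JSON object, and that object becomes one row. spark.read.json expects exactly this layout by default. A single JSON document spread over several lines needs .option("multiLine", true) instead. The sketch below is plain Python, not Spark, and read_json_lines is a hypothetical helper. It parses the same three records line by line.

```python
import json

SAMPLE = """{"id":"1","name":"zhangsan","age":"12"}
{"id":"2","name":"lisi","age":"12"}
{"id":"3","name":"wangwu","age":"12"}"""


def read_json_lines(text):
    """Parse JSON Lines: each non-blank line is one self-contained JSON object (one row)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


rows = read_json_lines(SAMPLE)
# Union of field names across rows; sorted here only to make the output deterministic
columns = sorted({key for row in rows for key in row})
print(columns)    # ['age', 'id', 'name']
print(len(rows))  # 3
```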
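import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val conf = new SparkConf().setMaster("local[4]").setAppName("MyApp")
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config(conf)
      .getOrCreate()
    // Read the JSON Lines file into a DataFrame and print its first rows
    val df = spark.read.json("/Users/opensource/dev-problem/source/people_sample_json.json")
    df.show()
    spark.stop()
  }
}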
val df = spark.read.json("/Users/fangzhijie/opensource/dev-problem/source/people_sample_json.json")
// Register the DataFrame as a temporary view so SQL can refer to it as "people"
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people WHERE name = 'zhangsan'")
sqlDF.show()
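createOrReplaceTempView registers the DataFrame under the name people for the current SparkSession, which lets SQL text refer to it like a table. As an analogy only, and without Spark, the same query can be run against Python's built-in sqlite3 using an in-memory table that holds the sample records. query_people is a hypothetical helper. The value is passed as a bound parameter and not inlined in the SQL string.

```python
import sqlite3


def query_people(rows, name):
    """Load rows into an in-memory 'people' table and select those matching name."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE people (id TEXT, name TEXT, age TEXT)")
        # Named placeholders are filled from each dict in rows
        conn.executemany("INSERT INTO people VALUES (:id, :name, :age)", rows)
        cur = conn.execute("SELECT * FROM people WHERE name = ?", (name,))
        return cur.fetchall()
    finally:
        conn.close()


rows = [
    {"id": "1", "name": "zhangsan", "age": "12"},
    {"id": "2", "name": "lisi", "age": "12"},
    {"id": "3", "name": "wangwu", "age": "12"},
]
print(query_people(rows, "zhangsan"))  # [('1', 'zhangsan', '12')]
```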
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: at least two threads, one for the socket receiver and one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // Create the streaming context with a 1-second batch interval
    val ssc = new StreamingContext(conf, Seconds(1))
    // Create a DStream of text lines read from a TCP socket
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()
  }
}
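To feed the stream, start Netcat as a data server on port 9999 (the port the program connects to) in a separate terminal, then type lines of words into it:
$ nc -lk 9999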
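wordCounts counts words separately within each 1-second batch, so totals do not accumulate across batches. A running total would need a stateful operation such as updateStateByKey. The plain-Python sketch below models the per-batch behaviour. It assumes each batch is simply the list of lines received during one interval. count_per_batch is a hypothetical name, and the sketch uses no sockets and no Spark.

```python
from collections import Counter


def count_per_batch(batches):
    """Model of the DStream word count: each batch is counted independently."""
    results = []
    for lines in batches:
        words = [word for line in lines for word in line.split(" ")]
        # A fresh Counter per batch: nothing carries over from earlier batches
        results.append(Counter(words))
    return results


batches = [["hello world"], ["hello spark", "hello"]]
for i, counts in enumerate(count_per_batch(batches)):
    print(f"batch {i}: {dict(counts)}")
# batch 0: {'hello': 1, 'world': 1}
# batch 1: {'hello': 2, 'spark': 1}
```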
