關於Java:選用1.7或者1.8.爲了通用性,本章內容使用1.7進行編寫。
html
關於Scala:工程不須要增長scala nature,即不需Add Scala Nature。若增長在java代碼中調用scala library會有異常。java
關於Spark版本:使用1.6.3進行編寫。python
maven 依賴shell
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.3</version> </dependency> </dependencies> |
Spark 編程的第一步是須要建立一個JavaSparkContext對象,用來告訴 Spark 如何訪問集羣。在建立 JavaSparkContext以前,你須要構建一個 SparkConf對象, SparkConf 對象包含了一些你應用程序的信息。apache
SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local"); @SuppressWarnings("resource") JavaSparkContext jsc = new JavaSparkContext(conf) |
其中編程
setAppName方法指定了spark程序的名稱。api
setMaster("local")指定了spark程序運行的方式,分爲以下幾種。
local |
Run Spark locally with one worker thread (i.e. no parallelism at all). |
local[K] |
Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
local[*] |
Run Spark locally with as many worker threads as logical cores on your machine. |
spark://HOST:PORT |
Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
mesos://HOST:PORT |
Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://... . To submit with --deploy-mode cluster , the HOST:PORT should be configured to connect to the MesosClusterDispatcher. |
yarn |
Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode . The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. |
yarn-client |
Equivalent to yarn with --deploy-mode client , which is preferred to `yarn-client` |
yarn-cluster |
Equivalent to yarn with --deploy-mode cluster , which is preferred to `yarn-cluster` |
除了在eclipse、Intellij中運行local模式的任務,也能夠打成jar包,使用spark-submit來進行任務提交。數組
經過List來進行轉化RDDbash
// List to RDD List<String> list = new ArrayList<String>(); list.add("11,22,33,44,55"); list.add("aa,bb,cc,dd,ee"); JavaRDD<String> jrl = jsc.parallelize(list); |
JavaRDD<String> jrf = jsc.textFile("data/README.md"); //JavaRDD<String> jrfFromHDFS = jsc.textFile("hdfs:///data/README.md"); //from hdfs to rdd //JavaRDD<String> jrfFromLocal = jsc.textFile("file:///data/README.md"); //from localfile to rdd |
測試代碼:網絡
List<String> list = new ArrayList<String>(); list.add("11 22,33,44,55"); list.add("aa bb,cc,dd,ee"); list.add("aa bb,cc,dd,ee"); JavaRDD<String> jRDD = jsc.parallelize(list,1); JavaPairRDD<String, String> jPRDD = jRDD.mapToPair(new PairFunction<String, String, String>() { public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<String, String>(s.split("\\s+")[0], s.substring(s.indexOf(" ")+1)); } }); PrintUtilPro.printList(jPRDD.collect()); |
輸出:
11,22,33,44,55 aa,bb,cc,dd,ee aa,bb,cc,dd,ee |
備註:輸出中第一個逗號是tuple中key和value分隔符。
關於Transformation和Actions的操做解釋能夠參照培訓手冊。
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
Transformation | Meaning |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. 將原來RDD的每一個數據項,使用map中用戶自定義的函數func進行映射,轉變爲一個新的元素,並返回一個新的RDD。 |
filter(func) | Return a new dataset formed by selecting those elements of the source on which funcreturns true. 使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回。 |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item). 相似於map,可是輸入數據項能夠被映射到0個或多個輸出數據集合中,因此函數func的返回值是一個數據項集合而不是一個單一的數據項。 |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. 相似於map,可是該操做是在每一個分區上分別執行,因此當操做一個類型爲T的RDD時func的格式必須是 |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. 相似於mapPartitions,可是須要提供給func一個整型值,這個整型值是分區的索引,因此當處理T類型的RDD時,func的格式必須爲 |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. 對數據採樣。用戶能夠設定是否有放回(withReplacement)、採樣的百分比(fraction)、隨機種子(seed)。 |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. 返回原數據集和參數指定的數據集合並後的數據集。使用union函數時須要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合併的RDD元素數據類型相同。該操做不進行去重操做,返回的結果會保存全部元素。若是想去重,可使用distinct()。 |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. 返回兩個數據集的交集。 |
subtract(otherDataset) | Return an RDD with the elements from `this` that are not in `other`. 返回this RDD中但不在other RDD中的元素 |
distinct([numTasks])) | Return a new dataset that contains the distinct elements of the source dataset. 將RDD中的元素進行去重操做。 |
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 操做(K,V)格式的數據集,返回 (K, Iterable)格式的數據集。 |
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in 使用給定的func,將(K,V)對格式的數據集中key相同的值進行彙集,其中func的格式必須爲(V,V) => V。可選參數numTasks能夠指定reduce任務的數目。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in 對(K,V)格式的數據按key進行聚合操做,聚合時使用給定的合併函數和一個初試值,返回一個(K,U)對格式數據。須要指定的三個參數:zeroValue爲在每一個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變量;seqOp用於在每一個分區中,相同的key中V類型的值合併到zeroValue建立的U類型的變量中。combOp是對從新分區後兩個分區中傳入的U類型數據的合併函數。 |
sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean (K,V)格式的數據集,其中K已實現了Ordered,通過sortByKey操做返回排序後的數據集。指定布爾值參數ascending來指定升序或降序排列。 |
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through 用於操做兩個鍵值對格式的數據集,操做兩個數據集(K,V)和(K,W)返回(K, (V, W))格式的數據集。經過 |
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called 用於操做兩個鍵值對格式數據集(K,V)和(K,W),返回數據集格式爲 (K,(Iterable, Iterable)) 。這個操做也稱爲 |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). 對類型爲T和U的兩個數據集進行操做,返回包含兩個數據集全部元素對的(T,U)格式的數據集。即對兩個RDD內的全部元素進行笛卡爾積操做。 |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. 以管道(pipe)方式將 RDD的各個分區(partition)使用 shell命令處理(好比一個 Perl或 bash腳本)。 RDD的元素會被寫入進程的標準輸入(stdin),將進程返回的一個字符串型 RDD(RDD of strings),以一行文本的形式寫入進程的標準輸出(stdout)中。 |
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. 把RDD的分區數下降到經過參數numPartitions指定的值。在獲得的更大一些數據集上執行操做,會更加高效。 |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. 隨機地對RDD的數據從新洗牌(Reshuffle),從而建立更多或更少的分區,以平衡數據。老是對網絡上的全部數據進行洗牌(shuffles)。
|
repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling 根據給定的分區器對RDD進行從新分區,在每一個結果分區中,按照key值對記錄排序。這在每一個分區中比先調用repartition再排序效率更高,由於它能夠將排序過程在shuffle操做的機器上進行。 |
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R)
and pair RDD functions doc (Scala, Java) for details.
Action | Meaning |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 使用函數func彙集數據集中的元素,這個函數func輸入爲兩個元素,返回爲一個元素。這個函數應該符合結合律和交換了,這樣才能保證數據集中各個元素計算的正確性。 |
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. 在驅動程序中,以數組的形式返回數據集的全部元素。一般用於filter或其它產生了大量小數據集的狀況。 |
count() | Return the number of elements in the dataset. 返回數據集中元素的個數。 |
first() | Return the first element of the dataset (similar to take(1)). 返回數據集中的第一個元素(相似於 |
take(n) | Return an array with the first n elements of the dataset. 返回數據集中的前n個元素。 |
takeSample(withReplacement,num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. 對一個數據集隨機抽樣,返回一個包含num個隨機抽樣元素的數組,參數 |
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. 返回RDD按天然順序或自定義順序排序後的前n個元素。 |
saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. 將數據集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。Spark將在每一個元素上調用 |
saveAsSequenceFile(path) (Java and Scala) |
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). 將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。該操做只支持對實現了Hadoop的 |
saveAsObjectFile(path) (Java and Scala) |
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using 將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可使用SparkContext.objectFile()進行加載。 |
countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. 僅支持對(K,V)格式的鍵值對類型的RDD進行操做。返回(K,Int)格式的Hashmap,(K,Int)爲每一個key值對應的記錄數目。 |
foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating anAccumulator or interacting with external storage systems. 對數據集中每一個元素使用函數func進行處理。該操做一般用於更新一個累加器(Accumulator)或與外部數據源進行交互。注意:在foreach()以外修改累加器變量可能引發不肯定的後果。詳細介紹請閱讀Understanding closures部分。 |
類型 | 算子 |
---|---|
輸入分區與輸出分區一對一型 | map、flatMap、mapPartitions |
輸入分區與輸出分區多對一型 | union、cartesian、intersection |
輸入分區與輸出分區多對多型 | groupBy、groupByKey |
輸出分區爲輸入分區子集型 | filter、distinct、subtract、sample、takeSample |
Cache型 | cache、persist |
類型 | 算子 |
---|---|
輸入分區與輸出分區一對一 | mapValues |
對單個RDD | combineByKey、reduceByKey、partitionBy、aggregateByKey、SortByKey |
兩個RDD彙集 | Cogroup |
鏈接 | join、leftOutJoin、rightOutJoin |
類型 | 算子 |
---|---|
無輸出 | foreach |
HDFS | saveAsTextFile、saveAsObjectFile |
Scala集合和數據類型 | collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate |
/* * Function<T,R> * 接收一個輸入值並返回一個輸出值,用於相似map()和filter()的操做中 * R call(T) */ //過濾RDD數據集中包含result的表項,新建RDD數據集resultLines JavaRDD<String> resultLines=lines.filter( new Function<String, Boolean>() { public Boolean call(String v1)throws Exception { return v1.contains("result"); } } ); |
/* * Function<T1,T2,R> * 接收兩個輸入值並返回一個輸出值,用於相似aggregate()和fold()等操做中 * R call(T1,T2) */ List<String> strLine=new ArrayList<String>(); strLine.add("hello world"); strLine.add("This is Spark"); strLine.add("This is JavaRDD") JavaRDD<String> input=sc.parallelize(strLine); //如下代碼的功能是wordcount,其中的reduceByKey操做的Function2函數定義了遇到相同的key時,value是如何reduce的————直接將二者的value相加。 //將文本行的單詞過濾出來,並將全部的單詞保存在RDD數據集words中。切分爲單詞,扁平化處理。見FlatMapFunction< T,R> JavaRDD<String> words=input.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } } ); //轉化爲鍵值對 JavaPairRDD<String,Integer> counts=words.mapToPair( new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2(s, 1); } } ); //對每一個詞語進行計數 JavaPairRDD <String,Integer> results=counts.reduceByKey( new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } } ) ; |
/* * FlatMapFunction<T,R> * 接收一個輸入值並返回任意個輸出,用於相似flatMap()這樣的操做中 * Iterable <R> call(T) */ List<String> strLine=new ArrayList<String>(); strLine.add("hello world"); strLine.add("This is Spark"); strLine.add("This is JavaRDD") JavaRDD<String> input=sc.parallelize(strLine); //將文本行的單詞過濾出來,並將全部的單詞保存在RDD數據集words中。 JavaRDD<String> words=input.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } } ); |
/* * PairFunction<T,K,R> * 接收一個輸入值並返回一個Tuple,用於相似mapToPair()這樣的操做中,將一個元素變爲一個鍵值對(PairRDD) * Tuple2<K, V> call(T) */ //轉化爲鍵值對 JavaPairRDD<String,Integer> counts=words.mapToPair( new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2(s, 1); } } ); |