In the previous section's classic Spark word count we met several RDD operations: flatMap, map, reduceByKey, and the simpler alternative countByValue. This section introduces more commonly used RDD operations and, for each one, breaks down how it behaves.
flatMap is one-to-many: a single input element can produce multiple output elements, and the outputs of all inputs are flattened into one large collection. There is no need to worry about this collection outgrowing memory, because Spark will spill the excess to disk; if you are confident the machine has enough memory, you can also keep the data in memory, as sketched below.
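If you do want to pin a flattened RDD in memory explicitly, Spark's persist/cache API is the tool for it. Below is a minimal Scala sketch of that idea; it is not part of this section's example, the object name PersistSketch is made up for illustration, and the file path is the same uv.txt used later.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("PersistSketch"))
    // Split every line into words; the flattened RDD is larger than its input.
    val words = sc.textFile("./uv.txt").flatMap(_.split(" "))
    // MEMORY_ONLY keeps the data in memory (partitions that do not fit are recomputed);
    // MEMORY_AND_DISK would spill to disk instead. cache() is shorthand for MEMORY_ONLY.
    words.persist(StorageLevel.MEMORY_ONLY)
    println(words.count()) // the first action materializes and caches the RDD
    println(words.count()) // the second action reuses the cached data
  }
}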
To understand flatMap better, let's work through an example. As usual, the sample data file is prepared in advance; it is named uv.txt, and both the file and the example programs can be downloaded from GitHub. The example is written in three languages: Scala, Java, and Python, and the Java version is implemented in both plain Java and Java 8 style for comparison. The Scala and Java programs can be downloaded directly from GitHub; the Python version is not provided yet and will be added later.
Scala version:

import org.apache.spark.{SparkConf, SparkContext}

object SparkFlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap")
    val sc = new SparkContext(conf)
    // Path to the data file
    val textData = sc.textFile("./uv.txt")
    // Line count before the transformation
    println("before:" + textData.count() + " lines")
    // First line before the transformation
    println("first line:" + textData.first())
    // Apply flatMap: split each line on spaces
    val flatData = textData.flatMap(line => line.split(" "))
    // Element count after the transformation
    println("after:" + flatData.count())
    // First element after the transformation
    println("first line:" + flatData.first())
    // Save the result to the flatResultScala directory
    flatData.saveAsTextFile("./flatResultScala")
  }
}
Java version (implemented twice, once with a classic anonymous class and once with a Java 8 lambda):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Iterator;

public class SparkFlatMapJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Classic Java implementation
        flatMapJava(sc);
        // Java 8 implementation
        flatMapJava8(sc);
    }

    public static void flatMapJava(JavaSparkContext sc) {
        // Path to the data file
        JavaRDD<String> textData = sc.textFile("./uv.txt");
        // Line count before the transformation
        System.out.println("before:" + textData.count() + " lines");
        // First line before the transformation
        System.out.println("first line:" + textData.first());
        // Apply flatMap: split each line on spaces
        JavaRDD<String> flatData = textData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Element count after the transformation
        System.out.println("after:" + flatData.count());
        // First element after the transformation
        System.out.println("first line:" + flatData.first());
        // Save the result to the flatResultJava directory
        flatData.saveAsTextFile("./flatResultJava");
    }

    public static void flatMapJava8(JavaSparkContext sc) {
        sc.textFile("./uv.txt")
          .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
          .saveAsTextFile("./flatResultJava8");
    }
}
Python version:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FlatMapPython")
sc = SparkContext(conf=conf)
textData = sc.textFile("./uv.txt")
# Line count and first line before the transformation
print("before:" + str(textData.count()) + " lines")
print("first line:" + textData.first())
# Apply flatMap: split each line on spaces
flatData = textData.flatMap(lambda line: line.split(" "))
# Element count and first element after the transformation
print("after:" + str(flatData.count()))
print("first line:" + flatData.first())
flatData.saveAsTextFile("./resultFlatMap")
Running any of these programs produces the same result:
before:86400 lines
first line:2015-08-24_00:00:00 55311 buy
after:259200
first line:2015-08-24_00:00:00
Inspecting the saved output files tells the same story: every line has been split on spaces into three elements, so the total count is three times the original, and the first element is now only the first field of the original first line, the timestamp. The effect of flatMap is therefore plain to see.
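This count-tripling behaviour can also be reproduced on a tiny in-memory collection. The sketch below is for illustration only; it reuses the sc from the Scala example above, its second input line is invented, and it also contrasts flatMap with a plain map:

// Two sample lines in the uv.txt format; the second one is made up for illustration.
val lines = sc.parallelize(Seq(
  "2015-08-24_00:00:00 55311 buy",
  "2015-08-24_00:00:01 10001 buy"))
val flattened = lines.flatMap(_.split(" "))
println(flattened.count())               // 6: every line becomes three separate elements
println(flattened.first())               // 2015-08-24_00:00:00
println(lines.map(_.split(" ")).count()) // 2: map keeps one (array) element per input line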
The same approach shows what map does. Unlike flatMap, map is one-to-one: one input element produces exactly one output element. That output may be a tuple, and a tuple can hold several values, but it still counts as a single element. Note that when the output is a tuple, Scala and Python handle it naturally, while Java needs slightly different treatment, shown further below.
Scala version:

import org.apache.spark.{SparkConf, SparkContext}

object SparkMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkMap")
    val sc = new SparkContext(conf)
    val textData = sc.textFile("./uv.txt")
    // Keep only the last field; drop the timestamp and the count in front
    val mapData1 = textData.map(line => line.split(" ")(2))
    println(mapData1.count())
    println(mapData1.first())
    mapData1.saveAsTextFile("./resultMapScala")
    // Keep the last two fields as a tuple; drop the timestamp
    val mapData2 = textData.map(line => (line.split(" ")(1), line.split(" ")(2)))
    println(mapData2.count())
    println(mapData2.first())
    // Keep all three fields in a tuple
    val mapData3 = textData.map(line => (line.split(" ")(0), line.split(" ")(1), line.split(" ")(2)))
    println(mapData3.count())
    println(mapData3.first())
  }
}
Java version (classic and Java 8):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;
import scala.Tuple3;

public class SparkMapJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Classic Java implementation
        mapJava(sc);
        // Java 8 implementation
        mapJava8(sc);
    }

    public static void mapJava(JavaSparkContext sc) {
        JavaRDD<String> txtData = sc.textFile("./uv.txt");
        // Keep only the last field
        JavaRDD<String> mapData1 = txtData.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s.split(" ")[2];
            }
        });
        System.out.println(mapData1.count());
        System.out.println(mapData1.first());
        // Keep the last two fields
        JavaRDD<Tuple2<String, String>> mapData2 = txtData.map(new Function<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1], s.split(" ")[2]);
            }
        });
        System.out.println(mapData2.count());
        System.out.println(mapData2.first());
        // Keep all three fields
        JavaRDD<Tuple3<String, String, String>> mapData3 = txtData.map(new Function<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> call(String s) throws Exception {
                return new Tuple3<>(s.split(" ")[0], s.split(" ")[1], s.split(" ")[2]);
            }
        });
        System.out.println(mapData3.count());
        System.out.println(mapData3.first());
    }

    public static void mapJava8(JavaSparkContext sc) {
        JavaRDD<String> mapData1 = sc.textFile("./uv.txt").map(line -> line.split(" ")[2]);
        System.out.println(mapData1.count());
        System.out.println(mapData1.first());
        JavaRDD<Tuple2<String, String>> mapData2 = sc.textFile("./uv.txt").map(line -> new Tuple2<>(line.split(" ")[1], line.split(" ")[2]));
        System.out.println(mapData2.count());
        System.out.println(mapData2.first());
        JavaRDD<Tuple3<String, String, String>> mapData3 = sc.textFile("./uv.txt").map(line -> new Tuple3<>(line.split(" ")[0], line.split(" ")[1], line.split(" ")[2]));
        System.out.println(mapData3.count());
        System.out.println(mapData3.first());
    }
}
Python version:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MapPython")
sc = SparkContext(conf=conf)
textData = sc.textFile("./uv.txt")
# Keep only the last field
mapData1 = textData.map(lambda line: line.split(" ")[2])
print(mapData1.count())
print(mapData1.first())
# Keep the last two fields as a tuple
mapData2 = textData.map(lambda line: (line.split(" ")[1], line.split(" ")[2]))
print(mapData2.count())
print(mapData2.first())
# Keep all three fields as a tuple
mapData3 = textData.map(lambda line: (line.split(" ")[0], line.split(" ")[1], line.split(" ")[2]))
print(mapData3.count())
print(mapData3.first())
Running any of these programs produces the same result:

86400
buy
86400
(55311,buy)
86400
(2015-08-24_00:00:00,55311,buy)

So far the tuple-producing map works naturally in Scala and Python. In Java, however, a tuple-valued RDD is still only a JavaRDD of Tuple2 objects; to get a genuine key-value pair RDD (JavaPairRDD), which is what pair operations such as reduceByKey expect, the pairs have to be created with mapToPair, passing a PairFunction that returns a Tuple2:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkMapToPair {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkMapToPair");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Classic Java implementation
        mapToPairJava(sc);
        // Java 8 implementation
        mapToPairJava8(sc);
    }

    public static void mapToPairJava(JavaSparkContext sc) {
        // Key by the second field, value is the third field
        JavaPairRDD<String, String> pairRDD = sc.textFile("./uv.txt").mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1], s.split(" ")[2]);
            }
        });
        System.out.println(pairRDD.count());
        System.out.println(pairRDD.first());
    }

    public static void mapToPairJava8(JavaSparkContext sc) {
        JavaPairRDD<String, String> pairRDD = sc.textFile("./uv.txt").mapToPair(line -> new Tuple2<>(line.split(" ")[1], line.split(" ")[2]));
        System.out.println(pairRDD.count());
        System.out.println(pairRDD.first());
    }
}
Running it produces:
86400
(55311,buy)
Clearly this result is identical to what map produced when keeping the last two fields. Using map, flatMap, and mapToPair flexibly is very important; later sections will combine several operations to process more complex data, as sketched below. The code for all of the programs above can be downloaded from GitHub.
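As a small taste of combining them, the Scala sketch below (an illustrative assumption about such a pipeline, not code from this series) chains a map that keys each line by its action field with the reduceByKey operation recalled at the start of this section, counting how often each action occurs in uv.txt:

// Count occurrences of each action (the third field of every line).
val actionCounts = sc.textFile("./uv.txt")
  .map(line => (line.split(" ")(2), 1)) // key by the action, value 1 per occurrence
  .reduceByKey(_ + _)                   // sum the 1s for each distinct action
actionCounts.collect().foreach(println) // prints one (action, count) pair per distinct action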