本篇博客將介紹Spark RDD的Map系算子的基本用法。java
一、mapapache
map將RDD的元素一個個傳入call方法,通過call方法的計算以後,逐個返回,生成新的RDD,計算以後,記錄數不會縮減。示例代碼,將每一個數字加10以後再打印出來, 代碼以下api
import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; public class Map { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(conf); JavaRDD<Integer> listRDD = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4)); JavaRDD<Integer> numRDD = listRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer num) throws Exception { return num + 10; } }); numRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer num) throws Exception { System.out.println(num); } }); } }
執行結果:ide
二、flatMap函數
flatMap和map的處理方式同樣,都是把原RDD的元素逐個傳入進行計算,可是與之不一樣的是,flatMap返回值是一個Iterator,也就是會一輩子多,超生spa
import java.util.Arrays; import java.util.Iterator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; public class FlatMap { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(conf); JavaRDD<String> listRDD = javaSparkContext .parallelize(Arrays.asList("hello wold", "hello java", "hello spark")); JavaRDD<String> rdd = listRDD.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String input) throws Exception { return Arrays.asList(input.split(" ")).iterator(); } }); rdd.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; @Override public void call(String num) throws Exception { System.out.println(num); } }); } }
執行結果:3d
三、mapPartitionscode
mapPartitions一次性將整個分區的數據傳入函數進行計算,適用於一次性聚會整個分區的場景blog
public class MapPartitions { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(conf); JavaRDD<String> listRDD = javaSparkContext.parallelize(Arrays.asList("hello", "java", "wold", "spark"), 2); /** * mapPartitions回調的接口也是FlatMapFunction,FlatMapFunction的第一個泛型是Iterator表示傳入的數據, * 第二個泛型表示返回數據的類型 * * mapPartitions傳入FlatMapFunction接口處理的數據是一個分區的數據,因此,若是一個分區數據過大,會致使內存溢出 * */ JavaRDD<String> javaRDD = listRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { int i = 0; @Override public Iterator<String> call(Iterator<String> input) throws Exception { List<String> list = new ArrayList<String>(); while (input.hasNext()) { list.add(input.next() + i); ++i; } return list.iterator(); } }); javaRDD.foreach(new VoidFunction<String>() { @Override public void call(String t) throws Exception { System.out.println(t); } }); } }
運行結果:接口
上面的運算結果,後面的尾標只有0和1,說明FlatMapFunction被調用了兩次,與MapPartitions的功能吻合。
四、mapPartitionsWithIndex
mapPartitionsWithIndex和mapPartitions同樣,一次性傳入整個分區的數據進行處理,可是不一樣的是,這裏會傳入分區編號進來
public class mapPartitionsWithIndex { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(conf); JavaRDD<String> listRDD = javaSparkContext.parallelize(Arrays.asList("hello", "java", "wold", "spark"), 2); /** *和mapPartitions同樣,一次性傳入整個分區的數據進行處理,可是不一樣的是,這裏會傳入分區編號進來 * */ JavaRDD<String> javaRDD = listRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception { List<String> list = new ArrayList<String>(); while (v2.hasNext()) { list.add(v2.next() + "====分區編號:"+v1); } return list.iterator(); } },true); javaRDD.foreach(new VoidFunction<String>() { @Override public void call(String t) throws Exception { System.out.println(t); } }); } }
執行結果: