In the previous post we covered the commonly used DataSources. A DataSource only loads the data; once it is loaded, we need to apply transformations to it.
Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.
That is, a transformation turns one or more DataSets into a new DataSet, and multiple transformations can be combined and used together.
Create a new object:
import org.apache.flink.api.scala._

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(environment)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  }
}
The data source here is a List of the integers 1 to 10. The idea behind map: given a data set with N elements, apply a transformation to each one:
data.map { x => x.toInt }
In other words: y = f(x).
// add 1 to every element of data
data.map((x: Int) => x + 1).print()
This increments every element by 1.
A simpler way to write it: since the function takes only a single parameter, the explicit type annotation can be dropped:
data.map((x) => x + 1).print()
More concise still, the parentheses around the parameter can also be dropped:
data.map(x => x + 1).print()
And the most concise form, using Scala's placeholder syntax:
data.map(_ + 1).print()
Output:
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception {
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    mapFunction(executionEnvironment);
}

public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i + "");
    }
    DataSource<String> data = executionEnvironment.fromCollection(list);
    data.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String input) {
            return Integer.parseInt(input) + 1;
        }
    }).print();
}
Because the List we defined carries a String type parameter, the MapFunction is parameterized as <String, Integer>: the first type parameter is the input type, and the second says the output is an Integer.
filter: increment each element by 1, then keep only the elements greater than 5.
def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  data.map(_ + 1).filter(_ > 5).print()
}
filter returns only the records that satisfy the predicate.
public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer input) {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            return input > 5;
        }
    }).print();
}
What is the difference between the map function and the mapPartition function?
Requirement: the DataSource contains 100 elements, and the results must be stored in a database.
With the map function, the implementation looks like this:
// the DataSource has 100 elements; store the results in a database
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
    students.append("Student" + i)
  }
  val data = env.fromCollection(students)
  data.map(x => {
    // each element must first obtain a connection before it can be written to the database
    val connection = DBUtils.getConnection()
    println(connection + " ... ")
    // TODO: save the data to the DB
    DBUtils.returnConnection(connection)
    x // return the element so the result is still a DataSet[String]
  }).print()
}
Running this prints 100 DBUtils.getConnection() requests, one per element. As the data volume grows, acquiring a connection over and over like this is clearly not viable.
This is exactly what mapPartition is for: it transforms the data of one partition at a time, i.e. the function is invoked once per partition.
So first set the number of partitions:
val data = env.fromCollection(students).setParallelism(4)
This sets four partitions, i.e. the parallelism; then use mapPartition to do the processing:
data.mapPartition(x => {
  val connection = DBUtils.getConnection()
  println(connection + " ... ")
  // TODO: save the data to the DB
  DBUtils.returnConnection(connection)
  x
}).print()
Now there are only four connection requests, one connection per partition.
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        list.add("student:" + i);
    }
    DataSource<String> data = env.fromCollection(list);
    /* the map version: one connection request per element
    data.map(new MapFunction<String, String>() {
        @Override
        public String map(String input) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            return input;
        }
    }).print();*/
    data.mapPartition(new MapPartitionFunction<String, Object>() {
        @Override
        public void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {
            // one connection request per partition
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
        }
    }).print();
}
first returns the first n elements, groupBy groups the data set, and sortGroup sorts within each group.
def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int, String)]()
  info.append((1, "hadoop"))
  info.append((1, "spark"))
  info.append((1, "flink"))
  info.append((2, "java"))
  info.append((2, "springboot"))
  info.append((3, "linux"))
  info.append((4, "vue"))
  val data = env.fromCollection(info)

  data.first(3).print()
  // output:
  // (1,hadoop)
  // (1,spark)
  // (1,flink)

  // group by the first field, then take the first two records of each group
  data.groupBy(0).first(2).print()
  // (3,linux)
  // (1,hadoop)
  // (1,spark)
  // (2,java)
  // (2,springboot)
  // (4,vue)

  // group by the first field, sort each group by the second field in ascending
  // order, then take the first two records of each group
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  // (3,linux)
  // (1,flink)
  // (1,hadoop)
  // (2,java)
  // (2,springboot)
  // (4,vue)
}
public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<>();
    info.add(new Tuple2<>(1, "hadoop"));
    info.add(new Tuple2<>(1, "spark"));
    info.add(new Tuple2<>(1, "flink"));
    info.add(new Tuple2<>(2, "java"));
    info.add(new Tuple2<>(2, "springboot"));
    info.add(new Tuple2<>(3, "linux"));
    info.add(new Tuple2<>(4, "vue"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print();
    data.groupBy(0).first(2).print();
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}
flatMap takes one element and produces zero, one, or more elements.
def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).print()
}
Output:
hadoop
spark
hadoop
flink
flink
flink
flatMap splits each element on the comma, turning one element into several.
A classic example:
data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()
Split each element on the comma, map each word to a (word, 1) tuple, group by the first field, and sum over the second field.
The output is:
(hadoop,2)
(flink,3)
(spark,1)
The same classic word count, implemented in Java:
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit each word downstream
                out.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return new Tuple2<>(value, 1);
        }
    }).groupBy(0).sum(1).print();
}
distinct performs deduplication:
def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).distinct().print()
}
This deduplicates all the elements. The output:
hadoop
flink
spark
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit each word downstream
                out.collect(split);
            }
        }
    }).distinct().print();
}
Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
result = input1.join(input2)
               .where(0)    // key of the first input (tuple field 0)
               .equalTo(1); // key of the second input (tuple field 1)
This joins on field 0 of the tuples in input1 and field 1 of the tuples in input2.
def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))

  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "廣州"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}
The output is:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "廣州"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
        }
    }).print();
}
Tuple2<Integer, String>, Tuple2<Integer, String> are the types of the two inputs, and Tuple3<Integer, String, String> is the type of the output.
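The documentation quoted above also mentions FlatJoinFunction, which can emit zero or more results per matched pair; the examples here only use JoinFunction. As a minimal sketch of the flat variant (my own illustration, not from the original examples), the Scala API accepts a join function with a Collector parameter, here filtering one city out while joining:

// requires: import org.apache.flink.util.Collector
data1.join(data2).where(0).equalTo(0) {
  (first, second, out: Collector[(Int, String, String)]) =>
    // emit a result only for pairs whose city is not "北京";
    // emitting nothing drops the pair, so 0..n outputs per match are possible
    if (second._2 != "北京") {
      out.collect((first._1, first._2, second._2))
    }
}.print()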
The join above is an inner join. OuterJoin is an outer join over two data sets and comes in three flavors: left outer, right outer, and full outer.
def outJoinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))

  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "廣州"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  // left outer join: every record from the left side is kept
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
    if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print()
}
In a left outer join, a record on the left may have no matching record on the right; that case must be handled, otherwise a NullPointerException is thrown. Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else {
    (first._1, first._2, second._2)
  }
}).print()
The right outer join outputs:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else if (second == null) {
    (first._1, first._2, "-")
  } else {
    (first._1, first._2, second._2)
  }
}).print()
(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)
(4,java,-)
The Java versions follow. Left outer join:
public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "廣州"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            if (second == null) {
                return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
            }
            return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
        }
    }).print();
}
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
        if (first == null) {
            return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
        }
        return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
    }
}).print();
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
        if (first == null) {
            return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
        } else if (second == null) {
            return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
        }
        return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
    }
}).print();
cross computes the Cartesian product, pairing every element on the left with every element on the right.
def crossFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List("喬峯", "慕容復")
  val info2 = List(3, 1, 0)
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}
Output:
(喬峯,3)
(喬峯,1)
(喬峯,0)
(慕容復,3)
(慕容復,1)
(慕容復,0)
public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<>();
    info1.add("喬峯");
    info1.add("慕容復");
    List<String> info2 = new ArrayList<>();
    info2.add("3");
    info2.add("1");
    info2.add("0");
    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);
    data1.cross(data2).print();
}
A broadcast variable is a data set that can be used, for example, to enrich or clean other data. Broadcast variables are kept resident in memory on every node, so they must not be too large.
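As a minimal sketch of how a broadcast variable is used (my own illustration; the name "broadcastCities" and the whitelist data are made up, and env and data2 are reused from the join examples above): the data set is registered with withBroadcastSet and read back inside a rich function via getRuntimeContext.getBroadcastVariable:

// requires:
// import org.apache.flink.api.common.functions.RichMapFunction
// import org.apache.flink.configuration.Configuration
// import scala.collection.JavaConverters._

// a small whitelist of cities, broadcast to every task
val whitelist = env.fromElements("北京", "上海")

data2.map(new RichMapFunction[(Int, String), (Int, String)] {
  var cities: Set[String] = _

  override def open(parameters: Configuration): Unit = {
    // look up the broadcast variable by the name passed to withBroadcastSet
    cities = getRuntimeContext
      .getBroadcastVariable[String]("broadcastCities")
      .asScala.toSet
  }

  override def map(value: (Int, String)): (Int, String) =
    // keep the city only if it is on the whitelist, else mask it
    if (cities.contains(value._2)) value else (value._1, "-")
}).withBroadcastSet(whitelist, "broadcastCities").print()

Because every task receives the full broadcast set in memory, this pattern is only suitable for small lookup or dimension data.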