Flink 1.7.2 DataSet transformation examples
Source code
Overview
- Flink transformation examples
- map, flatMap, filter, reduce, groupBy, reduceGroup, combineGroup, Aggregate (sum, max, min)
- distinct, join, join function, leftOuterJoin, rightOuterJoin, fullOuterJoin, union, first, coGroup, cross
Transformations
map
- Applies the function to every element of the DataSet, one by one
- Example: append a string to every line of the DataSet
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.map
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements("c a b d a c","d c a b c d")
val dataSet2 = dataSet.map(_.toUpperCase + " appended")
dataSet2.print()
}
}
C A B D A C appended
D C A B C D appended
flatMap
- Applies the function to every element and flattens the resulting collections into a single DataSet
- Example: split each line into words and collect the words of all lines into one DataSet
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.flatmap
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements("c a b d a c","d c a b c d")
val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" "))
dataSet2.print()
}
}
C
A
B
D
A
C
D
C
A
B
C
D
filter
- Iterates over every element and keeps only the elements that satisfy the predicate
- Example: drop empty strings after splitting
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.filter
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * filter: keeps only the elements that pass the predicate
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements("c a b d a c","d c a b c d")
val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" ")).filter(_.nonEmpty)
dataSet2.print()
}
}
C
A
B
D
A
C
D
C
A
B
C
D
reduce
- Combines the elements of the DataSet pairwise with the reduce function
- Example: compute the sum of all elements
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Accumulates all elements pairwise, i.e. computes the sum
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(3,5,8,9)
// 3 + 5 + 8 + 9
val dataSet2 = dataSet.reduce((a,b) => {
println(s"${a} + ${b} = ${a +b}")
a + b
})
dataSet2.print()
}
}
3 + 5 = 8
8 + 8 = 16
16 + 9 = 25
25
reduce (with groupBy)
- Groups the elements by the given key, then runs reduce within each group
- Example: sum the values per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduce
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Groups by key, then accumulates the values within each group (a per-key sum)
 */
object ReduceGroupRun2 {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",1),("b",1),("c",1),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,1)
 * (b,1)
 * (c,1)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.groupBy(0).reduce((x,y) => {
(x._1,x._2 + y._2)
}
)
dataSet2.print()
}
}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
groupBy (case class field)
- Groups the elements by a named field of a case class
- Example: count the occurrences of each word
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByClassFields
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Groups by key, then accumulates the counts within each group (a per-key sum)
 */
object ReduceGroupRun {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements("a","b","c","a","c","d","f","g","f")
/**
 * (a,1)
 * (b,1)
 * (c,1)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.map(WordCount(_,1)).groupBy("word").reduce((x,y) => WordCount(x.word, x.count + y.count))
dataSet2.print()
}
case class WordCount(word:String,count:Int)
}
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)
groupBy (key selector)
- Groups the elements with a key-selector function
- Example: count the occurrences of each key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByKeySelector
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Groups by key, then accumulates the counts within each group (a per-key sum)
 */
object ReduceGroupRun {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements("a","b","c","a","c","d","f","g","f")
/**
 * (a,1)
 * (b,1)
 * (c,1)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.map((_,1)).groupBy(_._1).reduce((x,y) => (x._1, x._2 + y._2))
dataSet2.print()
}
}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
reduceGroup
- Groups the elements by the given key and passes all elements of a group to a single call of the group-reduce function
- Example: sum the counts per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceGroup
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
/**
 * All elements with the same key are passed in together, in one call
 */
object ReduceGroupRun {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataSet = env.fromElements("a","a","c","b","a")
/**
* Intermediate data
* (a,1)
* (a,1)
* (c,1)
* (b,1)
* (a,1)
*/
val result = dataSet.map((_,1)).groupBy(0).reduceGroup(
(in, out: Collector[(String,Int)]) =>{
var count = 0 ;
var word = "";
while (in.hasNext){
val next = in.next()
word = next._1
count = count + next._2
}
out.collect((word,count))
}
)
result.print()
}
}
(a,3)
(b,1)
(c,1)
combineGroup
- Groups the elements by the given key and passes the elements of a group to the combine function; combining runs locally as a pre-aggregation and, with parallelism > 1, may emit partial results per key (see the sketch after the output below)
- Example: sum the counts per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroup
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
/**
 * All elements with the same key are passed in together, in one call
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataSet = env.fromElements("a","a","c","b","a")
/**
* Intermediate data
* (a,1)
* (a,1)
* (c,1)
* (b,1)
* (a,1)
*/
val result = dataSet.map((_,1)).groupBy(0).combineGroup(
(in, out: Collector[(String,Int)]) =>{
var count = 0 ;
var word = "";
while (in.hasNext){
val next = in.next()
word = next._1
count = count + next._2
}
out.collect((word,count))
}
)
result.print()
}
}
(a,3)
(b,1)
(c,1)
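Note that this example sets the parallelism to 1. With parallelism > 1, combineGroup only pre-aggregates within each parallel partition and may emit several partial results per key. A minimal sketch of the usual pattern, assuming the same input and imports as above: follow the local combine with a final groupBy/reduceGroup that merges the partial sums.
val partial = dataSet.map((_, 1)).groupBy(0).combineGroup {
  (in, out: Collector[(String, Int)]) =>
    // local pre-aggregation: sum this partition's counts per key
    val elements = in.toSeq
    out.collect((elements.head._1, elements.map(_._2).sum))
}
val total = partial.groupBy(0).reduceGroup {
  (in, out: Collector[(String, Int)]) =>
    // final global merge of the partial sums
    val partials = in.toSeq
    out.collect((partials.head._1, partials.map(_._2).sum))
}
total.print()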
Aggregate sum
- Sums field 1 of Tuple2(String, Int) over the entire DataSet (no grouping)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Sums field 1 over the whole DataSet; the String field of the result comes from an arbitrary element
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,3)
 * (b,1)
 * (c,5)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.sum(1)
dataSet2.print()
}
}
(f,15)
Aggregate max
- Takes the maximum of field 1 of Tuple2(String, Int) over the entire DataSet (no grouping)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Computes the maximum of field 1 over the whole DataSet; the String field of the result comes from an arbitrary element
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,3)
 * (b,1)
 * (c,5)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.max(1)
dataSet2.print()
}
}
(f,5)
Aggregate min
- Takes the minimum of field 1 of Tuple2(String, Int) over the entire DataSet (no grouping)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Computes the minimum of field 1 over the whole DataSet; the String field of the result comes from an arbitrary element
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,3)
 * (b,1)
 * (c,5)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.min(1)
dataSet2.print()
}
}
(f,1)
Aggregate sum (groupBy)
- Groups by key and sums field 1 of Tuple2(String, Int) within each group
- Example: per-key sums
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sum
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Groups by key, then sums the values within each group
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",1),("b",1),("c",1),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,1)
 * (b,1)
 * (c,1)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.groupBy(0).sum(1)
dataSet2.print()
}
}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
Aggregate max (groupBy), equivalent to maxBy here
- Groups by key and takes the maximum of field 1 per group
- Example: per-key maximum (see the maxBy sketch after the output)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.max
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Groups by key, then takes the maximum value within each group
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",2),("b",1),("c",4),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,2)
 * (b,1)
 * (c,4)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.groupBy(0).max(1)
dataSet2.print()
}
}
(d,1)
(a,2)
(f,1)
(b,1)
(c,4)
(g,1)
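For a Tuple2 grouped on field 0, max(1) and maxBy(1) coincide, because field 1 is the only non-key field. On wider tuples they differ: max(1) aggregates each field independently and may mix fields from different records, while maxBy(1) always returns one actual element. A minimal sketch with maxBy, assuming the same input as above (minBy is analogous):
// maxBy(1) returns, per group, a complete element that attains the maximum in field 1
val dataSet3 = dataSet.groupBy(0).maxBy(1)
dataSet3.print()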
Aggregate min (groupBy), equivalent to minBy here
- Groups by key and takes the minimum of field 1 per group
- Example: per-key minimum
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.min
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Groups by key, then takes the minimum value within each group
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",2),("b",1),("c",4),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,2)
 * (b,1)
 * (c,4)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.groupBy(0).min(1)
dataSet2.print()
}
}
(d,1)
(a,1)
(f,1)
(b,1)
(c,1)
(g,1)
distinct (deduplication)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinct
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
/**
 * Deduplicates on field 1: only the first element seen for each distinct value is kept
 */
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
/**
 * (a,3)
 * (b,1)
 * (c,5)
 * (a,1)
 * (c,1)
 * (d,1)
 * (f,1)
 * (g,1)
 * (f,1)
 */
val dataSet2 = dataSet.distinct(1)
dataSet2.print()
}
}
(a,3)
(b,1)
(c,5)
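distinct(1) compares only field 1 and keeps the first element seen for each distinct value, which is why exactly (a,3), (b,1) and (c,5) survive here. For comparison, a small sketch: distinct() with no arguments compares complete tuples, so only exact duplicates such as the second ("f",1) are dropped.
// distinct() deduplicates on the whole tuple
val dataSet3 = dataSet.distinct()
dataSet3.print()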
join
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.join
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))
// inner join on the first field
val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0)
dataSet3.print()
}
}
((d,1),(d,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((g,1),(g,1))
join (function)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
val dataSet2 = env.fromElements(("g",1),("f",1))
// inner join with a join function applied to each matching pair
val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0){
(x,y) => (x._1,x._2+ y._2)
}
dataSet3.print()
}
}
(f,3)
(g,6)
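If one input is known to be much smaller than the other, the Scala DataSet API also offers size hints. A minimal sketch under that assumption, reusing the two datasets above: joinWithTiny marks its argument as the small side, so the runtime may broadcast it (joinWithHuge is the mirror hint).
// hint that dataSet2 is tiny; the optimizer may broadcast it to all workers
val hinted = dataSet.joinWithTiny(dataSet2).where(0).equalTo(0) {
  (x, y) => (x._1, x._2 + y._2)
}
hinted.print()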
leftOuterJoin
- Left outer join: every element of the left DataSet is joined with the matching elements of the right DataSet; where the right side has no match, the right element is null
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.leftOuterJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
val dataSet2 = env.fromElements(("g",1),("f",1))
// left outer join
val dataSet3 = dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0){
(x,y) => {
var count = 0;
if(y != null ){
count = y._2
}
(x._1,x._2+ count)
}
}
dataSet3.print()
}
}
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
rightOuterJoin
- Right outer join: every element of the right DataSet is joined with the matching elements of the left DataSet; where the left side has no match, the left element is null
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.rightOuterJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
val dataSet2 = env.fromElements(("g",1),("f",1))
// right outer join
val dataSet3 = dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0){
(x,y) => {
var count = 0;
if(x != null ){
count = x._2
}
(x._1,y._2 + count)
}
}
dataSet3.print()
}
}
(f,3)
(g,6)
fullOuterJoin
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.fullOuterJoin
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))
val dataSet2 = env.fromElements(("g",1),("f",1))
// full outer join
val dataSet3 = dataSet.fullOuterJoin(dataSet2).where(0).equalTo(0){
(x,y) => {
var countY = 0;
if(y != null ){
countY = y._2
}
var countX = 0;
if(x != null ){
countX = x._2
}
(x._1,countX + countY)
}
}
dataSet3.print()
}
}
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
union
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.union
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",1),("g",1),("f",1))
val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))
// union of the two DataSets (duplicates are kept)
val dataSet3 = dataSet.union(dataSet2)
dataSet3.print()
}
}
(a,1)
(d,1)
(g,1)
(f,1)
(f,1)
(g,1)
(f,1)
first(n)
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.first
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))
// take the first three elements
val dataSet3 = dataSet.first(3)
dataSet3.print()
}
}
(a,3)
(b,1)
(c,5)
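Without an explicit ordering, first(n) returns an arbitrary n elements. A minimal sketch, assuming the same input as above: combining groupBy with sortGroup makes the selection deterministic per key, here the largest value of each key.
import org.apache.flink.api.common.operators.Order
// per key, sort the group by field 1 descending and keep the top element
val topPerKey = dataSet.groupBy(0).sortGroup(1, Order.DESCENDING).first(1)
topPerKey.print()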
coGroup
- For each distinct key across the two DataSets, the elements with that key from the first DataSet are collected into one iterable, those from the second DataSet into another, and the CoGroup function is invoked once per key
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroup
import java.lang
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",1),("g",1),("a",1))
val dataSet2 = env.fromElements(("a",1),("f",1))
// coGroup on the first field
val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0)
{
new CoGroupFunction[(String,Int),(String,Int),(String,Int)] {
override def coGroup(first: lang.Iterable[(String, Int)], second: lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
println("==============開始")
println("first")
println(first)
val iteratorFirst = first.iterator()
while (iteratorFirst.hasNext()){
println(iteratorFirst.next())
}
println("second")
println(second)
val iteratorSecond = second.iterator()
while (iteratorSecond.hasNext()){
println(iteratorSecond.next())
}
println("==============結束")
}
}
}
dataSet3.print()
}
}
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@3500e7b0
(a,1)
(a,1)
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@41230ea2
(a,1)
============== end
============== begin
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@14602d0a
(g,1)
second
[]
============== end
============== begin
first
[]
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@2b0a15b5
(f,1)
============== end
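The same coGroup can be written more compactly with the Scala closure variant, which passes plain Scala iterators instead of a CoGroupFunction; a minimal sketch that sums each key's counts across both inputs, assuming the two datasets above:
// for every key, receive that key's elements from both inputs and emit one sum
val summed = dataSet.coGroup(dataSet2).where(0).equalTo(0) {
  (first, second, out: Collector[(String, Int)]) =>
    val all = first.toSeq ++ second.toSeq
    out.collect((all.head._1, all.map(_._2).sum))
}
summed.print()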
cross
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cross
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
object Run {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromElements(("a",1),("g",1),("f",1))
val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))
// Cartesian product of the two DataSets
val dataSet3 = dataSet.cross(dataSet2)
dataSet3.print()
}
}
((a,1),(d,1))
((a,1),(f,1))
((a,1),(g,1))
((a,1),(f,1))
((g,1),(d,1))
((g,1),(f,1))
((g,1),(g,1))
((g,1),(f,1))
((f,1),(d,1))
((f,1),(f,1))
((f,1),(g,1))
((f,1),(f,1))
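cross builds the full Cartesian product, which grows quadratically, so it should be used with care. As with join, size hints exist; a small sketch under the assumption that dataSet2 is the smaller side:
// crossWithTiny broadcasts the smaller side (crossWithHuge is the mirror hint)
val hintedCross = dataSet.crossWithTiny(dataSet2)
hintedCross.print()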