版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。html
source是程序的數據源輸入,你能夠經過StreamExecutionEnvironment.addSource(sourceFunction)來爲你的程序添加一個source。java
flink提供了大量的已經實現好的source方法,能夠自定義source 經過實現sourceFunction接口來自定義無並行度的source。redis
1 使用並行度爲1的source
public class MyNoParalleSource implements SourceFunction<Long>{
private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 啓動一個source
* 大部分狀況下,都須要在這個run方法中實現一個循環,這樣就能夠循環產生數據了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while(isRunning){
ctx.collect(count);
count++;
//每秒產生一條數據
Thread.sleep(1000);
}
}
* 取消一個cancel的時候會調用的方法
@Override
public void cancel() {
isRunning = false;
}
}
2 Main方法執行
public class StreamingDemoWithMyNoPralalleSource {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:針對此source,並行度只能設置爲1
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到數據:" + value);
return value;
}
});
//每2秒鐘處理一次數據
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印結果
sum.print().setParallelism(1);
String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
env.execute(jobName);
}
}
複製代碼
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。數據庫
能夠經過實現ParallelSourceFunction接口或者繼承RichParallelSourceFunction來自定義有並行度的source。繼承RichParallelSourceFunction的那些SourceFunction意味着它們都是並行執行的而且可能有一些資源須要open/closeapache
public class MyParalleSource implements ParallelSourceFunction<Long> {
private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 啓動一個source
* 大部分狀況下,都須要在這個run方法中實現一個循環,這樣就能夠循環產生數據了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while(isRunning){
ctx.collect(count);
count++;
//每秒產生一條數據
Thread.sleep(1000);
}
}
/**
* 取消一個cancel的時候會調用的方法
*
*/
@Override
public void cancel() {
isRunning = false;
}
}
public class StreamingDemoWithMyPralalleSource {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到數據:" + value);
return value;
}
});
//每2秒鐘處理一次數據
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印結果
sum.print().setParallelism(1);
String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
env.execute(jobName);
}
}
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {
private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 啓動一個source
* 大部分狀況下,都須要在這個run方法中實現一個循環,這樣就能夠循環產生數據了
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while(isRunning){
ctx.collect(count);
count++;
//每秒產生一條數據
Thread.sleep(1000);
}
}
/**
* 取消一個cancel的時候會調用的方法
*
*/
@Override
public void cancel() {
isRunning = false;
}
/**
* 這個方法只會在最開始的時候被調用一次
* 實現獲取連接的代碼
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open.............");
super.open(parameters);
}
/**
* 實現關閉連接的代碼
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
}
}
public class StreamingDemoWithMyRichPralalleSource {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到數據:" + value);
return value;
}
});
//每2秒鐘處理一次數據
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印結果
sum.print().setParallelism(1);
String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
env.execute(jobName);
}
}
複製代碼
基於文件 readTextFile(path) 讀取文本文件,文件遵循TextInputFormat 讀取規則,逐行讀取並返回。api
基於socket socketTextStream從socker中讀取數據,元素能夠經過一個分隔符切開。數組
public class SocketDemoFullCount {
public static void main(String[] args) throws Exception{
//獲取須要的端口號
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9010--java");
port = 9010;
}
//獲取flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "SparkMaster";
String delimiter = "\n";
//鏈接socket獲取輸入的數據
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {
@Override
public Tuple2<Integer,Integer> map(String value) throws Exception {
return new Tuple2<>(1,Integer.parseInt(value));
}
});
intData.keyBy(0)
.timeWindow(Time.seconds(5))
.process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception {
System.out.println("執行process");
long count = 0;
for(Tuple2<Integer,Integer> element: elements){
count++;
}
out.collect("window:"+context.window()+",count:"+count);
}
}).print();
//這一行代碼必定要實現,不然程序不執行
env.execute("Socket window count");
}
}
複製代碼
基於集合 fromCollection(Collection) 經過java 的collection集合建立一個數據流,集合中的全部元素必須是相同類型的。併發
public class StreamingFromCollection {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<Integer> data = new ArrayList<>();
data.add(10);
data.add(15);
data.add(20);
//指定數據源
DataStreamSource<Integer> collectionData = env.fromCollection(data);
//通map對數據進行處理
DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value + 1;
}
});
//直接打印
num.print().setParallelism(1);
env.execute("StreamingFromCollection");
}
複製代碼
}app
自定義輸入 addSource 能夠實現讀取第三方數據源的數據 系統內置提供了一批connectors,鏈接器會提供對應的source支持【kafka】dom
map:輸入一個元素,而後返回一個元素,中間能夠作一些清洗轉換等操做
flatmap:輸入一個元素,能夠返回零個,一個或者多個元素
keyBy:根據指定的key進行分組,相同key的數據會進入同一個分區
dataStream.keyBy("someKey") // 指定對象中的 "someKey"字段做爲分組key
dataStream.keyBy(0) //指定Tuple中的第一個元素做爲分組key
注意:如下類型是沒法做爲key的
1:一個實體類對象,沒有重寫hashCode方法,而且依賴object的hashCode方法
2:一個任意形式的數組類型
3:基本數據類型,int,long
複製代碼
filter:過濾函數,對傳入的數據進行判斷,符合條件的數據會被留下。
public class StreamingDemoFilter {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設置爲1
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("原始接收到數據:" + value);
return value;
}
});
//執行filter過濾,知足條件的數據會被留下
DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
//把全部的奇數過濾掉
@Override
public boolean filter(Long value) throws Exception {
return value % 2 == 0;
}
});
DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("過濾以後的數據:" + value);
return value;
}
});
//每2秒鐘處理一次數據
DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);
//打印結果
sum.print().setParallelism(1);
String jobName = StreamingDemoFilter.class.getSimpleName();
env.execute(jobName);
}
}
複製代碼
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。
reduce:對數據進行聚合操做,結合當前元素和上一次reduce返回的值進行聚合操做,而後返回一個新的值
aggregations:sum(),min(),max()等
window:在後面單獨詳解
Union:合併多個流,新的流會包含全部流中的數據,可是union是一個限制,就是全部合併的流類型必須是一致的。
public class StreamingDemoUnion {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設置爲1
DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
//把text1和text2組裝到一塊兒
DataStream<Long> text = text1.union(text2);
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("原始接收到數據:" + value);
return value;
}
});
//每2秒鐘處理一次數據
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印結果
sum.print().setParallelism(1);
String jobName = StreamingDemoUnion.class.getSimpleName();
env.execute(jobName);
}
複製代碼
}
Connect:和union相似,可是隻能鏈接兩個流,兩個流的數據類型能夠不一樣,會對兩個流中的數據應用不一樣的處理方法。
CoMap, CoFlatMap:在ConnectedStreams中須要使用這種函數,相似於map和flatmap
public class StreamingDemoConnect {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設置爲1
DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "str_" + value;
}
});
ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
return value;
}
@Override
public Object map2(String value) throws Exception {
return value;
}
});
//打印結果
result.print().setParallelism(1);
String jobName = StreamingDemoConnect.class.getSimpleName();
env.execute(jobName);
}
}
複製代碼
Split:根據規則把一個數據流切分爲多個流:
Select:和split配合使用,選擇切分後的流
public class StreamingDemoSplit {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據源
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:針對此source,並行度只能設置爲1
//對流進行切分,按照數據的奇偶性進行區分
SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
@Override
public Iterable<String> select(Long value) {
ArrayList<String> outPut = new ArrayList<>();
if (value % 2 == 0) {
outPut.add("even");//偶數
} else {
outPut.add("odd");//奇數
}
return outPut;
}
});
//選擇一個或者多個切分後的流
DataStream<Long> evenStream = splitStream.select("even");
DataStream<Long> oddStream = splitStream.select("odd");
DataStream<Long> moreStream = splitStream.select("odd","even");
//打印結果
moreStream.print().setParallelism(1);
String jobName = StreamingDemoSplit.class.getSimpleName();
env.execute(jobName);
}
}
複製代碼
Random partitioning:隨機分區
dataStream.shuffle()
Rebalancing:對數據集進行再平衡,重分區,消除數據傾斜
dataStream.rebalance()
Rescaling:若是上游操做有2個併發,而下游操做有4個併發,那麼上游的一個併發結果分配給下游的兩個併發操做,另外的一個併發結果分配給了下游的另外兩個併發操做。另外一方面,下游有兩個併發操做而上游又4個併發操做,那麼上游的其中兩個操做的結果分配給下游的一個併發操做而另外兩個併發操做的結果則分配給另一個併發操做。
Rescaling與Rebalancing的區別: Rebalancing會產生全量重分區,而Rescaling不會。
dataStream.rescale()
Custom partitioning:自定義分區須要實現Partitioner接口
dataStream.partitionCustom(partitioner, "someKey") 針對對象
dataStream.partitionCustom(partitioner, 0) 針對Tuple
public class MyPartition implements Partitioner<Long> {
@Override
public int partition(Long key, int numPartitions) {
System.out.println("分區總數:"+numPartitions);
if(key % 2 == 0){
return 0;
}else{
return 1;
}
}
}
public class SteamingDemoWithMyParitition {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());
//對數據進行轉換,把long類型轉成tuple1類型
DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
@Override
public Tuple1<Long> map(Long value) throws Exception {
return new Tuple1<>(value);
}
});
//分區以後的數據
DataStream<Tuple1<Long>> partitionData= tupleData.partitionCustom(new MyPartition(), 0);
DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
@Override
public Long map(Tuple1<Long> value) throws Exception {
System.out.println("當前線程id:" + Thread.currentThread().getId() + ",value: " + value);
return value.getField(0);
}
});
result.print().setParallelism(1);
env.execute("SteamingDemoWithMyParitition");
}
}
複製代碼
Broadcasting:在後面單獨詳解
writeAsText():將元素以字符串形式逐行寫入,這些字符串經過調用每一個元素的toString()方法來獲取
print() / printToErr():打印每一個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中
自定義輸出addSink【kafka、redis】
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
public class StreamingDemoToRedis {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");
//對數據進行組裝,把string轉化爲tuple2<String,String>
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>("l_words", value);
}
});
//建立redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("SparkMaster").setPort(6379).build();
//建立redissink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>>{
//表示從接收的數據中獲取須要操做的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示從接收的數據中獲取須要操做的redis value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}
複製代碼
基於文件 readTextFile(path)
public class BatchWordCountJava {
public static void main(String[] args) throws Exception{
String inputPath = "D:\\data\\file";
String outPath = "D:\\data\\result";
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//獲取文件中的內容
DataSource<String> text = env.readTextFile(inputPath);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);
env.execute("batch word count");
}
public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token: tokens) {
if(token.length()>0){
out.collect(new Tuple2<String, Integer>(token,1));
}
}
}
}
}
複製代碼
基於集合 fromCollection(Collection)
Map:輸入一個元素,而後返回一個元素,中間能夠作一些清洗轉換等操做
FlatMap:輸入一個元素,能夠返回零個,一個或者多個元素
MapPartition:相似map,一次處理一個分區的數據【若是在進行map處理的時候須要獲取第三方資源連接,建議使用MapPartition】
public class BatchDemoMapPartition {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<>();
data.add("hello you");
data.add("hello me");
DataSource<String> text = env.fromCollection(data);
/*text.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
//獲取數據庫鏈接--注意,此時是每過來一條數據就獲取一次連接
//處理數據
//關閉鏈接
return value;
}
});*/
DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
//獲取數據庫鏈接--注意,此時是一個分區的數據獲取一次鏈接【優勢,每一個分區獲取一次連接】
//values中保存了一個分區的數據
//處理數據
Iterator<String> it = values.iterator();
while (it.hasNext()) {
String next = it.next();
String[] split = next.split("\\W+");
for (String word : split) {
out.collect(word);
}
}
//關閉連接
}
});
mapPartitionData.print();
}
}
複製代碼
Filter:過濾函數,對傳入的數據進行判斷,符合條件的數據會被留下
Reduce:對數據進行聚合操做,結合當前元素和上一次reduce返回的值進行聚合操做,而後返回一個新的值
Aggregate:sum、max、min等
Distinct:返回一個數據集中去重以後的元素,data.distinct()
public class BatchDemoDistinct {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<>();
data.add("hello you");
data.add("hello me");
DataSource<String> text = env.fromCollection(data);
FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] split = value.toLowerCase().split("\\W+");
for (String word : split) {
System.out.println("單詞:"+word);
out.collect(word);
}
}
});
flatMapData.distinct()// 對數據進行總體去重
.print();
}
}
複製代碼
Join:內鏈接
public class BatchDemoJoin {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//tuple2<用戶id,用戶姓名>
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
//tuple2<用戶id,用戶所在城市>
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"beijing"));
data2.add(new Tuple2<>(2,"shanghai"));
data2.add(new Tuple2<>(3,"guangzhou"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
text1.join(text2).where(0)//指定第一個數據集中須要進行比較的元素角標
.equalTo(0)//指定第二個數據集中須要進行比較的元素角標
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
throws Exception {
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}).print();
//注意,這裏用map和上面使用的with最終效果是一致的。
/*text1.join(text2).where(0)//指定第一個數據集中須要進行比較的元素角標
.equalTo(0)//指定第二個數據集中須要進行比較的元素角標
.map(new MapFunction<Tuple2<Tuple2<Integer,String>,Tuple2<Integer,String>>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
return new Tuple3<>(value.f0.f0,value.f0.f1,value.f1.f1);
}
}).print();*/
}
}
複製代碼
OuterJoin:外連接
public class BatchDemoOuterJoin {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//tuple2<用戶id,用戶姓名>
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<>(1,"zs"));
data1.add(new Tuple2<>(2,"ls"));
data1.add(new Tuple2<>(3,"ww"));
//tuple2<用戶id,用戶所在城市>
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<>(1,"beijing"));
data2.add(new Tuple2<>(2,"shanghai"));
data2.add(new Tuple2<>(4,"guangzhou"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);
/**
* 左外鏈接
*
* 注意:second這個tuple中的元素可能爲null
*
*/
text1.leftOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(second==null){
return new Tuple3<>(first.f0,first.f1,"null");
}else{
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}
}).print();
/**
* 右外鏈接
*
* 注意:first這個tuple中的數據可能爲null
*
*/
text1.rightOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(first==null){
return new Tuple3<>(second.f0,"null",second.f1);
}
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}).print();
/**
* 全外鏈接
*
* 注意:first和second這兩個tuple都有可能爲null
*
*/
text1.fullOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(first==null){
return new Tuple3<>(second.f0,"null",second.f1);
}else if(second == null){
return new Tuple3<>(first.f0,first.f1,"null");
}else{
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}
}).print();
}
}
複製代碼
Cross:獲取兩個數據集的笛卡爾積
public class BatchDemoCross {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//tuple2<用戶id,用戶姓名>
ArrayList<String> data1 = new ArrayList<>();
data1.add("zs");
data1.add("ww");
//tuple2<用戶id,用戶所在城市>
ArrayList<Integer> data2 = new ArrayList<>();
data2.add(1);
data2.add(2);
DataSource<String> text1 = env.fromCollection(data1);
DataSource<Integer> text2 = env.fromCollection(data2);
CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
cross.print();
}
複製代碼
Union:返回兩個數據集的總和,數據類型須要一致
First-n:獲取集合中的前N個元素
Sort Partition:在本地對數據集的全部分區進行排序,經過sortPartition()的連接調用來完成對多個字段的排序
public class BatchDemoFirstN {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(2,"zs"));
data.add(new Tuple2<>(4,"ls"));
data.add(new Tuple2<>(3,"ww"));
data.add(new Tuple2<>(1,"xw"));
data.add(new Tuple2<>(1,"aw"));
data.add(new Tuple2<>(1,"mw"));
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
//獲取前3條數據,按照數據插入的順序
text.first(3).print();
System.out.println("==============================");
//根據數據中的第一列進行分組,獲取每組的前2個元素
text.groupBy(0).first(2).print();
System.out.println("==============================");
//根據數據中的第一列分組,再根據第二列進行組內排序[升序],獲取每組的前2個元素
text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
System.out.println("==============================");
//不分組,全局排序獲取集合中的前3個元素,針對第一個元素升序,第二個元素倒序
text.sortPartition(0,Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print();
}
複製代碼
}
Rebalance:對數據集進行再平衡,重分區,消除數據傾斜
Hash-Partition:根據指定key的哈希值對數據集進行分區
partitionByHash()
Custom Partitioning:自定義分區規則
自定義分區須要實現Partitioner接口
partitionCustom(partitioner, "someKey")
partitionCustom(partitioner, 0)
public class BatchDemoHashRangePartition {
public static void main(String[] args) throws Exception{
//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1,"hello1"));
data.add(new Tuple2<>(2,"hello2"));
data.add(new Tuple2<>(2,"hello3"));
data.add(new Tuple2<>(3,"hello4"));
data.add(new Tuple2<>(3,"hello5"));
data.add(new Tuple2<>(3,"hello6"));
data.add(new Tuple2<>(4,"hello7"));
data.add(new Tuple2<>(4,"hello8"));
data.add(new Tuple2<>(4,"hello9"));
data.add(new Tuple2<>(4,"hello10"));
data.add(new Tuple2<>(5,"hello11"));
data.add(new Tuple2<>(5,"hello12"));
data.add(new Tuple2<>(5,"hello13"));
data.add(new Tuple2<>(5,"hello14"));
data.add(new Tuple2<>(5,"hello15"));
data.add(new Tuple2<>(6,"hello16"));
data.add(new Tuple2<>(6,"hello17"));
data.add(new Tuple2<>(6,"hello18"));
data.add(new Tuple2<>(6,"hello19"));
data.add(new Tuple2<>(6,"hello20"));
data.add(new Tuple2<>(6,"hello21"));
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
/*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> it = values.iterator();
while (it.hasNext()){
Tuple2<Integer, String> next = it.next();
System.out.println("當前線程id:"+Thread.currentThread().getId()+","+next);
}
}
}).print();*/
text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> it = values.iterator();
while (it.hasNext()){
Tuple2<Integer, String> next = it.next();
System.out.println("當前線程id:"+Thread.currentThread().getId()+","+next);
}
}
}).print();
}
}
複製代碼
Flink自帶了針對諸如int,long,String等標準類型的序列化器
針對Flink沒法實現序列化的數據類型,咱們能夠交給Avro和Kryo
使用方法:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
使用avro序列化:env.getConfig().enableForceAvro();
使用kryo序列化:env.getConfig().enableForceKryo();
使用自定義序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/custom_serializers.html
複製代碼
Java Tuple 和 Scala case class
Java POJOs:java實體類
Primitive Types
默認支持java和scala基本數據類型
General Class Types
默認支持大多數java和scala class
Hadoop Writables 支持hadoop中實現了org.apache.hadoop.Writable的數據類型
Special Types
例如scala中的Either Option 和Try
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/api_concepts.html#supported-data-types
複製代碼
秦凱新 於深圳 201812022220