A data source is where a Flink program reads its input from. You add a source to a program with StreamExecutionEnvironment.addSource(sourceFunction).
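Flink ships with many ready-made functions for reading from common data sources. You can also write a custom source that fits your own business needs: implement the SourceFunction interface for a non-parallel source, or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction for a parallel source.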
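The listing at the end of this post shows a non-parallel SourceFunction. For the parallel variant, here is a minimal sketch assuming a Flink 1.x release (1.12 or later, because it uses executeAndCollect) with flink-streaming-java on the classpath; ParallelSourceSketch and SubtaskNumberSource are hypothetical names. Each parallel instance looks up its own subtask index through the runtime context, which is available because the class extends RichParallelSourceFunction, and emits a few records tagged with that index.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class; not part of the original article's code
public class ParallelSourceSketch {

    /** Every parallel instance emits "subtaskIndex-i" for i in [0, perSubtask). */
    public static class SubtaskNumberSource extends RichParallelSourceFunction<String> {
        private static final long serialVersionUID = 1L;

        private final int perSubtask;
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;
        private int subtaskIndex;

        public SubtaskNumberSource(int perSubtask) {
            this.perSubtask = perSubtask;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // The "rich" part: access to the runtime context, e.g. this instance's index
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            for (int i = 0; i < perSubtask && isRunning; i++) {
                // Emit under the checkpoint lock so emission and checkpoints do not interleave
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(subtaskIndex + "-" + i);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /** Runs the source with the given parallelism and returns all records, sorted. */
    public static List<String> run(int parallelism, int perSubtask) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        List<String> records = new ArrayList<>(
                env.addSource(new SubtaskNumberSource(perSubtask))
                   .executeAndCollect(parallelism * perSubtask));
        Collections.sort(records);
        return records;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(2, 3));
    }
}
```

With parallelism 2 and three records per instance, run(2, 3) should return records from subtasks 0 and 1 (0-0 through 1-2). A plain SourceFunction, by contrast, always runs with parallelism 1; Flink rejects a higher parallelism on a non-parallel source.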
The following predefined stream sources are available from StreamExecutionEnvironment:
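readTextFile(path): reads a text file that conforms to the TextInputFormat specification line by line and returns each line as a String.
readFile(fileInputFormat, path): reads (once) the files at the given path as dictated by the specified file input format.
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo): the method the previous two call internally. It reads the files in path according to the given fileInputFormat. Depending on the provided watchType, the source either periodically monitors the path (every interval ms) for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or processes the data currently in the path once and then exits (FileProcessingMode.PROCESS_ONCE). With pathFilter, the user can further exclude files from being processed.
socketTextStream: reads from a socket. Elements can be separated by a delimiter.
fromCollection(Collection): creates a data stream from a java.util.Collection. All elements in the collection must be of the same type.
fromCollection(Iterator, Class): creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
fromElements(T ...): creates a data stream from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator, Class): creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
generateSequence(from, to): generates the sequence of numbers in the given interval (both bounds inclusive), in parallel.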
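The file sources above can be sketched as follows, assuming a Flink 1.x release (1.12 or later, for executeAndCollect); FileSourceSketch and OnlyTxtFilter are hypothetical names. The sketch calls the readFile overload that takes watchType and interval, and sets the path filter on the input format with FileInputFormat.setFilesFilter, which later 1.x releases prefer over passing a filter to readFile. A FilePathFilter returns true for paths that should be excluded.

```java
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class; not part of the original article's code
public class FileSourceSketch {

    /** Excludes every file whose name does not end in ".txt" (true means "skip this path"). */
    public static class OnlyTxtFilter extends FilePathFilter {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filterPath(Path filePath) {
            return !filePath.getName().endsWith(".txt");
        }
    }

    /** Reads the .txt files under dir line by line; mode decides "once" vs "keep watching". */
    public static DataStream<String> textFiles(StreamExecutionEnvironment env, String dir,
                                               FileProcessingMode mode, long intervalMs) {
        TextInputFormat format = new TextInputFormat(new Path(dir));
        format.setFilesFilter(new OnlyTxtFilter());
        return env.readFile(format, dir, mode, intervalMs, BasicTypeInfo.STRING_TYPE_INFO);
    }

    /** PROCESS_ONCE: read what is in dir right now, then finish; returns the lines sorted. */
    public static List<String> readOnce(String dir) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> lines = new ArrayList<>();
        try (CloseableIterator<String> it =
                     textFiles(env, dir, FileProcessingMode.PROCESS_ONCE, 1000L).executeAndCollect()) {
            it.forEachRemaining(lines::add);
        }
        Collections.sort(lines);
        return lines;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // PROCESS_CONTINUOUSLY: re-scan args[0] every 5 s until the job is cancelled
        textFiles(env, args[0], FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L).print();
        env.execute("watch directory");
    }
}
```

One caveat from the Flink documentation worth keeping in mind: in PROCESS_CONTINUOUSLY mode, a file that is modified is re-processed in its entirety, which can break exactly-once semantics.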
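For socketTextStream, a common manual test is to start a listener such as nc -lk 9999 and read from localhost:9999 with "\n" as the delimiter. So that the sketch below runs on its own, it instead starts a throwaway local ServerSocket that writes one payload and closes the connection. It assumes a Flink 1.x release (1.12 or later, for executeAndCollect); SocketSourceSketch, serveOnce and collect are hypothetical names. With the three-argument overload no reconnect attempts are made, so the source ends when the server closes the socket and emits any trailing text after the last delimiter as a final record.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical example class; not part of the original article's code
public class SocketSourceSketch {

    /** Demo helper: serves payload to the first client on a free port, then closes the connection. */
    static ServerSocket serveOnce(String payload) throws Exception {
        ServerSocket server = new ServerSocket(0); // port 0 = let the OS pick a free port
        Thread writer = new Thread(() -> {
            try (Socket client = server.accept();
                 OutputStream out = client.getOutputStream()) {
                out.write(payload.getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                // demo helper only: ignore I/O failures
            }
        });
        writer.setDaemon(true);
        writer.start();
        return server;
    }

    /** Reads delimiter-separated records from the helper server via socketTextStream. */
    public static List<String> collect(String payload, String delimiter, int expected) throws Exception {
        try (ServerSocket server = serveOnce(payload)) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // No retry count given: the source stops once the server closes the connection
            return env.socketTextStream("localhost", server.getLocalPort(), delimiter)
                      .executeAndCollect(expected);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect("a;b;c", ";", 3));
    }
}
```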
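The collection-based sources can be compared side by side in a small sketch, again assuming a Flink 1.x release (1.12 or later, for executeAndCollect); CollectionSourcesSketch and CountDown are hypothetical names. fromCollection and fromElements create non-parallel sources, while fromParallelCollection and generateSequence split their range across parallel instances, so output order varies and each demo sorts its results. fromCollection(Iterator, Class) requires the iterator to be Serializable, which is why the sketch defines its own iterator instead of using a list's iterator. Recent 1.x releases deprecate generateSequence in favor of fromSequence.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.NumberSequenceIterator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical example class; not part of the original article's code
public class CollectionSourcesSketch {

    /** Serializable iterator yielding n, n-1, ..., 1 (as fromCollection(Iterator, Class) requires). */
    public static class CountDown implements Iterator<Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        private int next;

        public CountDown(int n) {
            this.next = n;
        }

        @Override
        public boolean hasNext() {
            return next > 0;
        }

        @Override
        public Integer next() {
            if (next <= 0) {
                throw new NoSuchElementException();
            }
            return next--;
        }
    }

    /** Runs a bounded stream to completion and returns its elements sorted. */
    static <T extends Comparable<T>> List<T> sorted(DataStream<T> stream) throws Exception {
        List<T> out = new ArrayList<>();
        try (CloseableIterator<T> it = stream.executeAndCollect()) {
            it.forEachRemaining(out::add);
        }
        Collections.sort(out);
        return out;
    }

    // A fresh environment per demo, so every call runs as its own small job
    static StreamExecutionEnvironment env() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    public static List<String> fromCollectionDemo() throws Exception {
        return sorted(env().fromCollection(Arrays.asList("b", "a", "c")));
    }

    public static List<Integer> fromIteratorDemo() throws Exception {
        return sorted(env().fromCollection(new CountDown(3), Integer.class));
    }

    public static List<Integer> fromElementsDemo() throws Exception {
        return sorted(env().fromElements(3, 1, 2));
    }

    public static List<Long> fromParallelCollectionDemo() throws Exception {
        // NumberSequenceIterator is a SplittableIterator: each parallel instance reads one slice
        return sorted(env().fromParallelCollection(new NumberSequenceIterator(1L, 10L), Long.class));
    }

    public static List<Long> generateSequenceDemo() throws Exception {
        return sorted(env().generateSequence(1L, 5L)); // both bounds inclusive
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fromCollectionDemo());
        System.out.println(fromIteratorDemo());
        System.out.println(fromElementsDemo());
        System.out.println(fromParallelCollectionDemo());
        System.out.println(generateSequenceDemo());
    }
}
```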
```java
package com.intsmaze.flink.streaming.source;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * @Description: Template for a custom data source
 * @Author: intsmaze
 * @Date: 2019/1/4
 */
public class CustomSource {

    // Upper bound for the random values and for the number of records emitted
    private static final int BOUND = 100;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the custom (non-parallel) source with addSource
        DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());

        inputStream.map(new InputMap()).print();

        env.execute("Intsmaze Custom Source");
    }

    /**
     * @Description: Non-parallel source that emits BOUND random pairs (first, second),
     *               each value in [1, BOUND / 2 - 1], one pair every 50 ms
     * @Author: intsmaze
     * @Date: 2019/1/5
     */
    private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private Random rnd = new Random();

        // volatile: cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;

        private int counter = 0;

        @Override
        public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            // Emit until the job is cancelled or BOUND records have been produced
            while (isRunning && counter < BOUND) {
                int first = rnd.nextInt(BOUND / 2 - 1) + 1;
                int second = rnd.nextInt(BOUND / 2 - 1) + 1;

                ctx.collect(new Tuple2<>(first, second));
                counter++;
                Thread.sleep(50L);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * @Description: Maps each pair (a, b) to the tuple (a, b, a, b, 0)
     * @Author: intsmaze
     * @Date: 2019/1/5
     */
    public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
            return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0);
        }
    }
}
```