《從0到1學習Flink》—— Data Source 介紹

前言

Data Sources 是什麼呢?就字面意思其實就能夠知道:數據來源。框架

Flink 作爲一款流式計算框架,它可用來作批處理,即處理靜態的數據集、歷史的數據集;也能夠用來作流處理,即實時的處理些實時數據流,實時的產生數據流結果,只要數據源源不斷的過來,Flink 就可以一直計算下去,這個 Data Sources 就是數據的來源地。socket

Flink 中你可使用 StreamExecutionEnvironment.addSource(sourceFunction) 來爲你的程序添加數據來源。學習

Flink 已經提供了若干實現好了的 source functions,固然你也能夠經過實現 SourceFunction 來自定義非並行的 source 或者實現 ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來自定義並行的 source,測試

Flink

StreamExecutionEnvironment 中可使用如下幾個已實現的 stream sources,ui

總的來講能夠分爲下面幾大類:spa

基於集合

一、fromCollection(Collection) - 從 Java 的 Java.util.Collection 建立數據流。集合中的全部元素類型必須相同。code

二、fromCollection(Iterator, Class) - 從一個迭代器中建立數據流。Class 指定了該迭代器返回元素的類型。orm

三、fromElements(T …) - 從給定的對象序列中建立數據流。全部對象類型必須相同。對象

1
2
3
4
5
6
7
8
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input = env.fromElements(
	new Event(1, "barfoo", 1.0),
	new Event(2, "start", 2.0),
	new Event(3, "foobar", 3.0),
	...
);

四、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中建立並行數據流。Class 指定了該迭代器返回元素的類型。blog

五、generateSequence(from, to) - 建立一個生成指定區間範圍內的數字序列的並行數據流。

基於文件

一、readTextFile(path) - 讀取文本文件,即符合 TextInputFormat 規範的文件,並將其做爲字符串返回。

1
2
3
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

二、readFile(fileInputFormat, path) - 根據指定的文件輸入格式讀取文件(一次)。

三、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內部調用的方法。它根據給定的 fileInputFormat 和讀取路徑讀取文件。根據提供的 watchType,這個 source 能夠按期(每隔 interval 毫秒)監測給定路徑的新數據(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應文件的數據並退出(FileProcessingMode.PROCESS_ONCE)。你能夠經過 pathFilter 進一步排除掉須要處理的文件。

1
2
3
4
5
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

實現:

在具體實現上,Flink 把文件讀取過程分爲兩個子任務,即目錄監控和數據讀取。每一個子任務都由單獨的實體實現。目錄監控由單個非並行(並行度爲1)的任務執行,而數據讀取由並行運行的多個任務執行。後者的並行性等於做業的並行性。單個目錄監控任務的做用是掃描目錄(根據 watchType 按期掃描或僅掃描一次),查找要處理的文件並把文件分割成切分片(splits),而後將這些切分片分配給下游 reader。reader 負責讀取數據。每一個切分片只能由一個 reader 讀取,但一個 reader 能夠逐個讀取多個切分片。

重要注意:

若是 watchType 設置爲 FileProcessingMode.PROCESS_CONTINUOUSLY,則當文件被修改時,其內容將被從新處理。這會打破「exactly-once」語義,由於在文件末尾附加數據將致使其全部內容被從新處理。

若是 watchType 設置爲 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次而後退出,而不等待 reader 完成文件內容的讀取。固然 reader 會繼續閱讀,直到讀取全部的文件內容。關閉 source 後就不會再有檢查點。這可能致使節點故障後的恢復速度較慢,由於該做業將從最後一個檢查點恢復讀取。

基於 Socket:

socketTextStream(String hostname, int port) - 從 socket 讀取。元素能夠用分隔符切分。

1
2
3
4
5
6
7
8
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 9999) // 監聽 localhost 的 9999 端口過來的數據
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

這個在 《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門 文章裏用的就是基於 Socket 的 Word Count 程序。

自定義:

addSource - 添加一個新的 source function。例如,你能夠 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取數據

說下上面幾種的特色吧

一、基於集合:有界數據集,更偏向於本地測試用

二、基於文件:適合監聽文件修改並讀取其內容

三、基於 Socket:監聽主機的 host port,從 Socket 中獲取數據

四、自定義 addSource:大多數的場景數據都是無界的,會源源不斷的過來。好比去消費 Kafka 某個 topic 上的數據,這時候就須要用到這個 addSource,可能由於用的比較多的緣由吧,Flink 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你能夠去看看 FlinkKafkaConsumerBase 這個基礎類,它是 Flink Kafka 消費的最根本的類。

1
2
3
4
5
6
7
8
9
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer011<>(
				parameterTool.getRequired("input-topic"), //從參數中獲取傳進來的 topic 
				new KafkaEventSchema(),
				parameterTool.getProperties())
			.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));

Flink 目前支持以下圖裏面常見的 Source:

若是你想本身自定義本身的 Source 呢?

那麼你就須要去了解一下 SourceFunction 接口了,它是全部 stream source 的根接口,它繼承自一個標記接口(空接口)Function。

SourceFunction 定義了兩個接口方法:

一、run : 啓動一個 source,即對接一個外部數據源而後 emit 元素造成 stream(大部分狀況下會經過在該方法裏運行一個 while 循環的形式來產生 stream)。

二、cancel : 取消一個 source,也即將 run 中的循環 emit 元素的行爲終止。

正常狀況下,一個 SourceFunction 實現這兩個接口方法就能夠了。其實這兩個接口方法也固定了一種實現模板。

好比,實現一個 XXXSourceFunction,那麼大體的模板是這樣的:(直接拿 FLink 源碼的實例給你看看)

最後

本文主要講了下 Flink 的常見 Source 有哪些而且簡單的提了下如何自定義 Source。

相關文章
相關標籤/搜索