Flink 之 Data Sink

首先 Sink 的中文釋義爲:java

下沉; 下陷; 沉沒; 使下沉; 使沉沒; 倒下; 坐下;

 因此,對應 Data sink 意思有點把數據存儲下來(落庫)的意思;ide

 

Source  數據源  ---- > Compute  計算 -----> sink 落庫

如上圖,Source 就是數據的來源,中間的 Compute 其實就是 Flink 乾的事情,能夠作一系列的操做,操做完後就把計算後的數據結果 Sink 到某個地方。(能夠是 MySQL、ElasticSearch、Kafka、Cassandra 等)。this

這裏我說下本身目前作告警這塊就是把 Compute 計算後的結果 Sink 直接告警出來了(發送告警消息到釘釘羣、郵件、短信等),這個 sink 的意思也不必定非得說成要把數據存儲到某個地方去。spa

其實官網用的 Connector 來形容要去的地方更合適,這個 Connector 能夠有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。3d

 

Data Source 介紹了 Flink Data Source 有哪些,這裏也看看 Flink Data Sink 支持的有哪些:code

 

 

 

看下源碼有哪些呢?orm

能夠看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。blog

 

 

從上圖能夠看到 SinkFunction 接口有 invoke 方法,它有一個 RichSinkFunction 抽象類。繼承

上面的那些自帶的 Sink 能夠看到都是繼承了 RichSinkFunction 抽象類,實現了其中的方法,那麼咱們要是本身定義本身的 Sink 的話其實也是要按照這個套路來作的。接口

這裏就拿個較爲簡單的 PrintSinkFunction 源碼來說下:

@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
	private static final long serialVersionUID = 1L;

	private static final boolean STD_OUT = false;
	private static final boolean STD_ERR = true;

	private boolean target;
	private transient PrintStream stream;
	private transient String prefix;

	/**
	 * Instantiates a print sink function that prints to standard out.
	 */
	public PrintSinkFunction() {}

	/**
	 * Instantiates a print sink function that prints to standard out.
	 *
	 * @param stdErr True, if the format should print to standard error instead of standard out.
	 */
	public PrintSinkFunction(boolean stdErr) {
		target = stdErr;
	}

	public void setTargetToStandardOut() {
		target = STD_OUT;
	}

	public void setTargetToStandardErr() {
		target = STD_ERR;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
		// get the target stream
		stream = target == STD_OUT ? System.out : System.err;

		// set the prefix if we have a >1 parallelism
		prefix = (context.getNumberOfParallelSubtasks() > 1) ?
				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
	}

	@Override
	public void invoke(IN record) {
		if (prefix != null) {
			stream.println(prefix + record.toString());
		}
		else {
			stream.println(record.toString());
		}
	}

	@Override
	public void close() {
		this.stream = null;
		this.prefix = null;
	}

	@Override
	public String toString() {
		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
	}
}

  

能夠看到它就是實現了 RichSinkFunction 抽象類,而後實現了 invoke 方法,這裏 invoke 方法就是把記錄打印出來了就是,沒作其餘的額外操做。

如何使用?

SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();

  

這樣就能夠了,若是是其餘的 Sink Function 的話須要換成對應的。

使用這個 Function 其效果就是打印從 Source 過來的數據,和直接 Source.print() 效果同樣。

 

 

 

下篇文章咱們將講解下如何自定義本身的 Sink Function,並使用一個 demo 來教你們,讓你們知道這個套路,且可以在本身工做中自定義本身須要的 Sink Function,來完成本身的工做需求。

最後

本文主要講了下 Flink 的 Data Sink,並介紹了常見的 Data Sink,也看了下源碼的 SinkFunction,介紹了一個簡單的 Function 使用, 告訴了你們自定義 Sink Function 的套路,下篇文章帶你們寫個。

 

原創地址爲:http://www.54tianzhisheng.cn/2018/10/29/flink-sink/

相關文章
相關標籤/搜索