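Traditional big data processing is usually batch-oriented: data collected today is computed tomorrow and only then made available for use. In many cases, however, the timeliness of the data is critical to the success of the business.
Spark and Flink are both general-purpose, open-source engines for large-scale data processing. Each aims to support all data processing workloads in a single system in order to improve efficiency. Both have relatively mature ecosystems and are the strongest contenders for the next generation of big data engines.
Spark's ecosystem is more complete overall, and it currently leads in machine learning integration and ease of use.
Flink has a clear advantage in stream processing, and its core architecture and model are also more thorough and flexible.
This article uses examples to analyze Flink's stream processing flow and walks through the source code to explain its internal mechanics.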
DataStream overview
A DataStream program has five main parts, introduced in turn below:
1. Execution environment: StreamExecutionEnvironment
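StreamExecutionEnvironment is an abstract class that acts as the context (container) in which a streaming program runs. It has two main implementations:
LocalStreamEnvironment: executes the program in the current JVM.
RemoteStreamEnvironment: executes the program on a remote setup.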
/**
 * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
 * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
 * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
 *
 * <p>The environment provides methods to control the job execution (such as setting the parallelism
 * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
 *
 * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
 * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
 */
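2. Data source: DataSource (data input)
A DataSource wraps an input format (InputFormat). The constructor quoted below takes an ExecutionEnvironment, so it is the DataSource of the batch (DataSet) API; it is shown here to illustrate how a source holds its InputFormat.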
/**
 * Creates a new data source.
 *
 * @param context The environment in which the data source gets executed.
 * @param inputFormat The input format that the data source executes.
 * @param type The type of the elements produced by this input format.
 */
public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
    super(context, type);

    this.dataSourceLocationName = dataSourceLocationName;

    if (inputFormat == null) {
        throw new IllegalArgumentException("The input format may not be null.");
    }

    this.inputFormat = inputFormat;

    if (inputFormat instanceof NonParallelInput) {
        this.parallelism = 1;
    }
}
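Flink divides data sources into built-in sources and third-party sources. Built-in sources include files, network socket ports, and collection-type data. Third-party sources are attached through connectors, such as the Kafka connector and the ES (Elasticsearch) connector. To define your own source, implement SourceFunction and package it as a connector.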
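The shape of a custom source can be sketched in plain Java. Flink's real interface is org.apache.flink.streaming.api.functions.source.SourceFunction, whose run method emits elements through a SourceContext and whose cancel method asks the loop to stop. The types below (ToySourceContext, ToySourceFunction, CounterSource) are hypothetical stand-ins, declared only so that the sketch compiles with the JDK alone; they are not Flink classes, and they omit details such as the checked exception on run.

```java
import java.util.ArrayList;
import java.util.List;

public class CustomSourceSketch {

    /** Hypothetical stand-in for Flink's SourceFunction.SourceContext. */
    interface ToySourceContext<T> {
        void collect(T element);
    }

    /** Hypothetical stand-in for Flink's SourceFunction (run/cancel contract). */
    interface ToySourceFunction<T> {
        void run(ToySourceContext<T> ctx);
        void cancel();
    }

    /** Emits the integers from..to (inclusive) unless it is cancelled first. */
    static class CounterSource implements ToySourceFunction<Integer> {
        private final int from;
        private final int to;
        // volatile because cancel() is normally called from another thread
        private volatile boolean running = true;

        CounterSource(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public void run(ToySourceContext<Integer> ctx) {
            for (int i = from; i <= to && running; i++) {
                ctx.collect(i);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Runs a source until it returns and gathers everything it emitted. */
    static <T> List<T> drain(ToySourceFunction<T> source) {
        List<T> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CounterSource(1, 5))); // prints [1, 2, 3, 4, 5]
    }
}
```

The running flag is the part worth carrying over to a real source: the emit loop checks it on every iteration, so a cancel request takes effect promptly.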
3. DataStream transformations
DataStream: a stream of elements of the same type. A DataStream can be turned into another DataStream by applying a transformation, for example:
{@link DataStream#map}
{@link DataStream#filter}
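StreamOperator: the basic interface for stream operators. Three related types build on it:
AbstractStreamOperator: an abstract class that offers default implementations of the lifecycle and properties methods.
OneInputStreamOperator: the interface for operators that process elements from one input.
TwoInputStreamOperator: the interface for operators that process elements from two inputs.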
/**
 * Basic interface for stream operators. Implementers would implement one of
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
 * that process elements.
 *
 * <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
 * offers default implementation for the lifecycle and properties methods.
 *
 * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
 * the timer service, timer callbacks are also guaranteed not to be called concurrently with
 * methods on {@code StreamOperator}.
 *
 * @param <OUT> The output type of the operator
 */
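To make the one-input and two-input distinction concrete, here is a minimal plain-Java sketch of operators that push each element to a downstream consumer. ToyOneInputOperator and ToyTwoInputOperator are hypothetical stand-ins that borrow only the method names processElement, processElement1, and processElement2 from Flink; the real interfaces receive StreamRecord wrappers and also handle state, timers, and watermarks, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class OperatorSketch {

    /** Hypothetical stand-in for OneInputStreamOperator: a single input. */
    interface ToyOneInputOperator<IN> {
        void processElement(IN element);
    }

    /** Hypothetical stand-in for TwoInputStreamOperator: one callback per input. */
    interface ToyTwoInputOperator<IN1, IN2> {
        void processElement1(IN1 element);
        void processElement2(IN2 element);
    }

    /** A map operator: transforms each element and forwards the result. */
    static <IN, OUT> ToyOneInputOperator<IN> map(Function<IN, OUT> fn, Consumer<OUT> downstream) {
        return element -> downstream.accept(fn.apply(element));
    }

    /** A filter operator: forwards only the elements that pass the predicate. */
    static <T> ToyOneInputOperator<T> filter(Predicate<T> keep, Consumer<T> downstream) {
        return element -> {
            if (keep.test(element)) {
                downstream.accept(element);
            }
        };
    }

    /** Wires filter -> map -> list, building the chain from the sink backwards. */
    static List<Integer> runChain(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        Consumer<Integer> sink = out::add;
        ToyOneInputOperator<Integer> mapOp = map((Integer x) -> x * 10, sink);
        Consumer<Integer> toMap = mapOp::processElement;
        ToyOneInputOperator<Integer> filterOp = filter((Integer x) -> x > 2, toMap);
        for (Integer x : input) {
            filterOp.processElement(x);
        }
        return out;
    }

    /** A two-input operator that tags each element with the input it arrived on. */
    static List<String> runTwoInput() {
        List<String> out = new ArrayList<>();
        ToyTwoInputOperator<Integer, String> tagger = new ToyTwoInputOperator<Integer, String>() {
            @Override
            public void processElement1(Integer element) {
                out.add("in1:" + element);
            }

            @Override
            public void processElement2(String element) {
                out.add("in2:" + element);
            }
        };
        tagger.processElement1(7);
        tagger.processElement2("seven");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runChain(List.of(1, 2, 3, 4, 5))); // prints [30, 40, 50]
        System.out.println(runTwoInput());                    // prints [in1:7, in2:seven]
    }
}
```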
4. Output: DataStreamSink
/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
5. Execution
/**
 * Executes the JobGraph of the on a mini cluster of ClusterUtil with a user
 * specified name.
 *
 * @param jobName
 *            name of the job
 * @return The result of the job execution, containing elapsed time and accumulators.
 */
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }

    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
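The key behavior in the two excerpts above is that building a pipeline only registers operators, and nothing runs until execute is called. The following is a toy model of that idea in plain Java. ToyEnvironment and its methods are hypothetical and greatly simplified: the real environment translates the registered transformations into a StreamGraph and a JobGraph and submits that to a cluster, whereas this sketch simply runs the recorded steps in order.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyExecutionSketch {

    /** Hypothetical stand-in for an execution environment; not Flink's class. */
    static class ToyEnvironment {
        private final List<Runnable> transformations = new ArrayList<>();

        /** Registers a pipeline step. Nothing is executed at this point. */
        void addOperator(Runnable step) {
            transformations.add(step);
        }

        int pendingSteps() {
            return transformations.size();
        }

        /**
         * Runs every registered step in order and returns how many ran. The plan
         * is cleared in finally, mirroring the transformations.clear() call in
         * the execute method quoted above.
         */
        int execute(String jobName) {
            try {
                for (Runnable step : transformations) {
                    step.run();
                }
                return transformations.size();
            } finally {
                transformations.clear();
            }
        }
    }

    /** Builds a three-step plan and records what happens before and after execute. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator(() -> log.add("source"));
        env.addOperator(() -> log.add("map"));
        env.addOperator(() -> log.add("sink"));
        log.add("before execute: " + env.pendingSteps() + " pending");
        env.execute("toy-job");
        log.add("after execute: " + env.pendingSteps() + " pending");
        return log;
    }

    public static void main(String[] args) {
        // prints: before execute: 3 pending, source, map, sink, after execute: 0 pending
        demo().forEach(System.out::println);
    }
}
```

Because the plan is cleared even when a step throws, the same environment object can be reused to build and run a second job without inheriting steps from the first.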
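6. Summary
Flink's execution model resembles a pipeline. It borrows some execution principles from databases and builds its own distinctive execution approach on top of them.
7. Outlook
Streaming also involves concepts such as Watermark and window. Because of space limits, this article covers only the usage and principles of the Flink DataStream API.
The next article will cover Watermark, and the one after that will cover window computation.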
References
[1] https://baijiahao.baidu.com/s?id=1625545704285534730&wfr=spider&for=pc