Flink 1.7.2 Local WordCount Source Code Analysis

Overview

Sequence Diagram

Input Data

  • nc -lk 1234
a b a b a
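
With the 20-second processing-time window and sum("count") used below, and assuming all five tokens arrive within the same window, the printed result should look roughly like:

WordWithCount(a,3)
WordWithCount(b,2)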

Client Program

SocketWindowWordCountLocal.scala

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Reads input from `nc -lk 1234`.
  */
object SocketWindowWordCountLocal {

  def main(args: Array[String]): Unit = {


    val port = 1234
    // get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration : Configuration = new Configuration()
    val timeout = "100000 s"
    val timeoutHeartbeatPause = "1000000 s"
    configuration.setString("akka.ask.timeout",timeout)
    configuration.setString("akka.lookup.timeout",timeout)
    configuration.setString("akka.tcp.timeout",timeout)
    configuration.setString("akka.transport.heartbeat.interval",timeout)
    configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
    configuration.setString("akka.watch.heartbeat.pause",timeout)
    configuration.setInteger("heartbeat.interval",10000000)
    configuration.setInteger("heartbeat.timeout",50000000)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      /**
        * The window fires every 20 seconds, which effectively restarts the count.
        * Benefit: we never have to aggregate over the entire history, only over
        * the increment within each time interval, which keeps the data volume small.
        */
      .timeWindow(Time.seconds(20))
      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)


      .sum("count" )

    textResult.print().setParallelism(1)

    if (args == null || args.size == 0) {
      env.execute("Default job")

      // execution plan
      //println(env.getExecutionPlan)
      // StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON)
      //JsonPlanGenerator.generatePlan(jobGraph)

    } else {
      env.execute(args(0))
    }

    println("Done")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

}

Flink Source Code Analysis

Source (reading data)

SocketTextStreamFunction

  • SocketTextStreamFunction.run: as long as the task is running, it keeps reading from the socket connection via BufferedReader.read, 8 KB at a time, and then processes the buffered data line by line (split on the delimiter).
  • Each complete record is handed to NonTimestampContext.collect for further processing.
@Override
	public void run(SourceContext<String> ctx) throws Exception {
		final StringBuilder buffer = new StringBuilder();
		long attempt = 0;

		while (isRunning) {

			try (Socket socket = new Socket()) {
				currentSocket = socket;

				LOG.info("Connecting to server socket " + hostname + ':' + port);
				socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
				try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

					char[] cbuf = new char[8192];
					int bytesRead;
					while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
						buffer.append(cbuf, 0, bytesRead);
						int delimPos;
						while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
							String record = buffer.substring(0, delimPos);
							// truncate trailing carriage return
							if (delimiter.equals("\n") && record.endsWith("\r")) {
								record = record.substring(0, record.length() - 1);
							}
							ctx.collect(record);
							buffer.delete(0, delimPos + delimiter.length());
						}
					}
				}
			}

			// if we dropped out of this loop due to an EOF, sleep and retry
			if (isRunning) {
				attempt++;
				if (maxNumRetries == -1 || attempt < maxNumRetries) {
					LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
					Thread.sleep(delayBetweenRetries);
				}
				else {
					// this should probably be here, but some examples expect simple exists of the stream source
					// throw new EOFException("Reached end of stream and reconnects are not enabled.");
					break;
				}
			}
		}

		// collect trailing data
		if (buffer.length() > 0) {
			ctx.collect(buffer.toString());
		}
	}
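
The buffering and delimiter handling above can be illustrated with a minimal, self-contained Scala sketch (the object name and the two hard-coded chunks are purely illustrative, not Flink API):

object DelimiterSplitSketch {
  def main(args: Array[String]): Unit = {
    val delimiter = "\n"
    val buffer = new StringBuilder
    // Pretend these are two successive reads from the socket (up to 8 KB each);
    // a record may be split across reads, so we only emit on a complete delimiter.
    val chunks = Seq("a b a ", "b a\na b\n")
    for (chunk <- chunks) {
      buffer.append(chunk)
      var delimPos = buffer.indexOf(delimiter)
      while (delimPos != -1) {
        val record = buffer.substring(0, delimPos) // one complete line
        println(s"collect: $record")               // stands in for ctx.collect(record)
        buffer.delete(0, delimPos + delimiter.length)
        delimPos = buffer.indexOf(delimiter)
      }
    }
  }
}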

NonTimestampContext

  • collect
  • The element parameter is one line of data read by the source.
  • Calls AbstractStreamOperator.CountingOutput.collect.
public void collect(T element) {
			synchronized (lock) {
				output.collect(reuse.replace(element));
			}
		}

AbstractStreamOperator.CountingOutput

  • collect
  • Calls CopyingChainingOutput.collect.
@Override
		public void collect(StreamRecord<OUT> record) {
			numRecordsOut.inc();
			output.collect(record);
		}

CopyingChainingOutput.collect

  • collect
  • Calls pushToOperator().
public void collect(StreamRecord<T> record) {
			if (this.outputTag != null) {
				// we are only responsible for emitting to the main input
				return;
			}

			pushToOperator(record);
		}

pushToOperator

  • Calls StreamFlatMap.processElement.
protected <X> void pushToOperator(StreamRecord<X> record) {
			try {
				// we know that the given outputTag matches our OutputTag so the record
				// must be of the type that our operator (and Serializer) expects.
				@SuppressWarnings("unchecked")
				StreamRecord<T> castRecord = (StreamRecord<T>) record;

				numRecordsIn.inc();
				StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
				operator.setKeyContextElement1(copy);
				operator.processElement(copy);
			} catch (ClassCastException e) {
				if (outputTag != null) {
					// Enrich error message
					ClassCastException replace = new ClassCastException(
						String.format(
							"%s. Failed to push OutputTag with id '%s' to operator. " +
								"This can occur when multiple OutputTags with different types " +
								"but identical names are being used.",
							e.getMessage(),
							outputTag.getId()));

					throw new ExceptionInChainedOperatorException(replace);
				} else {
					throw new ExceptionInChainedOperatorException(e);
				}
			} catch (Exception e) {
				throw new ExceptionInChainedOperatorException(e);
			}

		}
	}

Operator(FlatMap)

StreamFlatMap

  • processElement
  • userFunction is the user-defined function, i.e. the expression passed to flatMap( w => w.split("\\s") ).
  • element.getValue() is one line of data from the source.
  • Calls the FlatMapFunction built in DataStream.flatMap.
public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
		userFunction.flatMap(element.getValue(), collector);
	}

DataStream

  • flatMap
  • cleanFun(in) is the result of applying the user flatMap function to one source line; the resulting collection is traversed with foreach, and each element is handed to out.collect, i.e. TimestampedCollector.collect.
/**
   * Creates a new DataStream by applying the given function to every element and flattening
   * the results.
   */
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
    if (fun == null) {
      throw new NullPointerException("FlatMap function must not be null.")
    }
    val cleanFun = clean(fun)
    val flatMapper = new FlatMapFunction[T, R] {
      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
    }
    flatMap(flatMapper)
  }
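
For the sample line a b a b a, the effect of cleanFun(in) foreach out.collect is roughly the following (an illustrative sketch, not Flink code):

// what the user function w => w.split("\\s") returns for one source line
val words: Array[String] = "a b a b a".split("\\s")  // Array(a, b, a, b, a)

// the generated FlatMapFunction then emits each element downstream
words.foreach(w => println(s"collect: $w"))          // stands in for out.collect(w)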

Operator(Map)

TimestampedCollector

  • collect
  • Calls CountingOutput.collect().
public void collect(T record) {
		output.collect(reuse.replace(record));
	}

CountingOutput

  • Calls CopyingChainingOutput.collect.
public void collect(StreamRecord<OUT> record) {
			numRecordsOut.inc();
			output.collect(record);
		}

CopyingChainingOutput

  • Calls pushToOperator().
public void collect(StreamRecord<T> record) {
			if (this.outputTag != null) {
				// we are only responsible for emitting to the main input
				return;
			}

			pushToOperator(record);
		}
  • Calls operator.processElement(copy), i.e. StreamMap.processElement.
protected <X> void pushToOperator(StreamRecord<X> record) {
			try {
				// we know that the given outputTag matches our OutputTag so the record
				// must be of the type that our operator (and Serializer) expects.
				@SuppressWarnings("unchecked")
				StreamRecord<T> castRecord = (StreamRecord<T>) record;

				numRecordsIn.inc();
				StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
				operator.setKeyContextElement1(copy);
				operator.processElement(copy);
			} catch (ClassCastException e) {
				if (outputTag != null) {
					// Enrich error message
					ClassCastException replace = new ClassCastException(
						String.format(
							"%s. Failed to push OutputTag with id '%s' to operator. " +
								"This can occur when multiple OutputTags with different types " +
								"but identical names are being used.",
							e.getMessage(),
							outputTag.getId()));

					throw new ExceptionInChainedOperatorException(replace);
				} else {
					throw new ExceptionInChainedOperatorException(e);
				}
			} catch (Exception e) {
				throw new ExceptionInChainedOperatorException(e);
			}

		}
	}

StreamMap

  • userFunction is the expression passed to map( w => WordWithCount(w,1) ).
  • userFunction.map(element.getValue()) takes one element produced by the preceding flatMap step (one word from the source line) and maps it, yielding e.g. (a,1).
  • Then calls output.collect, i.e. CountingOutput.collect.
public void processElement(StreamRecord<IN> element) throws Exception {
		output.collect(element.replace(userFunction.map(element.getValue())));
	}
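
Combining this with the previous step, one source line becomes a stream of WordWithCount elements, roughly (illustrative sketch only):

case class WordWithCount(word: String, count: Long)

val mapped = "a b a b a".split("\\s").map(w => WordWithCount(w, 1))
// mapped: Array(WordWithCount(a,1), WordWithCount(b,1), WordWithCount(a,1),
//               WordWithCount(b,1), WordWithCount(a,1))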

CountingOutput

  • Calls RecordWriterOutput.collect.
public void collect(StreamRecord<OUT> record) {
			numRecordsOut.inc();
			output.collect(record);
		}

RecordWriterOutput

  • Calls pushToRecordWriter.
public void collect(StreamRecord<OUT> record) {
		if (this.outputTag != null) {
			// we are only responsible for emitting to the main input
			return;
		}

		pushToRecordWriter(record);
	}
  • pushToRecordWriter
  • Calls StreamRecordWriter.emit.
private <X> void pushToRecordWriter(StreamRecord<X> record) {
		serializationDelegate.setInstance(record);

		try {
			recordWriter.emit(serializationDelegate);
		}
		catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

StreamRecordWriter

  • Calls RecordWriter.emit.
public void emit(T record) throws IOException, InterruptedException {
		checkErroneous();
		super.emit(record);
	}

RecordWriter

  • Calls emit(record, targetChannels); channelSelector.selectChannels picks the target channel(s) for the record (with parallelism 1 there is only channel 0).
public void emit(T record) throws IOException, InterruptedException {
		emit(record, channelSelector.selectChannels(record, numChannels));
	}
  • emit
  • Calls copyFromSerializerToTargetChannel(), which serializes the record and writes it into the target channel's buffer; downstream, the window operator's task consumes these buffers, which is what drives WindowOperator.processElement.
private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
		serializer.serializeRecord(record);

		boolean pruneAfterCopying = false;
		for (int channel : targetChannels) {
			if (copyFromSerializerToTargetChannel(channel)) {
				pruneAfterCopying = true;
			}
		}

		// Make sure we don't hold onto the large intermediate serialization buffer for too long
		if (pruneAfterCopying) {
			serializer.prune();
		}
	}
  • copyFromSerializerToTargetChannel
/**
	 * @param targetChannel
	 * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
	 */
	private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
		// We should reset the initial position of the intermediate serialization buffer before
		// copying, so the serialization results can be copied to multiple target buffers.
		serializer.reset();

		boolean pruneTriggered = false;
		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
		while (result.isFullBuffer()) {
			numBytesOut.inc(bufferBuilder.finish());
			numBuffersOut.inc();

			// If this was a full record, we are done. Not breaking out of the loop at this point
			// will lead to another buffer request before breaking out (that would not be a
			// problem per se, but it can lead to stalls in the pipeline).
			if (result.isFullRecord()) {
				pruneTriggered = true;
				bufferBuilders[targetChannel] = Optional.empty();
				break;
			}

			bufferBuilder = requestNewBufferBuilder(targetChannel);
			result = serializer.copyToBufferBuilder(bufferBuilder);
		}
		checkState(!serializer.hasSerializedData(), "All data should be written at once");

		if (flushAlways) {
			targetPartition.flush(targetChannel);
		}
		return pruneTriggered;
	}

Window Operator (reduce)

WindowOperator

  • processElement: every element emitted after the source's flatMap and map steps (e.g. (a,1)) ends up here; the method is invoked once for each element emitted upstream.

  • windowAssigner.assignWindows assigns each element to its window(s).

  • The element is added to the window state via HeapReducingState.add(); the accumulated value is kept in WindowOperator.windowState.stateTable.primaryTable.

  • add() calls transform, which eventually calls ReduceTransformation.apply; apply invokes the reduce function, so within one window the accumulated value is updated every time another element with the same key arrives (see the sketch after the processElement listing below):

    public V apply(V previousState, V value) throws Exception {
        return previousState != null ? reduceFunction.reduce(previousState, value) : value;
    }

  • Every element is associated with a trigger: TriggerResult triggerResult = triggerContext.onElement(element)

  • triggerResult.isFire() is true only when the current window fires (i.e. the window is complete).

public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}
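
The incremental aggregation performed by windowState.add() can be sketched as follows. This is not Flink's SumAggregator implementation, just the equivalent ReduceFunction-style accumulation for the sample input, assuming every element falls into the same 20-second window:

case class WordWithCount(word: String, count: Long)

// the reduce behind sum("count"), written out by hand
def reduce(acc: WordWithCount, in: WordWithCount): WordWithCount =
  WordWithCount(acc.word, acc.count + in.count)

// ReduceTransformation.apply: previousState is empty for the first element of a key
def apply(previousState: Option[WordWithCount], value: WordWithCount): WordWithCount =
  previousState.map(reduce(_, value)).getOrElse(value)

// per-key accumulation for the window that contains "a b a b a"
val elements = Seq("a", "b", "a", "b", "a").map(WordWithCount(_, 1))
val perKeyState = elements.groupBy(_.word).map { case (_, vs) =>
  vs.foldLeft(Option.empty[WordWithCount])((acc, v) => Some(apply(acc, v))).get
}
perKeyState.foreach(println) // WordWithCount(a,3) and WordWithCount(b,2)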
  • onProcessingTime
  • When a processing-time window completes, onProcessingTime() is called.
  • The trigger registered via triggerContext.onElement(element) in WindowOperator.processElement() fires once the window ends, which in turn invokes WindowOperator.onProcessingTime().
  • The accumulated data is read from the window state and passed to emitWindowContents().
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
		triggerContext.key = timer.getKey();
		triggerContext.window = timer.getNamespace();

		MergingWindowSet<W> mergingWindows;

		if (windowAssigner instanceof MergingWindowAssigner) {
			mergingWindows = getMergingWindowSet();
			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
			if (stateWindow == null) {
				// Timer firing for non-existent window, this can only happen if a
				// trigger did not clean up timers. We have already cleared the merging
				// window and therefore the Trigger state, however, so nothing to do.
				return;
			} else {
				windowState.setCurrentNamespace(stateWindow);
			}
		} else {
			windowState.setCurrentNamespace(triggerContext.window);
			mergingWindows = null;
		}

		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

		if (triggerResult.isFire()) {
			ACC contents = windowState.get();
			if (contents != null) {
				emitWindowContents(triggerContext.window, contents);
			}
		}

		if (triggerResult.isPurge()) {
			windowState.clear();
		}

		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
			clearAllState(triggerContext.window, windowState, mergingWindows);
		}

		if (mergingWindows != null) {
			// need to make sure to update the merging state in state
			mergingWindows.persist();
		}
	}

emitWindowContents

private void emitWindowContents(W window, ACC contents) throws Exception {
		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
		processContext.window = window;
		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
	}

SinkStream(PrintSinkFunction)

InternalSingleValueWindowFunction

  • PassThroughWindowFunction.apply
public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
		wrappedFunction.apply(key, window, Collections.singletonList(input), out);
	}

PassThroughWindowFunction

  • TimestampedCollector.collect
public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception {
		for (T in: input) {
			out.collect(in);
		}
	}

TimestampedCollector

  • AbstractStreamOperator.CountingOutput.collect
public void collect(T record) {
		output.collect(reuse.replace(record));
	}

AbstractStreamOperator.CountingOutput

  • OperatorChain.CopyingChainingOutput.collect
public void collect(StreamRecord<OUT> record) {
			numRecordsOut.inc();
			output.collect(record);
		}

OperatorChain.CopyingChainingOutput

  • pushToOperator
public void collect(StreamRecord<T> record) {
			if (this.outputTag != null) {
				// we are only responsible for emitting to the main input
				return;
			}

			pushToOperator(record);
		}
  • pushToOperator
  • Calls operator.processElement(copy), i.e. StreamSink.processElement.
protected <X> void pushToOperator(StreamRecord<X> record) {
			try {
				// we know that the given outputTag matches our OutputTag so the record
				// must be of the type that our operator (and Serializer) expects.
				@SuppressWarnings("unchecked")
				StreamRecord<T> castRecord = (StreamRecord<T>) record;

				numRecordsIn.inc();
				StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
				operator.setKeyContextElement1(copy);
				operator.processElement(copy);
			} catch (ClassCastException e) {
				if (outputTag != null) {
					// Enrich error message
					ClassCastException replace = new ClassCastException(
						String.format(
							"%s. Failed to push OutputTag with id '%s' to operator. " +
								"This can occur when multiple OutputTags with different types " +
								"but identical names are being used.",
							e.getMessage(),
							outputTag.getId()));

					throw new ExceptionInChainedOperatorException(replace);
				} else {
					throw new ExceptionInChainedOperatorException(e);
				}
			} catch (Exception e) {
				throw new ExceptionInChainedOperatorException(e);
			}

		}
	}

StreamSink

  • Calls PrintSinkFunction.invoke, which prints the output.
public void processElement(StreamRecord<IN> element) throws Exception {
		sinkContext.element = element;
		userFunction.invoke(element.getValue(), sinkContext);
	}
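
PrintSinkFunction.invoke essentially prints record.toString() to standard output; a minimal equivalent sink (an illustrative sketch, not the actual Flink implementation, which also supports an output prefix and stderr) would be:

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Illustrative stand-in for PrintSinkFunction
class SimplePrintSink[T] extends SinkFunction[T] {
  override def invoke(value: T): Unit = println(value.toString)
}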

End
