A look at Flink's PrintSinkFunction

This article takes a look at Flink's PrintSinkFunction.

DataStream.print

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

/**
     * Writes a DataStream to the standard output stream (stdout).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

    /**
     * Writes a DataStream to the standard output stream (stderr).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> printToErr() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
        return addSink(printFunction).name("Print to Std. Err");
    }

    /**
     * Writes a DataStream to the standard output stream (stdout).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @param sinkIdentifier The string to prefix the output with.
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
        return addSink(printFunction).name("Print to Std. Out");
    }

    /**
     * Writes a DataStream to the standard output stream (stderr).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @param sinkIdentifier The string to prefix the output with.
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> printToErr(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true);
        return addSink(printFunction).name("Print to Std. Err");
    }

    /**
     * Adds the given sink to this DataStream. Only streams with sinks added
     * will be executed once the {@link StreamExecutionEnvironment#execute()}
     * method is called.
     *
     * @param sinkFunction
     *            The object containing the sink's invoke function.
     * @return The closed DataStream.
     */
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
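  • DataStream provides several methods whose names start with print; each one creates a PrintSinkFunction and registers it by calling addSink
  • The Javadoc on addSink states that only streams with sinks added are executed when StreamExecutionEnvironment.execute() is called
  • The SinkFunction is first wrapped in a StreamSink, which is then wrapped in a DataStreamSink; finally the result of DataStreamSink.getTransformation is added to the ExecutionEnvironment as an operator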

SinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java

/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * @deprecated Use {@link #invoke(Object, Context)}.
     */
    @Deprecated
    default void invoke(IN value) throws Exception {}

    /**
     * Writes the given value to the sink. This function is called for every record.
     *
     * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
     * {@code default} method for backward compatibility with the old-style method only.
     *
     * @param value The input record.
     * @param context Additional context about the input record.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    /**
     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
     * an input record.
     *
     * <p>The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current event-time watermark. */
        long currentWatermark();

        /**
         * Returns the timestamp of the current input record or {@code null} if the element does not
         * have an assigned timestamp.
         */
        Long timestamp();
    }
}
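  • The SinkFunction interface defines the invoke method, which triggers the sink logic. The one-argument invoke is deprecated; the two-argument invoke also receives a Context, an interface that defines three methods: currentProcessingTime, currentWatermark, and timestamp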
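
The relationship between the two invoke methods can be shown with a stand-alone miniature. MiniSink, MiniContext, and CollectingSink below are hypothetical names made up for this sketch, not Flink classes; only the shape of the two default methods is copied from the SinkFunction source above. The sketch shows that an implementation overriding only the one-argument method still receives every record when the caller uses the two-argument form.

```java
import java.util.ArrayList;
import java.util.List;

public class InvokeDelegationSketch {

    /** Hypothetical stand-in for SinkFunction.Context; not Flink's interface. */
    interface MiniContext {
        long currentProcessingTime();
    }

    /** Hypothetical miniature of SinkFunction with the same two default methods. */
    interface MiniSink<IN> {
        // Old-style method: does nothing unless overridden.
        default void invoke(IN value) throws Exception {}

        // New-style method: falls back to the old-style one unless overridden.
        default void invoke(IN value, MiniContext context) throws Exception {
            invoke(value);
        }
    }

    /** Overrides only the one-argument method, as PrintSinkFunction does. */
    static class CollectingSink implements MiniSink<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(String value) {
            seen.add(value);
        }
    }

    /** Feeds the values through the two-argument method and returns what the sink saw. */
    static List<String> run(String... values) {
        CollectingSink sink = new CollectingSink();
        MiniContext context = System::currentTimeMillis;
        try {
            for (String value : values) {
                sink.invoke(value, context);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b")); // prints [a, b]
    }
}
```

This is why PrintSinkFunction, shown further down, can get away with overriding only invoke(IN record).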

RichSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java

/**
 * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
 */
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

    private static final long serialVersionUID = 1L;
}
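  • The abstract class RichSinkFunction extends AbstractRichFunction and also declares that it implements the SinkFunction interface; most built-in sink functions extend RichSinkFunction. AbstractRichFunction mainly provides the RuntimeContext property, which gives access to the function's runtime context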

PrintSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java

/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 *    {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
 *    {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
 *  taskId> output                            <- no {@code sinkIdentifier} provided, parallelism > 1
 *  output                                    <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
  • PrintSinkFunction extends RichSinkFunction; it mainly relies on PrintSinkOutputWriter, calling the writer's write method from invoke to produce the output
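
PrintSinkFunction creates its writer in the constructor but only calls writer.open(...) from its own open method. One plausible reading (an interpretation, not something the source states) is that the function object is serialized before it runs, and the writer's PrintStream field is declared transient in the PrintSinkOutputWriter source, so the stream has to be acquired again after deserialization. The following stand-alone sketch uses a hypothetical MiniWriter class, not Flink's writer, to show that a transient field is null after a serialization round trip until open is called.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

public class TransientStreamSketch {

    /** Hypothetical miniature of a writer that holds a non-serializable stream. */
    static class MiniWriter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final boolean stdErr;
        private transient PrintStream stream; // skipped by Java serialization

        MiniWriter(boolean stdErr) {
            this.stdErr = stdErr;
        }

        void open() {
            stream = stdErr ? System.err : System.out;
        }

        boolean isOpen() {
            return stream != null;
        }

        void write(Object record) {
            stream.println(record.toString());
        }
    }

    /** Serializes the writer to bytes and reads it back, as shipping it elsewhere would. */
    static MiniWriter roundTrip(MiniWriter writer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(writer);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MiniWriter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MiniWriter original = new MiniWriter(false);
        original.open();

        MiniWriter copy = roundTrip(original);
        System.out.println(copy.isOpen()); // false: the transient stream was not carried over

        copy.open();
        copy.write("ready"); // works once open() has set the stream again
    }
}
```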

PrintSinkOutputWriter

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java

/**
 * Print sink output writer for DataStream and DataSet print API.
 */
@Internal
public class PrintSinkOutputWriter<IN> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final boolean STD_OUT = false;
    private static final boolean STD_ERR = true;

    private final boolean target;
    private transient PrintStream stream;
    private final String sinkIdentifier;
    private transient String completedPrefix;

    public PrintSinkOutputWriter() {
        this("", STD_OUT);
    }

    public PrintSinkOutputWriter(final boolean stdErr) {
        this("", stdErr);
    }

    public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
        this.target = stdErr;
        this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
    }

    public void open(int subtaskIndex, int numParallelSubtasks) {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;

        completedPrefix = sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }

        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    public void write(IN record) {
        stream.println(completedPrefix + record.toString());
    }

    @Override
    public String toString() {
        return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
    }
}
  • The PrintSinkOutputWriter constructor takes at most two arguments, sinkIdentifier and stdErr; sinkIdentifier is the prefix for the output, and stdErr indicates whether to write to System.err
  • The open method does the preparation work. It is called from PrintSinkFunction's open method, which obtains subtaskIndex and numParallelSubtasks from the RuntimeContext defined by AbstractRichFunction and passes them in; open then builds completedPrefix from sinkIdentifier, subtaskIndex, and numParallelSubtasks
  • The write method calls println on System.out or System.err, printing completedPrefix followed by the record
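
The four output formats listed in the PrintSinkFunction Javadoc follow directly from the prefix logic in open. The stand-alone sketch below copies that logic into a hypothetical helper class (PrintPrefixSketch, buildPrefix, and format are names made up here, not part of Flink) so that each case can be tried without a running job. The identifier "orders" and the record "hello" are arbitrary sample values.

```java
public class PrintPrefixSketch {

    /** Same steps as the prefix construction in PrintSinkOutputWriter.open above. */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = (sinkIdentifier == null ? "" : sinkIdentifier);

        // The subtask number is only added when the sink runs in parallel.
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1); // index is 0-based, printed value is 1-based
        }

        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    /** Builds the line that write would print for the given record. */
    static String format(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks, Object record) {
        return buildPrefix(sinkIdentifier, subtaskIndex, numParallelSubtasks) + record.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("orders", 2, 4, "hello")); // orders:3> hello
        System.out.println(format("orders", 0, 1, "hello")); // orders> hello
        System.out.println(format(null, 2, 4, "hello"));     // 3> hello
        System.out.println(format(null, 0, 1, "hello"));     // hello
    }
}
```

Note that the number in the prefix is subtaskIndex + 1, so the first subtask prints as 1 rather than 0.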

Summary

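  • The DataStream methods whose names start with print create a PrintSinkFunction internally and then call addSink to add it to the ExecutionEnvironment (it is first wrapped in a StreamSink, then in a DataStreamSink, and finally the result of DataStreamSink.getTransformation is added to the ExecutionEnvironment as an operator)
  • SinkFunction is the base interface for sink functions. It mainly defines the invoke method, which is passed a Context. Most built-in sink functions extend RichSinkFunction, which in turn extends AbstractRichFunction and so gives access to the function's RuntimeContext
  • PrintSinkFunction extends RichSinkFunction; it mainly relies on PrintSinkOutputWriter, calling the writer's write method from invoke to produce the output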