聊聊flink KeyedStream的KeySelector

本文主要研究一下flink KeyedStream的KeySelectorhtml

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.javajava

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {

    /**
     * The key selector that can get the key by which the stream if partitioned from the elements.
     */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned. */
    private final TypeInformation<KEY> keyType;

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(
            dataStream,
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
    }

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
     * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
     *
     * @param stream
     *            Base stream of data
     * @param partitionTransformation
     *            Function that determines how the keys are distributed to downstream operator(s)
     * @param keySelector
     *            Function to extract keys from the base stream
     * @param keyType
     *            Defines the type of the extracted keys
     */
    @Internal
    KeyedStream(
        DataStream<T> stream,
        PartitionTransformation<T> partitionTransformation,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {

        super(stream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = clean(keySelector);
        this.keyType = validateKeyType(keyType);
    }

    //......
}
  • 這裏能夠看到KeyedStream的不一樣構造器中都須要一個KeySelector類型的參數

KeySelector

flink-core-1.7.0-sources.jar!/org/apache/flink/api/java/functions/KeySelector.javaexpress

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {

    /**
     * User-defined function that deterministically extracts the key from an object.
     *
     * <p>For example for a class:
     * <pre>
     *     public class Word {
     *         String word;
     *         int count;
     *     }
     * </pre>
     * The key extractor could return the word as
     * a key to group all Word objects by the String they contain.
     *
     * <p>The code would look like this
     * <pre>
     *     public String getKey(Word w) {
     *         return w.word;
     *     }
     * </pre>
     *
     * @param value The object to get the key from.
     * @return The extracted key.
     *
     * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
     *                   and trigger recovery or cancellation of the program.
     */
    KEY getKey(IN value) throws Exception;
}
  • KeySelector接口繼承了Function接口,定義了getKey方法,用於從IN類型中提取出KEY

DataStream.keyBy

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.javaapache

/**
     * It creates a new {@link KeyedStream} that uses the provided key for partitioning
     * its operator states.
     *
     * @param key
     *            The KeySelector to be used for extracting the key for partitioning
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }

    /**
     * It creates a new {@link KeyedStream} that uses the provided key with explicit type information
     * for partitioning its operator states.
     *
     * @param key The KeySelector to be used for extracting the key for partitioning.
     * @param keyType The type information describing the key type.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(keyType);
        return new KeyedStream<>(this, clean(key), keyType);
    }

    /**
     * Partitions the operator state of a {@link DataStream} by the given key positions.
     *
     * @param fields
     *            The position of the fields on which the {@link DataStream}
     *            will be grouped.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public KeyedStream<T, Tuple> keyBy(int... fields) {
        if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
            return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
        } else {
            return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
        }
    }

    /**
     * Partitions the operator state of a {@link DataStream} using field expressions.
     * A field expression is either the name of a public field or a getter method with parentheses
     * of the {@link DataStream}'s underlying type. A dot can be used to drill
     * down into objects, as in {@code "field1.getInnerField2()" }.
     *
     * @param fields
     *            One or more field expressions on which the state of the {@link DataStream} operators will be
     *            partitioned.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     **/
    public KeyedStream<T, Tuple> keyBy(String... fields) {
        return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    }

    private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
        return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
                getType(), getExecutionConfig())));
    }
  • DataStream的keyBy方法用於將DataStream轉換爲KeyedStream,該方法有不一樣的重載
  • 一個是支持變長int數組,這個一般用於簡單tuple類型,int爲tuple的小標,從0開始,若是是多個int,表示是組合key,好比keyBy(0,1)表示要用tuple的第一個和第二個字段做爲key;
  • 一個是支持變長String數組,這個一般用於複雜tuple類型及POJO類型,對於POJO,String用於指定字段名,也支持對象/tuple嵌套屬性,好比user.zip,對於對象類型的tuple,f0表示該tuple的第一個字段
  • 一個是支持KeySelector,經過Key Selector Function能夠自由指定key,好比從對象提取而後作些處理
  • keyBy(int... fields)及keyBy(String... fields)裏頭均有調用到私有的keyBy(Keys<T> keys)方法,因爲KeyedStream的構造器都須要KeySelector參數,因此該方法最後也是經過KeySelectorUtil.getSelectorForKeys將Keys轉換爲KeySelector對象

Keys.ExpressionKeys

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/operators/Keys.javaapi

/**
     * Represents (nested) field access through string and integer-based keys
     */
    public static class ExpressionKeys<T> extends Keys<T> {
        
        public static final String SELECT_ALL_CHAR = "*";
        public static final String SELECT_ALL_CHAR_SCALA = "_";
        private static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
                + "\\" + SELECT_ALL_CHAR + "|"
                + "\\" + SELECT_ALL_CHAR_SCALA +")$");

        // Flattened fields representing keys fields
        private List<FlatFieldDescriptor> keyFields;
        private TypeInformation<?>[] originalKeyTypes;

        //......

        /**
         * Create String-based (nested) field expression keys on a composite type.
         */
        public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
            checkNotNull(keyExpressions, "Field expression cannot be null.");

            this.keyFields = new ArrayList<>(keyExpressions.length);

            if (type instanceof CompositeType){
                CompositeType<T> cType = (CompositeType<T>) type;
                this.originalKeyTypes = new TypeInformation<?>[keyExpressions.length];

                // extract the keys on their flat position
                for (int i = 0; i < keyExpressions.length; i++) {
                    String keyExpr = keyExpressions[i];

                    if (keyExpr == null) {
                        throw new InvalidProgramException("Expression key may not be null.");
                    }
                    // strip off whitespace
                    keyExpr = keyExpr.trim();

                    List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);

                    if (flatFields.size() == 0) {
                        throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
                    }
                    // check if all nested fields can be used as keys
                    for (FlatFieldDescriptor field : flatFields) {
                        if (!field.getType().isKeyType()) {
                            throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
                        }
                    }
                    // add flat fields to key fields
                    keyFields.addAll(flatFields);

                    String strippedKeyExpr = WILD_CARD_REGEX.matcher(keyExpr).replaceAll("");
                    if (strippedKeyExpr.isEmpty()) {
                        this.originalKeyTypes[i] = type;
                    } else {
                        this.originalKeyTypes[i] = cType.getTypeAt(strippedKeyExpr);
                    }
                }
            }
            else {
                if (!type.isKeyType()) {
                    throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
                }

                // check that all key expressions are valid
                for (String keyExpr : keyExpressions) {
                    if (keyExpr == null) {
                        throw new InvalidProgramException("Expression key may not be null.");
                    }
                    // strip off whitespace
                    keyExpr = keyExpr.trim();
                    // check that full type is addressed
                    if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
                        throw new InvalidProgramException(
                            "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
                    }
                    // add full type as key
                    keyFields.add(new FlatFieldDescriptor(0, type));
                }
                this.originalKeyTypes = new TypeInformation[] {type};
            }
        }

        //......
    }
  • ExpressionKeys是Keys裏頭的一個靜態類,它繼承了Keys對象;keyBy(int... fields)及keyBy(String... fields)裏頭均有經過new Keys.ExpressionKeys,將fields轉換爲Keys.ExpressionKeys,最後調用私有的keyBy(Keys<T> keys)方法

KeySelectorUtil.getSelectorForKeys

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/keys/KeySelectorUtil.java數組

@Internal
public final class KeySelectorUtil {

    public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
        if (!(typeInfo instanceof CompositeType)) {
            throw new InvalidTypesException(
                    "This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
        }

        CompositeType<X> compositeType = (CompositeType<X>) typeInfo;

        int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
        int numKeyFields = logicalKeyPositions.length;

        TypeInformation<?>[] typeInfos = keys.getKeyFieldTypes();
        // use ascending order here, the code paths for that are usually a slight bit faster
        boolean[] orders = new boolean[numKeyFields];
        for (int i = 0; i < numKeyFields; i++) {
            orders[i] = true;
        }

        TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
        return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
    }

    //......
}
  • KeySelectorUtil.getSelectorForKeys方法用於將Keys轉換爲KeySelector類型

小結

  • KeyedStream的不一樣構造器中都須要一個KeySelector參數
  • DataStream的keyBy方法有不一樣的重載,支持變長int數組,變長String數組以及KeySelector類型
  • keyBy(int... fields)及keyBy(String... fields)裏頭均有經過new Keys.ExpressionKeys,將fields轉換爲Keys.ExpressionKeys,最後調用私有的keyBy(Keys<T> keys)方法,該方法經過調用KeySelectorUtil.getSelectorForKeys方法將Keys轉換爲KeySelector類型

doc

相關文章
相關標籤/搜索