[Flink] Implementing a Count Window with Managed Keyed State

The code first:

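import java.util.Arrays;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

public class WordCountKeyedState {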
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Test source: emits one line of three words per second
        DataStreamSource<String> lineDS = env.addSource(new RichSourceFunction<String>() {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean isCanceled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (!isCanceled) {
                    ctx.collect("hadoop flink spark");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanceled = true;
            }
        });

        // Split each line into words and convert every word to a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = lineDS.flatMap((String line, Collector<Tuple2<String, Integer>> ctx) -> {
            Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(Tuple2.of(word, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // Group by word (the key type is String, matching t.f0)
        KeyedStream<Tuple2<String, Integer>, String> keyedWordTupleDS = wordTupleDS.keyBy(t -> t.f0);

        // Count the words per key
        keyedWordTupleDS.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ValueState<Tuple2<Integer, Integer>> countSumValueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Initialize the ValueState (f0 = count, f1 = running sum)
                ValueStateDescriptor<Tuple2<Integer, Integer>> countSumValueStateDesc = new ValueStateDescriptor<>("countSumValueState",
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
                );
                countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);
            }

            @Override
            public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // No default value is configured, so initialize the state on first access
                Tuple2<Integer, Integer> current = countSumValueState.value();
                if (current == null) {
                    current = Tuple2.of(0, 0);
                }

                int count = current.f0 + 1;
                int valueSum = current.f1 + value.f1;

                // Once the count exceeds 3 (every 4th element of a key), emit downstream
                if (count > 3) {
                    out.collect(Tuple2.of(value.f0, valueSum));
                    // Reset the count; the running sum is kept
                    count = 0;
                }

                countSumValueState.update(Tuple2.of(count, valueSum));
            }
        }).print();

        env.execute("KeyedState State");
    }
}

Code walkthrough:

1. Build a test source that emits one line of text per second. To keep testing simple, every line contains just three words.

2. Split each sentence on spaces and convert every word into a tuple, with an initial count of 1 per word.
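To make the shape of the intermediate data concrete, here is a minimal plain-Java sketch of the same split step, without any Flink dependency. The class `SplitWordsSketch` and the method `toWordPairs` are hypothetical names used for illustration only, and `Map.Entry` stands in for Flink's `Tuple2`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink or of the job above.
class SplitWordsSketch {
    // Splits a line on single spaces and pairs every word with an initial count of 1.
    static List<Map.Entry<String, Integer>> toWordPairs(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // prints [hadoop=1, flink=1, spark=1]
        System.out.println(toWordPairs("hadoop flink spark"));
    }
}
```

In the Flink job the same pairs are emitted one by one through the `Collector` rather than returned as a list.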

3. Group the stream by word.

4. Define a custom FlatMap function

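Initialize the ValueState. Note: ValueState can only be used on a KeyedStream, and each ValueState value belongs to exactly one key. Whenever a parallel instance accesses the ValueState, Flink takes the current key from the context, so the processing logic always sees the ValueState of the key being processed. Flink handles this automatically.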
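The per-key scoping can be pictured with a small plain-Java analogy. This is only a sketch of the idea, not Flink's implementation: `KeyedValueSketch` and its methods are hypothetical, and the "current key" that Flink's runtime sets before each element is set by hand here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed value state; not a Flink class.
class KeyedValueSketch<T> {
    private final Map<String, T> valuesByKey = new HashMap<>();
    private String currentKey;

    // In Flink the runtime selects the key of the incoming element; here we call it manually.
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Returns the value stored for the current key, or null if nothing was stored yet.
    T value() {
        return valuesByKey.get(currentKey);
    }

    // Stores a value for the current key only.
    void update(T newValue) {
        valuesByKey.put(currentKey, newValue);
    }

    public static void main(String[] args) {
        KeyedValueSketch<Integer> state = new KeyedValueSketch<>();
        state.setCurrentKey("flink");
        state.update(2);
        state.setCurrentKey("spark");
        System.out.println(state.value()); // null: "spark" has no value yet
        state.setCurrentKey("flink");
        System.out.println(state.value()); // 2
    }
}
```

The same handle returns different contents depending on the key in scope, which is why the user function never passes the key explicitly.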

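Note:

The ValueStateDescriptor constructors that take a default initial value are deprecated. The official recommendation is to manage the default manually by checking whether the state content is null during processing, as the Javadoc of the deprecated constructor states: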

/**
 * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific
 * serializer.
 *
 * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually
 * manage the default value by checking whether the contents of the state is {@code null}.
 *
 * @param name The (unique) name for the state.
 * @param typeSerializer The type serializer of the values in the state.
 * @param defaultValue The default value that will be set when requesting state without setting
 * a value before.
 */
@Deprecated
public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) {
    super(name, typeSerializer, defaultValue);
}

5. Implement the logic

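In flatMap, check whether the ValueState has been initialized; if it has not, assign an initial value by hand. Then accumulate and write the result back to the state. Whenever count > 3, emit the result downstream and reset the count (the running sum is kept).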
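The trigger behaviour is easy to check outside Flink. The following is a plain-Java simulation under stated assumptions: `CountTriggerSketch` and `process` are hypothetical names, a `HashMap` replaces the keyed state, and a `null` return value means "nothing emitted". It mirrors the `count > 3` condition and keeps the sum after each emit, as the job does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the flatMap logic; not Flink code.
class CountTriggerSketch {
    // Per-key state: index 0 = elements seen since the last emit, index 1 = running sum.
    private final Map<String, int[]> stateByKey = new HashMap<>();

    // Returns the running sum when the count exceeds 3, otherwise null.
    Integer process(String word, int value) {
        int[] state = stateByKey.get(word);
        if (state == null) {
            // No default value: initialize by hand on first access.
            state = new int[] {0, 0};
            stateByKey.put(word, state);
        }
        state[0] += 1;
        state[1] += value;
        if (state[0] > 3) {
            state[0] = 0; // reset the count, keep the sum
            return state[1];
        }
        return null;
    }

    public static void main(String[] args) {
        CountTriggerSketch sketch = new CountTriggerSketch();
        for (int i = 1; i <= 8; i++) {
            Integer emitted = sketch.process("flink", 1);
            if (emitted != null) {
                // prints "element 4 -> (flink, 4)" and "element 8 -> (flink, 8)"
                System.out.println("element " + i + " -> (flink, " + emitted + ")");
            }
        }
    }
}
```

With the source emitting one line per second, each word therefore produces an output roughly every four seconds, and the emitted sum keeps growing because only the count is reset.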
