Talking about Flink's Queryable State

This article takes a look at Flink's Queryable State.

Example

Job

@Test
public void testValueStateForQuery() throws Exception {
    // RandomTuple2Source, SubmitTest.JAR_FILE and LOGGER are defined elsewhere in the author's test code (not shown)
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createRemoteEnvironment("192.168.99.100", 8081, SubmitTest.JAR_FILE);
    env.addSource(new RandomTuple2Source())
            .keyBy(0) // key by the first field of the tuple
            .flatMap(new CountWindowAverage())
            .print();
    JobExecutionResult result = env.execute("testQueryableState");
    LOGGER.info("submit job result:{}", result);
}
  • This runs a job that keys the stream by the first field of each tuple and then applies CountWindowAverage in a flatMap.

CountWindowAverage

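import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // per-key state: f0 = number of elements seen, f1 = running sum
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            // first element for this key
            currentSum = Tuple2.of(1L, input.f1);
        } else {
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
        }

        sum.update(currentSum);

        // every 2 elements: emit (key, average) and clear this key's state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        // expose this state to QueryableStateClient under the name "query-name"
        descriptor.setQueryable("query-name");
        sum = getRuntimeContext().getState(descriptor);
    }
}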
  • CountWindowAverage declares its state queryable by calling setQueryable("query-name") on the ValueStateDescriptor.
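It helps to see what a query can actually observe. Below is a plain-Java model of the CountWindowAverage logic, with a HashMap standing in for Flink's keyed ValueState. This is an illustration only: no Flink API is used, and the class name is hypothetical. The model shows that the state for a key is cleared after every second element. A query issued at that moment finds no state for the key, which is one way to run into the "No state for the specified key/namespace" error (UnknownKeyOrNamespaceException).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model of CountWindowAverage's per-key logic.
 * No Flink involved: the HashMap stands in for Flink's keyed ValueState.
 */
public final class CountWindowAverageModel {
    // key -> {count, sum}; a missing entry corresponds to sum.value() == null
    private final Map<Long, long[]> state = new HashMap<>();

    /** Handles one (key, value) element; returns the emitted average, or null if nothing is emitted. */
    public Long process(long key, long value) {
        long[] current = state.get(key);
        if (current == null) {
            current = new long[] {1L, value};
        } else {
            current[0] += 1;
            current[1] += value;
        }
        state.put(key, current);
        if (current[0] >= 2) {
            state.remove(key); // mirrors sum.clear()
            return current[1] / current[0];
        }
        return null;
    }

    /** What a point query would see right now: a copy of {count, sum}, or null if the key has no state. */
    public long[] query(long key) {
        long[] current = state.get(key);
        return current == null ? null : current.clone();
    }
}
```

For example, feeding (1, 3) and then (1, 5) first leaves {count=1, sum=3} for key 1. The second element then emits the average 4 and leaves key 1 without any state until its next element arrives.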

QueryableStateClient

@Test
public void testQueryStateByJobId() throws InterruptedException, IOException {
    // get the jobId from the running-job page of the Flink web UI
    JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e");
    long key = 1L;
    // 9069 is the default proxy port; the defaults are defined in
    // flink-core-1.7.0-sources.jar!/org/apache/flink/configuration/QueryableStateOptions.java
    QueryableStateClient client = new QueryableStateClient("192.168.99.100", 9069);

    // the state descriptor of the state to be fetched; it mirrors the descriptor used in the job
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "average",
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

    CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
            client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

    LOGGER.info("get kv state return future, waiting......");
    // if the key currently has no state, join() fails with an exception such as:
    // org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace.
    ValueState<Tuple2<Long, Long>> res = resultFuture.join();
    LOGGER.info("query result:{}", res.value());
    client.shutdownAndWait();
}
  • Here QueryableStateClient connects to a QueryableStateClientProxy to query the state. The jobId can be looked up in the web UI once the job has been submitted. JobID.fromHexString then converts it into a JobID object.
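CountWindowAverage clears a key's state after every second element. As a result, a query can legitimately fail with the UnknownKeyOrNamespaceException noted in the test even when the setup is correct. Below is a minimal sketch of a client-side retry around any query that returns a CompletableFuture. The helper name, attempt count and delay are assumptions. For simplicity it retries on every exceptional completion rather than only on UnknownKeyOrNamespaceException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Flink's API. */
public final class QueryRetry {
    private QueryRetry() {}

    /**
     * Calls query.get() up to maxAttempts times, sleeping delayMillis between attempts,
     * and returns the first successful result. If every attempt completes exceptionally,
     * the last CompletionException is rethrown.
     */
    public static <T> T queryWithRetry(Supplier<CompletableFuture<T>> query, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        CompletionException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get().join(); // join() wraps an exceptional completion in CompletionException
            } catch (CompletionException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag and give up
                        throw e;
                    }
                }
            }
        }
        throw last;
    }
}
```

With the test above, the supplier would simply wrap the original call: () -> client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor).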

Summary

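  • Queryable State is still a beta feature and is not enabled by default in the Flink 1.7 distribution. To enable it, copy flink-queryable-state-runtime_2.11-1.7.0.jar from the distribution's opt/ directory into /opt/flink/lib/ (the lib/ directory of this Flink installation). On startup, each TaskManager then logs a line such as Started Queryable State Proxy Server @ /172.20.0.3:9069, which confirms that the feature is enabled.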
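  • Architecturally, Queryable State involves three components:
    • QueryableStateServer runs on every TaskManager and serves the state stored locally on that TaskManager.
    • QueryableStateClientProxy also runs on every TaskManager. It receives queries from clients, fetches the requested state from the TaskManager responsible for it, and returns the result to the client.
    • QueryableStateClient usually runs outside the Flink cluster and submits the user's state queries.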
  • Both QueryableStateServer and QueryableStateClientProxy have configurable ports, network-threads and query-threads properties. The default query.server.ports for QueryableStateServer is 9067. The default query.proxy.ports for QueryableStateClientProxy is 9069, and this is the port the client must connect to.
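  • There are two ways to declare state as queryable:
    • KeyedStream.asQueryableState turns the stream into a QueryableStateStream. It must be applied directly to a KeyedStream, and the result cannot be transformed any further, so it behaves like a sink.
    • setQueryable on the StateDescriptor of managed keyed state is the more flexible option.
    Note that there is no queryable ListState variant of asQueryableState, because such a list would keep growing.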
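  • Queryable State currently has a few limitations:
    • Its lifecycle is tied to the task. The state is disposed of when the task finishes and can no longer be queried; querying after a task has finished may be supported later.
    • KvState notifications currently use a simple tell. They may later become more robust with asks and acknowledgements.
    • Query statistics are disabled by default for now. They may be published through the metrics system in the future.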
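The log check from the first summary point can be automated. Below is a minimal sketch that scans TaskManager log lines for the proxy start message and extracts host and port. The class and method names are hypothetical. The message format is assumed from the single line quoted above (Started Queryable State Proxy Server @ /172.20.0.3:9069), so other Flink versions may word it differently.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical log check; the message text is assumed from the Flink 1.7 log line quoted in the article. */
public final class QueryableStateLogCheck {
    // Accepts "@ /172.20.0.3:9069", "@ host/172.20.0.3:9069" or "@ 172.20.0.3:9069" (IPv4 only)
    private static final Pattern PROXY_STARTED =
            Pattern.compile("Started Queryable State Proxy Server @ (?:[^/\\s]*/)?([^:\\s]+):(\\d+)");

    private QueryableStateLogCheck() {}

    /** Returns "host:port" from the first matching line, or empty if no proxy start line is found. */
    public static Optional<String> findProxyAddress(List<String> logLines) {
        for (String line : logLines) {
            Matcher m = PROXY_STARTED.matcher(line);
            if (m.find()) {
                return Optional.of(m.group(1) + ":" + m.group(2));
            }
        }
        return Optional.empty();
    }
}
```

An empty result on a TaskManager's log suggests the runtime jar is not in lib/ yet.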
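The query path in the architecture point can be pictured with a toy model. In Flink, keyed state is organized in key groups, and each TaskManager is assigned a number of them. To answer a query, the proxy asks the JobManager which TaskManager holds the key group of the queried key. It then queries the QueryableStateServer on that TaskManager. The sketch below is an in-memory simulation, not Flink code, and all names are hypothetical. It makes three simplifications:
  • It assumes one parallel instance per TaskManager.
  • It replaces the JobManager lookup with a local computation.
  • It skips the murmur hash that Flink applies to the key's hashCode before assigning the key group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of the query path: client -> proxy -> server. Not Flink code. */
public final class QueryPathModel {
    private final int maxParallelism;
    // one map per TaskManager, standing in for the state each QueryableStateServer serves locally
    private final List<Map<Long, String>> localStatePerTaskManager = new ArrayList<>();

    public QueryPathModel(int maxParallelism, int parallelism) {
        if (parallelism < 1 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("need 1 <= parallelism <= maxParallelism");
        }
        this.maxParallelism = maxParallelism;
        for (int i = 0; i < parallelism; i++) {
            localStatePerTaskManager.add(new HashMap<>());
        }
    }

    /** Key group of a key; simplified, since Flink additionally murmur-hashes hashCode() first. */
    public int keyGroupOf(long key) {
        return Math.floorMod(Long.hashCode(key), maxParallelism);
    }

    /** Index of the TaskManager owning the key's group (each owns a contiguous range of key groups). */
    public int ownerOf(long key) {
        return keyGroupOf(key) * localStatePerTaskManager.size() / maxParallelism;
    }

    /** Job side: the operator instance owning the key writes into its TaskManager's local state. */
    public void update(long key, String value) {
        localStatePerTaskManager.get(ownerOf(key)).put(key, value);
    }

    /** Server side: a server can only answer from the state stored on its own TaskManager. */
    public Optional<String> serve(int taskManager, long key) {
        return Optional.ofNullable(localStatePerTaskManager.get(taskManager).get(key));
    }

    /** Proxy side: locate the owner (in Flink, by asking the JobManager), forward the query, return the answer. */
    public Optional<String> query(long key) {
        return serve(ownerOf(key), key);
    }
}
```

The point of the model is that a client can send a query to any proxy. Only the server on the owning TaskManager can answer it, because the state never leaves that TaskManager.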
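The client has to reach the proxy port: 9069, as used by the QueryableStateClient in the test above and shown in the TaskManager log line. A plain TCP connection attempt is therefore a cheap way to rule out network or port-mapping problems before calling getKvState. This is a hypothetical helper that uses only java.net. A successful connect only shows that something is listening on that port, not that it is the queryable state proxy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical reachability probe, independent of Flink. */
public final class ProxyPortProbe {
    /** The proxy port used by the client test and the TaskManager log line in this article. */
    public static final int PROXY_PORT = 9069;

    private ProxyPortProbe() {}

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }
}
```

For the setup in this article, the call would be ProxyPortProbe.isReachable("192.168.99.100", ProxyPortProbe.PROXY_PORT, 2000).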

